非同期処理の実行先のサーバが落ちていた場合

コメントで質問をもらいました。
# 興味をもって触っていただけてるようで嬉しいです。

長くなりそうだったので、こちらで... (^^;

sappie 2008/03/07 18:57
はじめまして、いつも参考にしています
キューのメッセージを受信して処理するサーバーが落ちていた場合、AP4Rは受信アプリケーションを起動できず、メッセージはDLQに行くという動きで正しいのでしょうか。
キューに滞留するということはないのでしょうか。。。
永続化先はディスクです


答え: メッセージは DLQ にはいります。


「キューに滞留することはないのでしょうか。。。」の「。。。」の部分が気になりました。
キューに滞留してしまったのでしょうか? あるいは、キューに滞留したままでいて欲しいということでしょうか??


ちなみに、DLQ にはいったメッセージは、(手動ですが) 元のキューに戻すことができます。


メッセージの遅延実行や時間指定の実行、あるいは DLQ からの自動リカバリとか、やっぱりあったほうがいいですかね...


# あと細かいことですが、AP4R はキューにはいってきたメッセージを、非同期処理の実行先サーバに HTTP リクエストしているだけなので、「受信アプリケーションを起動できず」というよりは、サーバとの接続が確立できないといったほうが正確かな、と思います。



以下、HelloWorld サンプルでリカバリまでしてみます。
使っているのは、ap4r-0.3.6 と HelloWorld-0.3.6.tar.gz です。

# やたら長くなってしまいまたが、「で、メッセージはどうなったのか?」あたりからが本題です。



サンプルアプリの起動。

% cd HelloWorld
% ruby script/server


AP4R の起動。

% cd my_ap4r
% ruby script/mongrel_ap4r start -A config/queues_disk.cfg


http://localhost:3000 で サンプルの画面を開いて "HTTP POST" ボタンを押下。
ファイルに同期で Hello と出力され、その後、非同期で World と出力。
だいたい 10 秒くらい待ちます。


"非同期で World と出力" する部分は、今の状態では同期部分と同じサーバで処理されているので、別のサーバで処理するようにします。
以下でコメントアウトしている部分をコメントイン。これが URL 書き替え機能です。

この設定だと localhost:9999 に HTTP リクエストを送信します。
(そんなとこで待ってるサーバはたててないので、落ちてる状態と同じことになります)

  • config/queues_disk.cfg
dispatchers:
  -
    targets: queue.*
    threads: 1
#    modify_rules:
#      url: proc {|url| url.port = 9999}


AP4R を再起動。

% ruby script/mongrel_ap4r start -A config/queues_disk.cfg


http://localhost:3000 で サンプルの画面を開いて "HTTP POST" ボタンを押下。
Rails の処理は正常に終了しますが、AP4R でエラーが発生。以下に一部抜粋。

Transaction be2f5340-ce97-012a-5b3c-0019e337827a aborted by clientdispatch err #<
Errno::ECONNREFUSED: Connection refused - connect(2)>
/usr/local/lib/ruby/1.8/net/http.rb:560:in `initialize'
/usr/local/lib/ruby/1.8/net/http.rb:560:in `open'
/usr/local/lib/ruby/1.8/net/http.rb:560:in `connect'
/usr/local/lib/ruby/1.8/timeout.rb:48:in `timeout'

で、メッセージはどうなったのか?

% irb --simple-prompt -rubygems
>> require 'reliable-msg'
=> true
>> dlq = ReliableMsg::Queue.new "$dlq"
=> #<ReliableMsg::Queue:0x1c3cae8 @queue="$dlq">
>> qm = dlq.send :qm
=> #<DRb::DRbObject:0x1c39d0c @ref=nil, @uri="druby://localhost:6438">
>> qm.store.queues.keys
=> ["queue.async_world.execute_via_http", (省略), "$dlq"]
>> qm.store.queues["queue.async_world.execute_via_http"].size
=> 0
>> qm.store.queues["$dlq"].size
=> 1


滞留せずに、DLQ にはいってますね。

最後に、URL 書き替え機能の部分を再びコメントアウトして、AP4Rを再起動。
これで、非同期処理を実行するサーバが生きてる状態に復帰しました。


DLQ にはいっているメッセージをリカバリします。
以下のスクリプトでは,一見 DLQ に put しているように見えますが,実際には,m.headers の :queue で指定されたキューに入ることになります。

>> dlq.get{|m| dlq.put m.object, m.headers}
=> "5482fdc0-cee5-012a-5b3e-0019e337827a"
>> qm.store.queues["$dlq"].size
=> 0
>> qm.store.queues["queue.async_world.execute_via_http"].size
=> 0


DLQ は空になり、サンプルの画面をみると、World と出力されているのが確認できるはずです。