非同期処理の実行先のサーバが落ちていた場合
コメントで質問をもらいました。
# 興味をもって触っていただけてるようで嬉しいです。
長くなりそうだったので、こちらで... (^^;
sappie 2008/03/07 18:57
はじめまして、いつも参考にしています
キューのメッセージを受信して処理するサーバーが落ちていた場合、AP4Rは受信アプリケーションを起動できず、メッセージはDLQに行くという動きで正しいのでしょうか。
キューに滞留するということはないのでしょうか。。。
永続化先はディスクです
答え: メッセージは DLQ にはいります。
「キューに滞留することはないのでしょうか。。。」の「。。。」の部分が気になりました。
キューに滞留してしまったのでしょうか? あるいは、キューに滞留したままでいて欲しいということでしょうか??
ちなみに、DLQ にはいったメッセージは、(手動ですが) 元のキューに戻すことができます。
メッセージの遅延実行や時間指定の実行、あるいは DLQ からの自動リカバリとか、やっぱりあったほうがいいですかね...
# あと細かいことですが、AP4R はキューにはいってきたメッセージを、非同期処理の実行先サーバに HTTP リクエストしているだけなので、「受信アプリケーションを起動できず」というよりは、サーバとの接続が確立できないといったほうが正確かな、と思います。
以下、HelloWorld サンプルでリカバリまでしてみます。
使っているのは、ap4r-0.3.6 と HelloWorld-0.3.6.tar.gz です。
# やたら長くなってしまいまたが、「で、メッセージはどうなったのか?」あたりからが本題です。
サンプルアプリの起動。
% cd HelloWorld % ruby script/server
AP4R の起動。
% cd my_ap4r % ruby script/mongrel_ap4r start -A config/queues_disk.cfg
http://localhost:3000 で サンプルの画面を開いて "HTTP POST" ボタンを押下。
ファイルに同期で Hello と出力され、その後、非同期で World と出力。
だいたい 10 秒くらい待ちます。
"非同期で World と出力" する部分は、今の状態では同期部分と同じサーバで処理されているので、別のサーバで処理するようにします。
以下でコメントアウトしている部分をコメントイン。これが URL 書き替え機能です。
この設定だと localhost:9999 に HTTP リクエストを送信します。
(そんなとこで待ってるサーバはたててないので、落ちてる状態と同じことになります)
- config/queues_disk.cfg
dispatchers: - targets: queue.* threads: 1 # modify_rules: # url: proc {|url| url.port = 9999}
AP4R を再起動。
% ruby script/mongrel_ap4r start -A config/queues_disk.cfg
http://localhost:3000 で サンプルの画面を開いて "HTTP POST" ボタンを押下。
Rails の処理は正常に終了しますが、AP4R でエラーが発生。以下に一部抜粋。
Transaction be2f5340-ce97-012a-5b3c-0019e337827a aborted by clientdispatch err #< Errno::ECONNREFUSED: Connection refused - connect(2)> /usr/local/lib/ruby/1.8/net/http.rb:560:in `initialize' /usr/local/lib/ruby/1.8/net/http.rb:560:in `open' /usr/local/lib/ruby/1.8/net/http.rb:560:in `connect' /usr/local/lib/ruby/1.8/timeout.rb:48:in `timeout'
で、メッセージはどうなったのか?
% irb --simple-prompt -rubygems >> require 'reliable-msg' => true >> dlq = ReliableMsg::Queue.new "$dlq" => #<ReliableMsg::Queue:0x1c3cae8 @queue="$dlq"> >> qm = dlq.send :qm => #<DRb::DRbObject:0x1c39d0c @ref=nil, @uri="druby://localhost:6438"> >> qm.store.queues.keys => ["queue.async_world.execute_via_http", (省略), "$dlq"] >> qm.store.queues["queue.async_world.execute_via_http"].size => 0 >> qm.store.queues["$dlq"].size => 1
滞留せずに、DLQ にはいってますね。
最後に、URL 書き替え機能の部分を再びコメントアウトして、AP4Rを再起動。
これで、非同期処理を実行するサーバが生きてる状態に復帰しました。
DLQ にはいっているメッセージをリカバリします。
以下のスクリプトでは,一見 DLQ に put しているように見えますが,実際には,m.headers の :queue で指定されたキューに入ることになります。
>> dlq.get{|m| dlq.put m.object, m.headers} => "5482fdc0-cee5-012a-5b3e-0019e337827a" >> qm.store.queues["$dlq"].size => 0 >> qm.store.queues["queue.async_world.execute_via_http"].size => 0
DLQ は空になり、サンプルの画面をみると、World と出力されているのが確認できるはずです。