Rails 以外の Ruby プログラムから AP4R を使ってみる (HTTP POST 編)
AP4R では、Rails とシームレスな連携ができるよう、プラグインを提供していますが、今回は Rails 以外の Ruby プログラムから非同期メッセージを送信してみようと思います。受信側は Rails アプリにしましたが、HTTP POST でリクエストしているだけなので、HTTP を解釈できるサーバであればメッセージを受けとって処理できるはずです。
#現状、シンプルな API はまだ十分に準備できていないので、ちょっと泥臭くなりました。 (^^;
以下の構成で非同期処理を実行します。
- 送信側: irb
- 受信側: Rails アプリ
- プロトコル: irb -> AP4R は dRuby、AP4R -> Rails は HTTP POST
- バージョン: ap4r-0.3.3、rails-1.2.3
■ 受信側の Rails アプリの準備
あまり深く考えずにさくっとつくります...
% rails test_for_accept_message % cd test_for_accept_message % ruby script/generate controller consumer accept
コントローラを以下のように修正します。
class ConsumerController < ApplicationController def accept puts params.inspect render :text => "true" end end
受けとっているメッセージの内容を確認するため、params の内容を出力しておきます。(あえて puts しなくても確認できますが...)
また、render :text => "true" は、AP4R の「おまじない」です。
現在の AP4R では、非同期処理の成功判定を次の 2 点の両方が満たされることで行っています。
- ステータス 200 番が返ってくること
- 文字列 "true" が 返ってくること
したがって、この「おまじない」を忘れると、アクション内での処理が正常に実行されても AP4R は処理が成功したと認識できず、メッセージを DLQ (エラーになったメッセージが格納されるキュー) に放り込んでしまいます。
さて、この状態で動作を確認しておきましょう。
% ruby script/server
以下の URL にブラウザからアクセスしたとき、true と表示されれば OK です。
コンソール出力はこんな感じ。
{"action"=>"accept", "controller"=>"consumer"} Processing ConsumerController#accept (for 127.0.0.1 at 2007-10-04 09:58:37) [GET] Session ID: 8f3c6b82977601a05afffd4b63f2fa4a Parameters: {"action"=>"accept", "controller"=>"consumer"} Completed in 0.00104 (964 reqs/sec) | Rendering: 0.00014 (13%) | 200 OK [http://localhost/consumer/accept]
■ AP4R を起動
ここは簡単に disk モードで実行しておきます。
別のコンソールを開いて、適当な場所で...
% ap4r_setup my_ap4r % cd my_ap4r % ruby script/mongrel_ap4r start -A config/queues_disk.cfg
設定ファイルはインストール時のまま。
% less config/queues_disk.cfg --- store: type: disk drb: host: port: 6438 acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8 dispatchers: - targets: queue.* threads: 1
■ 送信側 (irb) からメッセージを送信
もうひとつコンソールを開きます。
AP4R の設定ファイルの dispatchers の項目を見ると "queue." ではじまるチャネルを監視しているので、以下のようにチャネル "queue.test" 宛にメッセージを送信します。
# 見やすいよう適当に改行を入れています % irb -rubygems -rap4r irb(main):001:0> q = ReliableMsg::Queue.new "queue.test" irb(main):002:0> q.put "hoge=fuge&foo=bar", {:dispatch_mode => :HTTP, :target_method => :POST, :target_url => "http://localhost:3000/consumer/accept", :queue => "queue.test", :delivery => :once} => "F63612f0-53d6-012a-cff0-0016cb9ad524"
put の第 1 引数は、メッセージのボディ部です。AP4R から非同期メッセージを送信する際のデフォルトの Content-type は "application/x-www-form-urlencoded" なので、URL エンコードしています。
第 2 引数は、メッセージのヘッダ部にあたります。
HTTP POST で AP4R から送信することを指定し、キューの名前も保持しておきます。これはメッセージの処理に失敗したとき、DLQ からのリカバリ時に必要になります。delivery は reliable-msg のもつオプションで、指定により配送モードが変わります。
詳細は ReliableMsg::Queue の RDoc をご覧ください。
# The following headers have special meaning: # * <tt>:delivery</tt> -- The message delivery mode. # * <tt>:queue</tt> -- Puts the message in the named queue. Otherwise, uses the queue # specified when creating this Queue object. # * <tt>:priority</tt> -- The message priority. Messages with higher priority are # retrieved first. # * <tt>:expires</tt> -- Message expiration in seconds. Messages do not expire unless # specified. Zero or +nil+ means no expiration. # * <tt>:expires_at</tt> -- Specifies when the message expires (timestamp). Alternative # to <tt>:expires</tt>. # * <tt>:max_deliveries</tt> -- Maximum number of attempts to deliver message, afterwhich # message moves to the DLQ. Minimum is 1 (deliver only once), default is 5 (deliver # up to 5 times). # Messages can be delivered using one of three delivery modes: # * <tt>:best_effort</tt> -- Attempt to deliver the message once. If the message expires or # cannot be delivered, discard the message. The is the default delivery mode. # * <tt>:repeated</tt> -- Attempt to deliver until message expires, or up to maximum # delivery attempts (see <tt>:max_deliveries</tt>). Afterwards, move message to # dead-letter queue. # * <tt>:once</tt> -- Attempt to deliver message exactly once. If message expires, or # first delivery attempt fails, move message to dead-letter queue.
さて、Rails アプリを起動したコンソールの出力を確認してみましょう。以下のように、アクションが実行され、メッセージのボディ部に渡したメッセージも params で受けとれていることが分かるでしょう。
{"action"=>"accept", "hoge"=>"fuge", "foo"=>"bar", "controller"=>"consumer"} Processing ConsumerController#accept (for 127.0.0.1 at 2007-10-04 10:01:05) [POST] Session ID: d78f6c371062850c4cc682fc19732946 Parameters: {"action"=>"accept", "hoge"=>"fuge", "foo"=>"bar", "controller"=>"consumer"} Completed in 0.00118 (847 reqs/sec) | Rendering: 0.00005 (3%) | 200 OK [http://localhost/consumer/accept]
注意点としては、送信側でのメッセージ保証となります。上記のやりかたでは、たとえば irb からのメッセージ送信中に、N/W の切断や AP4R プロセスの障害があった場合にメッセージが消失してしまう可能性があります。直前の処理が実行済みの場合に、確実に後続の非同期処理を実行する必要があるなら、こうした状況は許されません。対処法としては、送信前にメッセージをいったん保存するといったことが考えられます。
AP4R ではそのために SAF (Store and Forward) という機能を用意しています。SAF についてのもう少し詳しい解説はこちらにまとめています。
AP4R,Rubyで非同期メッセージング:第1回 軽量さと堅牢さを兼ね備えたメッセージング|gihyo.jp
http://gihyo.jp/dev/feature/01/ap4r/0001?page=3
ただ、Rails からの場合にはシンプルな API が用意されていますが、現状、素で使うとやっぱりちょっと泥臭くなります。
このあたりは今後もっと改善していく予定です。