Rails 以外の Ruby プログラムから AP4R を使ってみる (HTTP POST 編)

AP4R では、Rails とシームレスな連携ができるよう、プラグインを提供していますが、今回は Rails 以外の Ruby プログラムから非同期メッセージを送信してみようと思います。受信側は Rails アプリにしましたが、HTTP POST でリクエストしているだけなので、HTTP を解釈できるサーバであればメッセージを受けとって処理できるはずです。
#現状、シンプルな API はまだ十分に準備できていないので、ちょっと泥臭くなりました。 (^^;


以下の構成で非同期処理を実行します。


■ 受信側の Rails アプリの準備
あまり深く考えずにさくっとつくります...

% rails test_for_accept_message
% cd test_for_accept_message
% ruby script/generate controller consumer accept


コントローラを以下のように修正します。

class ConsumerController < ApplicationController

  def accept
    puts params.inspect
    render :text => "true"
  end

end


受けとっているメッセージの内容を確認するため、params の内容を出力しておきます。(あえて puts しなくても確認できますが...)
また、render :text => "true" は、AP4R の「おまじない」です。
現在の AP4R では、非同期処理の成功判定を次の 2 点の両方が満たされることで行っています。

  • ステータス 200 番が返ってくること
  • 文字列 "true" が 返ってくること

したがって、この「おまじない」を忘れると、アクション内での処理が正常に実行されても AP4R は処理が成功したと認識できず、メッセージを DLQ (エラーになったメッセージが格納されるキュー) に放り込んでしまいます。


さて、この状態で動作を確認しておきましょう。

% ruby script/server

以下の URL にブラウザからアクセスしたとき、true と表示されれば OK です。


コンソール出力はこんな感じ。

{"action"=>"accept", "controller"=>"consumer"}                                           


Processing ConsumerController#accept (for 127.0.0.1 at 2007-10-04 09:58:37) [GET]
  Session ID: 8f3c6b82977601a05afffd4b63f2fa4a
  Parameters: {"action"=>"accept", "controller"=>"consumer"}
Completed in 0.00104 (964 reqs/sec) | Rendering: 0.00014 (13%) | 200 OK [http://localhost/consumer/accept]


AP4R を起動
ここは簡単に disk モードで実行しておきます。
別のコンソールを開いて、適当な場所で...

% ap4r_setup my_ap4r
% cd my_ap4r
% ruby script/mongrel_ap4r start -A config/queues_disk.cfg

設定ファイルはインストール時のまま。

% less config/queues_disk.cfg
--- 
store: 
  type: disk
drb: 
  host: 
  port: 6438
  acl: allow 127.0.0.1 allow ::1 allow 10.0.0.0/8
dispatchers:
  -
    targets: queue.*
    threads: 1

■ 送信側 (irb) からメッセージを送信
もうひとつコンソールを開きます。
AP4R の設定ファイルの dispatchers の項目を見ると "queue." ではじまるチャネルを監視しているので、以下のようにチャネル "queue.test" 宛にメッセージを送信します。

# 見やすいよう適当に改行を入れています
% irb -rubygems -rap4r
irb(main):001:0> q = ReliableMsg::Queue.new "queue.test"
irb(main):002:0> q.put "hoge=fuge&foo=bar", {:dispatch_mode => :HTTP,
                                             :target_method => :POST,
                                             :target_url => "http://localhost:3000/consumer/accept",
                                             :queue => "queue.test",
                                             :delivery => :once}
=> "F63612f0-53d6-012a-cff0-0016cb9ad524"


put の第 1 引数は、メッセージのボディ部です。AP4R から非同期メッセージを送信する際のデフォルトの Content-type は "application/x-www-form-urlencoded" なので、URL エンコードしています。

第 2 引数は、メッセージのヘッダ部にあたります。
HTTP POST で AP4R から送信することを指定し、キューの名前も保持しておきます。これはメッセージの処理に失敗したとき、DLQ からのリカバリ時に必要になります。delivery は reliable-msg のもつオプションで、指定により配送モードが変わります。

詳細は ReliableMsg::Queue の RDoc をご覧ください。

# The following headers have special meaning:
# * <tt>:delivery</tt> -- The message delivery mode.
# * <tt>:queue</tt> -- Puts the message in the named queue. Otherwise, uses the queue
#   specified when creating this Queue object.
# * <tt>:priority</tt> -- The message priority. Messages with higher priority are
#   retrieved first.
# * <tt>:expires</tt> -- Message expiration in seconds. Messages do not expire unless
#   specified. Zero or +nil+ means no expiration.
# * <tt>:expires_at</tt> -- Specifies when the message expires (timestamp). Alternative
#   to <tt>:expires</tt>.
# * <tt>:max_deliveries</tt> -- Maximum number of attempts to deliver message, afterwhich
#   message moves to the DLQ. Minimum is 1 (deliver only once), default is 5 (deliver
#   up to 5 times).
        
# Messages can be delivered using one of three delivery modes:
# * <tt>:best_effort</tt> -- Attempt to deliver the message once. If the message expires or
#   cannot be delivered, discard the message. The is the default delivery mode.
# * <tt>:repeated</tt> -- Attempt to deliver until message expires, or up to maximum
#   delivery attempts (see <tt>:max_deliveries</tt>). Afterwards, move message to
#   dead-letter queue.
# * <tt>:once</tt> -- Attempt to deliver message exactly once. If message expires, or
#   first delivery attempt fails, move message to dead-letter queue.


さて、Rails アプリを起動したコンソールの出力を確認してみましょう。以下のように、アクションが実行され、メッセージのボディ部に渡したメッセージも params で受けとれていることが分かるでしょう。

{"action"=>"accept", "hoge"=>"fuge", "foo"=>"bar", "controller"=>"consumer"}             
                                                                                         
                                                                                         
Processing ConsumerController#accept (for 127.0.0.1 at 2007-10-04 10:01:05) [POST]       
  Session ID: d78f6c371062850c4cc682fc19732946                                           
  Parameters: {"action"=>"accept", "hoge"=>"fuge", "foo"=>"bar", "controller"=>"consumer"}                                                                                        
Completed in 0.00118 (847 reqs/sec) | Rendering: 0.00005 (3%) | 200 OK [http://localhost/consumer/accept]  

注意点としては、送信側でのメッセージ保証となります。上記のやりかたでは、たとえば irb からのメッセージ送信中に、N/W の切断や AP4R プロセスの障害があった場合にメッセージが消失してしまう可能性があります。直前の処理が実行済みの場合に、確実に後続の非同期処理を実行する必要があるなら、こうした状況は許されません。対処法としては、送信前にメッセージをいったん保存するといったことが考えられます。

AP4R ではそのために SAF (Store and Forward) という機能を用意しています。SAF についてのもう少し詳しい解説はこちらにまとめています。

AP4RRubyで非同期メッセージング:第1回 軽量さと堅牢さを兼ね備えたメッセージング|gihyo.jp
http://gihyo.jp/dev/feature/01/ap4r/0001?page=3


ただ、Rails からの場合にはシンプルな API が用意されていますが、現状、素で使うとやっぱりちょっと泥臭くなります。
このあたりは今後もっと改善していく予定です。