Perl で作成した RabbitMQ 専用クライアントライブラリを AnyEvent と Coro で非同期化しました
以前、AMQP と RabbitMQ を学ぶために RabbitFoot という名前の Perl 版のクライアントライブラリを作成したのですが、非同期化して欲しいと要望を頂いたので、AnyEvent と Coro を利用してみました。
AnyEvent と Coro は、今回、初めての利用となるため、識者からの厳しいツッコミがあると嬉しいです。
cooldaemon's RabbitFoot at master - GitHub
Consume 用の Channel を五つ、Publish 用の Channel を一つ開き、Channel 間でメッセージを送受信する例は、下記の通りです。
use Coro; use RabbitFoot; my $rf = RabbitFoot->new()->load_xml_spec( '/path/to/fixed_amqp0-8.xml', )->connect( host => 'localhosti', port => 5672, user => 'guest', port => 'guest', vhost => '/', ); my $main = $Coro::current; my $done = 0; my @queues = map { my $queue = 'test_q' . $_; my $ch = $rf->open_channel(); $ch->declare_queue(queue => $queue); my $frame; $frame = $ch->consume( queue => $queue, on_consume => unblock_sub { my $response = shift; return if 'stop' ne $response->{body}->payload; $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag); $done++; $main->ready; schedule; }, ); $queue; } (1 .. 5); my $ch = $rf->open_channel(); for my $queue (@queues) { for (qw(hello stop)) { $ch->publish( routing_key => $queue, body => $message, ); } } schedule while $done < 5; $ch->delete_queue(queue => $_) for @queues; $rf->close;
consume メソッドの on_consume や、今回は利用していませんが publish メソッドの on_return には、sub ではなく unblock_sub を利用するのがポイントです。
sub を利用すると、イベントループからコールバック関数が呼ばれるので、Coro::rouse_cb を内部で利用している cancel は利用できません。
これで「一つのプロセスをスレッドにより分割し、一つのソケット接続を Channel により分割するので、資源を有効に利用できる」という AMQP の恩恵を、Perl 経由で享受できるハズです。*1
蛇足ですが、Coro はマルチコアに対応しているわけではないので、マルチコアを使い切りたいのであれば、プロセスを複数作る必要があります。