RabbitMQ のクラスタリング機能にキューのミラーリングが追加されたので RabbitFoot (AnyEvent::RabbitMQ) から試してみる
クラスタリングやキューのミラーリングの詳細は、下記参照の事。
RabbitMQ をクラスタリングする
今回はサーバを複数用意できなかったので、一つのサーバ上で RabbitMQ を二つ起動する。
% RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit1 rabbitmq-server % RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit2 rabbitmq-server
起動したら、rabbit2 の方を一旦停止(ErlangVM は停止しない。RabbitMQ だけ停止)して、クラスタリングの設定を行ってから再起動する。
% rabbitmqctl -n rabbit2 stop_app % rabbitmqctl -n rabbit2 reset % rabbitmqctl -n rabbit2 cluster rabbit1@`hostname -s` % rabbitmqctl -n rabbit2 start_app
クラスタリング構成が正しく組まれているか確認するには、下記のコマンドを実行する。
% rabbitmqctl -n rabbit1 cluster_status Cluster status of node rabbit1@ljob04 ... [{nodes,[{disc,[rabbit1@ljob04]},{ram,[rabbit2@ljob04]}]}, {running_nodes,[rabbit2@ljob04,rabbit1@ljob04]}] ...done.
ミラーリングされたキューを作る
今回は、Perl Script からキューを作る。
use Coro; use Net::RabbitFoot; my $rf = Net::RabbitFoot->new( verbose => 1 )->load_xml_spec()->connect( host => 'localhost', port => 5672, user => 'guest', pass => 'guest', vhost => '/', timeout => 1, ); my $ch = $rf->open_channel(); $ch->declare_queue( queue => 'test_q', arguments => { 'x-ha-policy' => 'all', }, ); $rf->close();
arguments テーブルを使用しているので AMQP 0-8 から使える。
x-ha-policy に 'all' を指定しているので、クラスタリングされている全てのノード上にキューがミラーリングされる。
x-ha-policy の詳細は、RabbitMQ - Highly Available Queues 参照の事。
キューのミラーリング状態を確認するには、下記のコマンドを実行する。
% rabbitmqctl -n rabbit1 list_queues name messages pid slave_pids synchronised_slave_pids Listing queues ... test_q 0 <rabbit1@ljob04.2.5022.0> [<rabbit2@ljob04.1.4932.0>] [<rabbit2@ljob04.1.4932.0>] ...done.
上記の場合、マスターとなるキューは rabbit1 上に存在し、コピーが rabbit2 に存在している。
念のため、幾つかメッセージをキューに追加しておく。
use Data::Dumper; for (1..2) { $ch->publish( routing_key => 'test_q', body => 'Hello HA Queue.', on_return => unblock_sub {die Dumper(shift)}, ); }
ノードを停止・起動してみる
キューのマスターが存在している rabbit1 を停止する。
% rabbitmqctl -n rabbit1 stop
状態を確認してみると…
% rabbitmqctl -n rabbit2 list_queues name messages pid slave_pids synchronised_slave_pids Listing queues ... test_q 2 <rabbit2@ljob04.1.4932.0> [] [] ...done.
キューのマスターが rabbit2 に移っており、コピーは空になっている。
この状態でキューにメッセージを追加しておく(この時点でメッセージ数 4)。追加の方法は、前述を参照の事。
次に、rabbit1 を起動する。
% RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit1 rabbitmq-server
クラスタリングの設定は保存されているので、これだけで良い。
状態を確認してみると…
% rabbitmqctl -n rabbit2 list_queues name messages pid slave_pids synchronised_slave_pids Listing queues ... test_q 4 <rabbit2@ljob04.1.4932.0> [<rabbit1@ljob04.3.225.0>] [] ...done.
コピーがノード上に作られているが、同期は行われていない。
詳細は、RabbitMQ - Highly Available Queues の Unsynchronised Slaves を参照の事。
この状態で、更にキューにメッセージを追加しておく(この時点でメッセージ数 6)。
ここでメッセージを 4 つ受信すると同期状態となる(1 つずつメッセージを受信した方が解りやすい)。
for (1..4) { my $response = $ch->get(queue => 'test_q'); }