公開/購読
前の記事で、作業メッセージキューの作成について説明しました。 各メッセージは1つのハンドラー(ワーカー)に送信されると想定されていました。 この記事はタスクを複雑にします-複数のサブスクライバーにメッセージを送信します。 このパターンは「 パブリッシュ/サブスクライブ 」として知られています。
このパターンを理解するために、簡単なロギングシステムを作成します。 2つのプログラムで構成されます。1つ目はログを作成し、2つ目はそれらを読み取り、印刷します。
ロギングシステムでは、サブスクライバーが各メッセージを受信する各プログラム。 これにより、1人のサブスクライバーを起動してログをディスクに保存し、いつでも別のサブスクライバーを作成して画面にログを表示できます。
基本的に、各メッセージは各サブスクライバーにブロードキャストされます。
交換ポイント
前の記事では、キューを使用してメッセージを送受信しました。 次に、高度なRabbitメッセージングモデルを検討します。
前の記事の条件を思い出してください。
- プロデューサー(サプライヤー)-メッセージを送信するプログラム
- キュー-メッセージを保存するバッファー
- コンシューマー(サブスクライバー)-メッセージを受け入れるプログラム。
Rabbitメッセージ送信モデルの主なアイデアは、プロデューサーがキューにメッセージを直接送信しないことです。 実際、プロバイダはそのメッセージが特定のキューに到達したかどうかを知らないことが非常に多くあります。
代わりに、プロバイダーはアクセスポイントにメッセージを送信します。 アクセスポイントに複雑なものはありません。 アクセスポイントには2つの機能があります。
-サプライヤーからのメッセージを受信します。
-これらのメッセージをキューに送信します。
アクセスポイントは、着信メッセージの処理方法を正確に認識しています。 特定のキューまたは複数のキューにメッセージを送信するか、誰にも送信して削除しないでください。 これらのルールは、交換タイプで記述されます。
直接、トピック、ヘッダー、ファンアウトのいくつかのタイプがあります。 最後のタイプのファンアウトに焦点を当てます。 このタイプのアクセスポイントを作成して呼び出します-ログ:
$channel->exchange_declare('logs', 'fanout', false, false, false);
タイプのファンアウトは非常に単純です。 彼は、利用可能なすべてのキューにあるすべてのメッセージをコピーします。 これは、ロギングシステムに必要なものです。
アクセスポイントのリストを表示します。
サーバー上のすべてのアクセスポイントを表示するには、rabbitmqctlコマンドを実行する必要があります。
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
amqという名前のアクセスポイントのリストが表示されます*そして、デフォルトで使用される名前のないアクセスポイント(このタスクには適していません)。
アクセスポイントの名前。
以前の記事では、アクセスポイントについては何も知りませんでしたが、それでもキューに手紙を送ることができました。 空の文字列「」で識別されるデフォルトのアクセスポイントを使用したため、これが可能になりました。
以前に手紙を送った方法を思い出してください:
$channel->basic_publish($msg, '', 'hello');
デフォルトのアクセスポイントまたは匿名アクセスポイントがここで使用されます。メッセージはrouting_keyキーで識別されるキューに送信されます。 キー「routing_key」は、basic_publish関数の3番目のパラメーターを介して渡されます。
これで、名前付きアクセスポイントにメッセージを送信できます。
$channel->exchange_declare('logs', 'fanout', false, false, false); $channel->basic_publish($msg, 'logs');
タイムキュー:
今回は、キューの名前(「hello」または「task_queue」)を使用しました。 名前を付ける機能は、ワーカーに特定のキューを指定するのに役立ち、プロデューサーとサブスクライバーの間でキューを分割するのにも役立ちます。
ただし、ログシステムでは、一部だけでなく、すべてのメッセージをキューに送信する必要があります。 また、メッセージは古いものではなく、最新のものにする必要があります。 これには、2つのことが必要です。
-Rabbitに接続するたびに、新しいキューを作成するか、サーバーにランダムな名前を作成させます。
-サブスクライバーがRabbitから切断するたびに、キューを削除します。
php-amqplibクライアントでは、名前なしでキューにアクセスすると、自動的に生成された名前で一時キューを作成します。
list($queue_name, ,) = $channel->queue_declare("");
メソッドは、自動生成されたキュー名を返します。 「amq.gen-JzTY20BRgKO-HjmUJj0wLg。」のようになります。
要求された接続が終了すると、キューは自動的に削除されます。
バインディング
そのため、ファンアウトタイプのキューとキューがあります。 次に、アクセスポイントにメッセージをキューに送信するように指示する必要があります。 アクセスポイントとキューの関係は、バインディングと呼ばれます。
$channel->queue_bind($queue_name, 'logs');
これ以降、キューのメッセージはアクセスポイントを通過します。
rabbitmqctl list_bindingsコマンドを使用して、バインディングのリストを表示できます。
すべてのキューへの送信:
投稿を作成するプロデューサープログラムは、前の記事から変更されていません。 唯一の重要な違いは、デフォルトのアクセスポイントではなく、指定したアクセスポイントの「ログ」にメッセージをルーティングすることです。 メッセージを送信するときにキューの名前を指定する必要がありました。 ただし、ファンアウトタイプのアクセスポイントの場合、これは必要ありません。
スクリプトコードemit_log.phpを検討してください。
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo " [x] Sent ", $data, "\n"; $channel->close(); $connection->close(); ?>
(emit_log.phpソース)
ご覧のとおり、接続を確立した後、アクセスポイントを作成します。 存在しないアクセスポイントの使用は禁止されているため、この手順が必要です。
アクセスポイントに関連付けられているキューがないため、アクセスポイントのメッセージは失われます。 しかし、これは私たちにとっては良いことです。アクセスポイントへのサブスクライバーが1人もいない限り、すべてのメッセージを安全に削除できます。
Receive_logs.phpサブスクライバーコード:
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
(receive_logs.phpソース)
ログをファイルに保存する場合は、コンソールを開いて次のように入力する必要があります。
$ php receive_logs.php > logs_from_rabbit.log
画面にログを表示する場合は、別のウィンドウを開いて次を入力します。
$ php receive_logs.php
そして、もちろん、メッセージプロデューサーの立ち上げ:
$ php emit_log.php
rabbitmqctl list_bindingsコマンドを使用して、コードがキューを正しく作成し、アクセスポイントに関連付けたことを確認できます。 2つの開いているreceive_logs.phpプログラムでは、次のものが必要です。
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
ここでは、データアクセスポイントのログが2つのキューで送信され、その名前が自動的に作成されることが示されています。 これはまさに我々が求めていたものです。
次の記事では、メッセージのサブセットのみを聞く方法について説明します。