Exchangeがメッセージを公開するように設計されている場合、キューはメッセージを受信するように設計されています。 Exchangeとキューの間で、ルーティングキー(routing_key)を介して接続が確立されます(バインドまたはバインドを頻繁に変換します)。
Exchangeに似たキューを宣言する必要があります。
AMQP amqp ;
auto_ptr < AMQPQueue > qu ( amqp。createQueue ( "q2" ) ) ;
qu- >宣言( )
//または
auto_ptr < AMQPQueue > qu2 ( amqp。createQueue ( ) ) ;
qu2- > Declare ( "q2" ) ;
キューには次のものがあります。
- 自己削除(AMQP_AUTODELETE)、つまり 空で使用されていない場合は削除されます(クライアント接続なし)
- 保存済み(AMQP_DURABLE)つまり ブローカーがキューを再起動すると、データが保存されます
- 排他的(AMQP_EXCLUSIVE)つまり 1つの接続のみのために設計
- パッシブ(AMQP_PASSIVE)つまり 開始はクライアントから来ます
パラメーターが指定されていない場合、デフォルトでキューは自己削除(AMQP_AUTODELETE)として宣言されます。 以下の例では、キューは自己削除および永続的として宣言されています。
AMQP amqp ;キューには次のものがあります。
auto_ptr < AMQPQueue > qu3 ( amqp。createQueue ( ) ) ;
qu3- > Declare ( "q2" 、AMQP_AUTODELETE | AMQP_DURABLE ) ; //永続モードおよび自動削除モード
- AMQPQueueの決済::削除();
- AMQPQueueをクリア::パージ();
- サブスクリプションのリセットAMQPQueue ::キャンセル();
キューは、Exchangeにアタッチ(バインド)またはアンタイド(アンバインド)できます。 バインディングはルーティングキーを介して行われます。 キーは単純なものでも複合的なものでもかまいません。 複合キーの場合、パターンが使用されます。 たとえば、不動産取引所で公開されているすべてのニュースを購読しています。 キーは「* .news」にするか、サンクトペテルブルクに関連するすべてのメッセージ「spb。*」に登録します。 パート1で既に説明したように、Exchangeには、直接、トピック、ファンアウトの3つのタイプがあります。 トピックタイプの交換の場合、直接タイプの交換の場合、パトレンを使用できます-単純なキーのみが使用され、タイプファンアウトはキーをまったく使用しないため、キー値は純粋に形式的に指定され、空の文字列が指定されます。
AMQP amqp ;キューがエクスチェンジに接続され、バインディングキーを持つメッセージがエクスチェンジで公開されている場合、これらのメッセージは対応するキューにリダイレクトされます。 メッセージキューから読み取る方法は2つあります。
auto_ptr < AMQPQueue > qu3 ( amqp。createQueue ( ) ) ;
//キューと交換は以前に発表された
qu3- > Bind ( "ex" 、 "news" ) ; //永続モードおよび自動削除モード
- AMQPQueu :: Get()メソッドを介して、非同期的に。
- AMQPQueu :: Consume()メソッドを使用して同期的に。
AMQP amqp ;AMQPQueu :: Get()メソッドには、このメッセージを「既読」としてマークしないようブローカーに「伝える」AMQP_NOACKパラメーターがあります。 AMQP_NOACKパラメーターと一緒にAMQPQueu :: Ack()メソッドが使用され、メッセージが配信されたことを確認します。 すべてのメッセージ情報はAMQPMessageデータオブジェクトにカプセル化されます。 Messageオブジェクトには、フィールドにアクセスするためのメソッドがあり、名前はそれ自体を表しています:getMessage()、getExchange()、getRoutingKey()、get MessageCount()。 getConsumerTag()およびgetDeliveryTag()メソッドに注目する必要があります。
auto_ptr < AMQPQueue > qu3 ( amqp。createQueue ( ) ) ;
while ( 1 ) {
qu2- > Get ( ) ;
auto_ptr < AMQPMessage > m ( qu2- > getMessage ( ) ) ;
cout << "count:" << m- > getMessageCount ( ) << endl ;
if ( m- > getMessageCount ( ) > - 1 ) {
cout << "メッセージ\ n " << m- > getMessage ( ) << " \ nメッセージキー:" << m- > getRoutingKey ( ) << endl ;
cout << "exchange:" << m- > getExchange ( ) << endl ;
} その他
休憩 ;
}
サブスクライバータグ(consumer_tag)は、公開時に割り当てられるか、セッション名などのようにブローカーによって自動的に割り当てられる個別の一意の行です。 Cancelコマンドを送信し、dataパラメーターにconsumer_tagを渡すことで、たとえばAMQPQueu :: Cancel(m-> getConsumerTag())のように登録を解除できます。
配信タグは、このセッションの配信されたメッセージのカウンターに等しい数値であり、最初のメッセージの場合、配信タグは-1、2番目の-2、3番目の-3などになります。 メッセージの受信を確認するには、AMQPQueu :: Ack(delivery_tag)メソッドを呼び出す必要があります。delivery_tag変数には配信タグの値が含まれます。
AMQP amqp ;AMQPQueue :: Getメソッドとは異なり、AMQPQueue :: Consumeメソッドには同期受信スキームがあるため、ここではイベントモデルが使用されます。 Subscribeメソッドを使用する前に、AMQP_MESSAGEイベントを追加する必要があります。 イベントハンドラは、入力パラメータがメッセージデータである関数です。 また、出力はブール値です:停止/データ受信なし。 例でより明確に:
auto_ptr < AMQPQueue > qu3 ( amqp。createQueue ( ) ) ;
while ( 1 ) {
qu2- > Get ( AMQP_NOACK ) ;
auto_ptr < AMQPMessage > m ( qu2- > getMessage ( ) ) ;
if ( m- > getMessageCount ( ) > - 1 ) {
qu2- > Ack ( m- > getDeliveryTag ( ) ) ;
} その他
休憩 ;
}
AMQP amqp ;
auto_ptr < AMQPQueue > qu ( amqp。createQueue ( "q2" ) ) ;
qu- >宣言( )
//または
auto_ptr < AMQPQueue > qu2 ( amqp。createQueue ( ) ) ;
qu2- > Declare ( "q2" ) ;
static int i = 0 ;
int onMessage ( AMQPMessage * message ) {
char * data = message- > getMessage ( ) ;
if (データ)
cout << data << endl ;
i ++ ;
cout << "#" << i << "tag =" << message- > getDeliveryTag ( ) << endl ;
if ( i > 5 )
1を 返し ます。
0を 返し ます 。
} ;
ゼロが印刷されなかった;)
どうする
このライブラリは完全なふりをしているわけではありません。もちろん、イベントロジックを開発し、onCancel、onSignal、onTimerなどのイベントを追加したいと思います。サブスクリプション用にマルチスレッドを作成すると思います。 非ブロッキングソケットですべてを行うには、librabbitmqのネットワーク部分を書き換える必要がある場合があります。
関連リンク
ロシア語のAMQP
RabbitMQ:AMQPの概要
AMQP アプリケーションのデバッグ
PHP-AMQPフレンド向けの新機能
AMQP-PHPチャット
結論の代わりに
ベータステータスではありますが、比較的安定して動作します。 MPLライセンス。 プロジェクトを支援したい人はいつでも歓迎します。バグはトラッカーで登録解除してください
さらなる開発のアイデアについては、ここまたはニュースレターで議論できます。
PS。 ロシア語については、強く蹴らないようにお願いします;私はいつも彼と対立していました。
エラーを修正し、個人的に書き込みます。