Amqpcppライブラリ。 パート2-キュー

記事「librabbitmqのLib amqpcppラッパー」は、AMQPメッセージの公開をレビューしました。 この記事はその続きであり、以下のキューの使用方法について説明しています。



Exchangeがメッセージを公開するように設計されている場合、キューはメッセージを受信するように設計されています。 Exchangeとキューの間で、ルーティングキー(routing_key)を介して接続が確立されます(バインドまたはバインドを頻繁に変換します)。

Exchangeに似たキューを宣言する必要があります。
AMQP amqp ;

auto_ptr < AMQPQueue > qu amqp。createQueue "q2" ;

qu- >宣言

//または

auto_ptr < AMQPQueue > qu2 amqp。createQueue ;

qu2- > Declare "q2" ;


キューには次のものがあります。



パラメーターが指定されていない場合、デフォルトでキューは自己削除(AMQP_AUTODELETE)として宣言されます。 以下の例では、キューは自己削除および永続的として宣言されています。

AMQP amqp ;

auto_ptr < AMQPQueue > qu3 amqp。createQueue ;

qu3- > Declare "q2" 、AMQP_AUTODELETE | AMQP_DURABLE ; //永続モードおよび自動削除モード
キューには次のものがあります。



キューは、Exchangeにアタッチ(バインド)またはアンタイド(アンバインド)できます。 バインディングはルーティングキーを介して行われます。 キーは単純なものでも複合的なものでもかまいません。 複合キーの場合、パターンが使用されます。 たとえば、不動産取引所で公開されているすべてのニュースを購読しています。 キーは「* .news」にするか、サンクトペテルブルクに関連するすべてのメッセージ「spb。*」に登録します。 パート1で既に説明したように、Exchangeには、直接、トピック、ファンアウトの3つのタイプがあります。 トピックタイプの交換の場合、直接タイプの交換の場合、パトレンを使用できます-単純なキーのみが使用され、タイプファンアウトはキーをまったく使用しないため、キー値は純粋に形式的に指定され、空の文字列が指定されます。
AMQP amqp ;

auto_ptr < AMQPQueue > qu3 amqp。createQueue ;

//キューと交換は以前に発表された

qu3- > Bind "ex""news" ; //永続モードおよび自動削除モード

キューがエクスチェンジに接続され、バインディングキーを持つメッセージがエクスチェンジで公開されている場合、これらのメッセージは対応するキューにリダイレクトされます。 メッセージキューから読み取る方法は2つあります。

AMQPQueu :: Get()メソッドは、キューから1つのメッセージを読み取ります。 メッセージを読み取るとき、情報はヘッダーフレームで送信されます-キューに残っているメッセージの数。 以下に例を示します。

AMQP amqp ;

auto_ptr < AMQPQueue > qu3 amqp。createQueue ;



while 1 {

qu2- > Get ;

auto_ptr < AMQPMessage > m qu2- > getMessage ;



cout << "count:" << m- > getMessageCount << endl ;

if m- > getMessageCount > - 1 {

cout << "メッセージ\ n " << m- > getMessage << " \ nメッセージキー:" << m- > getRoutingKey << endl ;

cout << "exchange:" << m- > getExchange << endl ;

} その他

休憩 ;

}

AMQPQueu :: Get()メソッドには、このメッセージを「既読」としてマークしないようブローカーに「伝える」AMQP_NOACKパラメーターがあります。 AMQP_NOACKパラメーターと一緒にAMQPQueu :: Ack()メソッドが使用され、メッセージが配信されたことを確認します。 すべてのメッセージ情報はAMQPMessageデータオブジェクトにカプセル化されます。 Messageオブジェクトには、フィールドにアクセスするためのメソッドがあり、名前はそれ自体を表しています:getMessage()、getExchange()、getRoutingKey()、get MessageCount()。 getConsumerTag()およびgetDeliveryTag()メソッドに注目する必要があります。



サブスクライバータグ(consumer_tag)は、公開時に割り当てられるか、セッション名などのようにブローカーによって自動的に割り当てられる個別の一意の行です。 Cancelコマンドを送信し、dataパラメーターにconsumer_tagを渡すことで、たとえばAMQPQueu :: Cancel(m-> getConsumerTag())のように登録を解除できます。



配信タグは、このセッションの配信されたメッセージのカウンターに等しい数値であり、最初のメッセージの場合、配信タグは-1、2番目の-2、3番目の-3などになります。 メッセージの受信を確認するには、AMQPQueu :: Ack(delivery_tag)メソッドを呼び出す必要があります。delivery_tag変数には配信タグの値が含まれます。
AMQP amqp ;

auto_ptr < AMQPQueue > qu3 amqp。createQueue ;



while 1 {

qu2- > Get AMQP_NOACK ;

auto_ptr < AMQPMessage > m qu2- > getMessage ;



if m- > getMessageCount > - 1 {

qu2- > Ack m- > getDeliveryTag ;

} その他

休憩 ;

}

AMQPQueue :: Getメソッドとは異なり、AMQPQueue :: Consumeメソッドには同期受信スキームがあるため、ここではイベントモデルが使用されます。 Subscribeメソッドを使用する前に、AMQP_MESSAGEイベントを追加する必要があります。 イベントハンドラは、入力パラメータがメッセージデータである関数です。 また、出力はブール値です:停止/データ受信なし。 例でより明確に:
AMQP amqp ;

auto_ptr < AMQPQueue > qu amqp。createQueue "q2" ;

qu- >宣言

//または

auto_ptr < AMQPQueue > qu2 amqp。createQueue ;

qu2- > Declare "q2" ;
static int i = 0 ;



int onMessage AMQPMessage * message {

char * data = message- > getMessage ;

if データ

cout << data << endl ;

i ++ ;



cout << "#" << i << "tag =" << message- > getDeliveryTag << endl ;

if i > 5

1を 返し ます。

0を 返し ます

} ;

ゼロが印刷されなかった;)



どうする
このライブラリは完全なふりをしているわけではありません。もちろん、イベントロジックを開発し、onCancel、onSignal、onTimerなどのイベントを追加したいと思います。

サブスクリプション用にマルチスレッドを作成すると思います。 非ブロッキングソケットですべてを行うには、librabbitmqのネットワーク部分を書き換える必要がある場合があります。
関連リンク


ロシア語のAMQP

RabbitMQ:AMQPの概要

AMQP アプリケーションのデバッグ

PHP-AMQPフレンド向けの新機能

AMQP-PHPチャット
結論の代わりに
ベータステータスではありますが、比較的安定して動作します。 MPLライセンス。 プロジェクトを支援したい人はいつでも歓迎します。

バグはトラッカーで登録解除してください

さらなる開発のアイデアについては、ここまたはニュースレターで議論できます



PS。 ロシア語については、強く蹴らないようにお願いします;私はいつも彼と対立していました。

エラーを修正し、個人的に書き込みます。



All Articles