RabbitMQ-保留中のメッセージ

画像



Habréには、RabbitMQの公式マニュアルの翻訳シリーズがあります(1、2、3、4、5)。 残念ながら、公式のリーダーシップは保留中のメッセージを整理する問題に対処していませんが、この問題は非常に重要だと思います。 したがって、私は自分でそのような記事を書くことにしました。



コード例はPearlにありますが、コードにPearl固有の詳細はないため、例は他の言語に比較的簡単に適合させることができます。



問題の声明



時々、「今すぐ正しく」ではなく、しばらくしてからタスクを実行する必要があります。



たとえば、時々APIを呼び出すスクリプトがあり、回答が変更されていない場合は、しばらく「スリープ」し、「ウェイクアップ」して再度チェックします。



または、たとえば、一時ファイルを保存し、指定した時間後にファイルを削除するためにタイマーを開始する必要があります。



そのような場合、RabbitMQで遅延メッセージを作成するメカニズムが必要です(もちろん、RabbitMQを使用してこれを行いたい場合を除きます)。



残念ながら、RabbitMQ自体には、保留中のメッセージを投稿するための既製のメカニズムがありません。 RabbitMQで送信者によって公開されたメッセージは、すぐに受信者に配信されます。 もちろん、受信者がRabbitMQに接続されていない場合があります。この場合、メッセージは接続後に配信されますが、受信者が接続されている場合、メッセージはすぐに配信されます。



メッセージを公開して彼に言うことはできません。「隅まで横になって、10分後に出て受信者に届けてください。」



したがって、問題が発生します-RabbitMQを使用して保留中のメッセージを整理する方法



解決策



これを行うには、回避策を実行する必要があります。 重要な考え方は次のとおりです。キューに送信されたメッセージがこのキューをリッスンしている受信者にすぐに配信される場合、このメッセージを別のキューに送信する必要があります!



一般に、作業のスキームは次のとおりです。



画像



  1. 保留中のメッセージの送信先となる交換機を作成します
  2. 保留中のメッセージが保存されるキューを作成します
  3. キューと交換機の間にバインディングを作成します
  4. メッセージを指定された時間待機させた後、受信者への即時配信のために、通常の交換機に送信されるようにキューを構成します


受取人



consumer_dlx.plスクリプトを検討してください。



#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $exchange = 'myexchange'; my $queue = 'myqueue'; my $routing_key = 'mykey'; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'}); #  $mq->queue_declare(1, $queue); #  $mq->queue_bind(1, $queue, $exchange, $routing_key); #  $mq->consume(1, $queue); #   -- $mq->queue_declare(1, $queue.'2'); $mq->queue_bind(1, $queue.'2', $exchange, $routing_key.'2'); $mq->consume(1, $queue.'2'); #   ( ) while ( my $msg = $mq->recv() ) { print "$msg->{body} ($msg->{routing_key})\n"; }
      
      





マニュアルから上記の記事を読んだ人にとって新しいものは何もないので、このスクリプトの各行には焦点を合わせません。 これはメッセージの完全に普通の受信者であり、検討中のトピック(保留中のメッセージ)に関連する特異性さえありません。 受信者はデモンストレーションにのみ必要であり、すべてのソルトは送信者に含まれます。



私は1点だけに注意します



受信者は、2つの異なるrouting_keyを持つエクスチェンジャーと絡み合った2つの異なるキューを作成してリッスンすることに注意してください。 原則として、キューは1つで十分ですが、2つあればより視覚的になり、さらに1つの便利な機能をさらに実証するのに役立ちます。



送信者



次に、producer_dlx.plスクリプトを検討します。



 #!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $exchange = 'myexchange'; my $exchange_dlx = 'myexchange.dlx'; my $queue_dlx = 'myqueue.dlx'; my $message = $ARGV[0] || 'mymessage'; my $routing_key = $ARGV[1] || 'mykey'; my $expiration = $ARGV[2] || 0; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'}); #  dlx $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'}); #  dlx $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange}); #  $mq->queue_bind(1, $queue_dlx, $exchange_dlx, $routing_key); #   $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
      
      





コードの個々のセクションを分析しましょう。



 #  $mq->exchange_declare(1, $exchange, {exchange_type => 'direct'});
      
      





これは、受信者で使用されるのと同じ交換機です。 送信者はこのエクスチェンジャーにメッセージを直接送信しませんが、間接的ではありますが、後で使用されるため、エクスチェンジャーを作成する必要があります。



 #  dlx $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'fanout'});
      
      





これは、保留中のメッセージを送信する交換機です。



最初に作成されたタイプが「直接」の交換機とは異なり、作成される交換機のタイプに注意してください-「ファンアウト」。 次に、なぜ「ファンアウト」なのかを説明します。



 #  dlx $mq->queue_declare(1, $queue_dlx, {}, {'x-dead-letter-exchange' => $exchange});
      
      





ここで、保留中のメッセージが配置されるキューを作成します。



引数「x-dead-letter-exchange」は、遅延メッセージのメカニズム全体が置かれる釘です。 この引数がキューに指定されている場合、期限切れのメッセージはこのキューからこの引数で指定された交換機に自動的に移動されます。



したがって、交換機として、受信者が使用する通常の交換機を指定する必要があります。



念のため、パールに詳しくない人への注意:3番目のパラメーターの{}



構造は、この場所でオプションを使用してハッシュへのリンクを渡す必要があることを意味しますが、この特定のケースではオプションは必要ないため、空のハッシュ。




 #   $mq->publish(1, $routing_key , $message, {exchange => $exchange_dlx}, {expiration => $expiration});
      
      





保留中のメッセージについては、交換機にメッセージを送信します。



ここでは、「有効期限」パラメータが重要です。 このパラメーターは、メッセージの保存時間をミリ秒単位で設定します。 この時間が経過すると、メッセージはキューから削除されます。 ただし、上記のように、キューが引数 'x-dead-letter-exchange'に設定されている場合、メッセージがキューから削除されると同時に、メッセージは引数で指定された交換機に送信され、次に、それが絡み合った通常のキューにメッセージを送信します即時配達のため。



routing_keyを使用したシンポイント



覚えているように、受信者では、「直接」タイプの交換機を1つ作成し、異なるキーでそれと絡み合う2つのキューを作成しました。 このようなスキームを使用して、1つのトピックのメッセージを2人の異なる受信者に送信できます。たとえば、状況に応じてログをファイルまたはコンソールに送信できます。 routing_keyは、メッセージが送信される順序を担当します。



ここで、2つの異なるキーを持つ2つのメッセージを延期する必要があると想像してください。 保留中のメッセージのためにそれらを交換機に送信します。交換機は、送信するキューを決定する必要があります。 しかし、それらは異なるrouting_keyを持っているため、交換機は、タイプが 'direct'の場合、同じキューに送信できませんでした。



これが、タイプ 'fanout'の保留中のメッセージの交換機を作成する理由です。このタイプの交換機はrouting_keyを無視し、絡み合っているすべてのキューにメッセージを送信します。 この例では、インターレースされているキューは1つだけです。保留中のメッセージのキューです。 したがって、保留中のメッセージのために交換機に送信されたrouting_keyを持つすべてのメッセージは、このキューに送られます。



この時点で注意深い読者は、「そして、保留中のメッセージキューでの有効期限が切れた後、どのrouting_keyメッセージが通常の交換機に送信されますか?」と尋ねる必要があります。



それらは、持っていたのと同じrouting_keyで送信されます。 routing_keyの値は、特にこれを行うまでは変更されません(ただし、必要に応じてrouting_keyを変更するようにキューを設定できます)。



打ち上げ



まず、consumer_dlx.plを実行する必要があります。次に、異なるパラメーターでproducer_dlx.plを実行できます。



パラメーター:[メッセージ] [キーmykeyまたはmykey2] [ミリ秒単位の遅延]。



画像



静的な画像には表示されませんが、指定された遅延でproducer_dlx.plを実行した後、この同じ遅延が発生し、consumer_dlx.plがメッセージを表示します(キーは括弧内に表示されます)。



警告



ユーザーTsyganov_Ivanがここで正しくプロンプトを表示したため、異なる期限切れのメッセージに問題があります。 実際には、メッセージはキューを厳密に順番に「終了」します(そのため、キューです)。 このため、先頭に大きな有効期限が切れたメッセージが先行している場合、この小さな有効期限がすでに切れている場合でも、キューが小さな有効期限が切れたメッセージを残すことを「ブロック」する場合があります。



したがって、異なるキューに異なる「期限切れ」を突然指定する必要がある場合、1つの一般的な遅延キューの代わりに、いくつかの個別の遅延キューを作成します。通常の各キューには独自の遅延があります。



'expired'の任意の値に対するより普遍的なソリューションは、記事の第2部で説明されています



All Articles