RabbitMQ-延期されたメッセージ、パート2

画像 保留中のメッセージに関する以前の記事では、すべての保留中のメッセージが同じ遅延時間を持つ単純なケースのために保留中のメッセージを整理するオプションを検討しました。 しかし、コメントですぐに、彼らは、保留中のメッセージを整理するこのオプションは、異なる遅延時間を持つメッセージにそれを使用しようとすると問題を引き起こすと指摘しました。



この点で、保留中のメッセージを任意の遅延時間で整理できる、より普遍的な(ただし少し複雑な)ソリューションを提供したいと思います。



実際、問題は何ですか



RabbitMQのキューは、前のメッセージが終了する前にメッセージがキューを終了できないように設計されています。 実際、それがターンの理由です-前方に登ることはできません。 有効期限パラメータがメッセージに設定されていて、このメッセージの有効期限が切れた場合でも、メッセージの前に他のメッセージがある間はメッセージにキューを終了する権利は与えられません。 古くなったメッセージは、順番が来るまで(キュー、ええ)キューにハングアップします。



彼の番がやっと到着すると、次の2つのことが起こります。



  1. 「x-dead-letter-exchange」パラメーターがキューに設定されていない場合、メッセージは単に削除され、誰にも配信されません
  2. パラメーター「x-dead-letter-exchange」が設定されている場合、メッセージは指定された交換機に転送されます


この点で、前の記事で検討したバリアントでは、次のことが発生しました。



すべてのメッセージに同じ「有効期限」が設定されている場合、それらはすべてキューに次々と並んで、以前のすべてのメッセージがキューから出るのを待ちました。 最初のメッセージは保存時間を過ぎ、このメッセージはキューを出て、指定された交換機に配信され、他のすべてのメッセージを解放してキューを出ました。 残りのメッセージは同じ「有効期限」値を持っていたため、それらはすでに前のメッセージと少なくとも同時に、または陳腐化の時点でそれぞれ後の時点で常に古くなりました。



キューに有効期限パラメータが他のメッセージよりも大きいメッセージが含まれているかどうかは別の問題です。 この場合、大きな「期限切れ」のメッセージは、その寿命が切れる前に出口行に到達し、そこに「スタック」しました。 そして、彼の背後で、メッセージは少しの「期限切れ」で蓄積し始めました。それは、たとえ期限が切れていても、行から抜け出すことができませんでした。 その後、大きな「期限切れ」のメッセージがキューから出て、すぐに蓄積されたすべてのメッセージがその後ろに落ちました。「期限切れ」は長く古くなっていました。



要するに、30、20、10の遅延でメッセージを送信する場合、出力順序はそれだけであり、予想される10、20、30ではありません。



打ち方



同じ記事の前の記事で、簡単な解決策を提案しました。一般的な遅延キューを1つではなく、複数作成します。各タスクには独自の遅延キューがあります。 1つのタスクには1つの遅延で十分であると想定されています。 ただし、同じタスク内でも異なる遅延を設定する機能が必要な場合は、保留中のメッセージを作成する普遍的な方法を作成しましょう。



主なアイデアはこれです-異なる遅延のメッセージが同じキューで互いに干渉する場合、遅延ごとに独自の個人キューを作成します!



画像



キーポイント:



  1. 遅延の異なる遅延メッセージのキューを作成します-各遅延にキューがあります
  2. キー「queue_name。*」を使用して、通常のキューをエクスチェンジャーとインターレースします。これにより、遅延に関係なくメッセージがキューに入ります。


受取人



consumer_dlx.plスクリプトを検討してください。



#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #      --- for my $i (1..2) { my $exchange = 'myexchange' . $i; #  $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); for my $j (1..2) { my $queue = 'myqueue' . $j; #  my $queue_full = "$exchange.$queue"; $mq->queue_declare(1, $queue_full, {auto_delete => 0}); #  my $routing_key = $queue . '.*'; $mq->queue_bind(1, $queue_full, $exchange, $routing_key); #  $mq->consume(1, $queue_full); } } #   ( ) while ( my $msg = $mq->recv() ) { print "$msg->{body} ($msg->{routing_key})\n"; }
      
      





前の記事のように、受信者スクリプトには保留中のキューに直接関連する機能はありません。



重要な点に注意してください-ここでは、タイプ「トピック」で交換機が作成されます。 これは、使用するrouting_keyに2つのパラメーター(キューの名前と目的の配信時間)が含まれるという事実によるものです。



さらに、次の行に示すように、配信時間は受信者自身では考慮されません。



 my $routing_key = $queue . '.*';
      
      





ご覧のとおり、2番目のパラメーターは'*'



設定されているため、すべてのメッセージは時間を考慮せずに通常のキューに配信されます(これは通常のキューに必要なものです)。



送信者



次に、producer_dlx.plスクリプトを検討します。



 #!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $message = $ARGV[0] || 'mymessage'; my $exchange = $ARGV[1] || 'myexchange1'; my $queue = $ARGV[2] || 'myqueue1'; my $delay = $ARGV[3] || 0; #  $mq->connect("localhost", {user => $user, password => $password}); #  $mq->channel_open(1); #  $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); #  dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'}); #  dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000}); #  my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key); #   $mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx});
      
      





ここでは次の点が重要です。



 #  dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
      
      





通常の交換機と同様に、一時メッセージの交換機は「トピック」タイプでなければなりません。



 #  dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
      
      





現在の時間に指定された遅延を追加することにより、 $endtime



の希望する配信時間を計算します(秒単位の値、unixtimeが使用されます)。



次に、指定された遅延の個人用キューを作成し、キュー名に配信時間を直接入力します。 キュー名に時間を挿入することは技術的な必要性ではありません。キュー名は好きなように指定できますが、わかりやすく、混乱を避けるために、それを行うのが最も便利です。



キューを作成するとき、次の3つの引数を渡す必要があります。



  1. 'x-message-ttl'



    はメッセージの有効期間です。 お気づきのように、遅延ごとに個別のキューがあるため、個々のメッセージに同じ'expiration'



    値を指定する代わりに、キュー内のすべてのメッセージの遅延を一度に設定できます。
  2. 'x-dead-letter-exchange'



    は、このキューからのメッセージが転送される交換機の名前です。
  3. 'x-expires'



    はキュー自体のライフタイムです。 遅延ごとに新しい保留キューを作成するため、これらのキューは常に蓄積されます。 彼らが無駄にどれだけ邪魔しないように、私たちは彼らに寿命を与え、その後自動的に削除されます。 重要! キューの有効期間は、メッセージの有効期間よりも長くする必要があります。 キューの有効期間をメッセージの有効期間と同じに設定した場合、メッセージの配信は保証されません。キューからのメッセージが配信される前にキューがバングされる可能性があります。 この例では、キューの有効期間はメッセージの有効期間よりも10秒長く設定されています。


 #  my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
      
      





routing_keyは、「queue_name。Wish_time_delivery」として指定されます。 中央のポイントはキーを2つのパラメーターに分割します。これらは「トピック」エクスチェンジャーが解析するパラメーターです。



ご覧のとおり、両方のパラメーターがバインディングに含まれています。 したがって、一時メッセージの交換機は、特定の配信時間で特定の1つのキューにこのキーを持つメッセージを送信します。



打ち上げ



まず、consumer_dlx.plを実行する必要があります。次に、異なるパラメーターでproducer_dlx.plを実行できます。



パラメーター:[メッセージ] [交換機] [キュー] [秒単位の遅延]。



画像



ご覧のとおり、最初はメッセージはより長い遅延で送信されましたが、メッセージは正しい順序で配信されました。まず、遅延の少ないメッセージです。



All Articles