この点で、保留中のメッセージを任意の遅延時間で整理できる、より普遍的な(ただし少し複雑な)ソリューションを提供したいと思います。
実際、問題は何ですか
RabbitMQのキューは、前のメッセージが終了する前にメッセージがキューを終了できないように設計されています。 実際、それがターンの理由です-前方に登ることはできません。 有効期限パラメータがメッセージに設定されていて、このメッセージの有効期限が切れた場合でも、メッセージの前に他のメッセージがある間はメッセージにキューを終了する権利は与えられません。 古くなったメッセージは、順番が来るまで(キュー、ええ)キューにハングアップします。
彼の番がやっと到着すると、次の2つのことが起こります。
- 「x-dead-letter-exchange」パラメーターがキューに設定されていない場合、メッセージは単に削除され、誰にも配信されません
- パラメーター「x-dead-letter-exchange」が設定されている場合、メッセージは指定された交換機に転送されます
この点で、前の記事で検討したバリアントでは、次のことが発生しました。
すべてのメッセージに同じ「有効期限」が設定されている場合、それらはすべてキューに次々と並んで、以前のすべてのメッセージがキューから出るのを待ちました。 最初のメッセージは保存時間を過ぎ、このメッセージはキューを出て、指定された交換機に配信され、他のすべてのメッセージを解放してキューを出ました。 残りのメッセージは同じ「有効期限」値を持っていたため、それらはすでに前のメッセージと少なくとも同時に、または陳腐化の時点でそれぞれ後の時点で常に古くなりました。
キューに有効期限パラメータが他のメッセージよりも大きいメッセージが含まれているかどうかは別の問題です。 この場合、大きな「期限切れ」のメッセージは、その寿命が切れる前に出口行に到達し、そこに「スタック」しました。 そして、彼の背後で、メッセージは少しの「期限切れ」で蓄積し始めました。それは、たとえ期限が切れていても、行から抜け出すことができませんでした。 その後、大きな「期限切れ」のメッセージがキューから出て、すぐに蓄積されたすべてのメッセージがその後ろに落ちました。「期限切れ」は長く古くなっていました。
要するに、30、20、10の遅延でメッセージを送信する場合、出力順序はそれだけであり、予想される10、20、30ではありません。
打ち方
同じ記事の前の記事で、簡単な解決策を提案しました。一般的な遅延キューを1つではなく、複数作成します。各タスクには独自の遅延キューがあります。 1つのタスクには1つの遅延で十分であると想定されています。 ただし、同じタスク内でも異なる遅延を設定する機能が必要な場合は、保留中のメッセージを作成する普遍的な方法を作成しましょう。
主なアイデアはこれです-異なる遅延のメッセージが同じキューで互いに干渉する場合、遅延ごとに独自の個人キューを作成します!
キーポイント:
- 遅延の異なる遅延メッセージのキューを作成します-各遅延にキューがあります
- キー「queue_name。*」を使用して、通常のキューをエクスチェンジャーとインターレースします。これにより、遅延に関係なくメッセージがキューに入ります。
受取人
consumer_dlx.plスクリプトを検討してください。
#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; # $mq->connect("localhost", {user => $user, password => $password}); # $mq->channel_open(1); # --- for my $i (1..2) { my $exchange = 'myexchange' . $i; # $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); for my $j (1..2) { my $queue = 'myqueue' . $j; # my $queue_full = "$exchange.$queue"; $mq->queue_declare(1, $queue_full, {auto_delete => 0}); # my $routing_key = $queue . '.*'; $mq->queue_bind(1, $queue_full, $exchange, $routing_key); # $mq->consume(1, $queue_full); } } # ( ) while ( my $msg = $mq->recv() ) { print "$msg->{body} ($msg->{routing_key})\n"; }
前の記事のように、受信者スクリプトには保留中のキューに直接関連する機能はありません。
重要な点に注意してください-ここでは、タイプ「トピック」で交換機が作成されます。 これは、使用するrouting_keyに2つのパラメーター(キューの名前と目的の配信時間)が含まれるという事実によるものです。
さらに、次の行に示すように、配信時間は受信者自身では考慮されません。
my $routing_key = $queue . '.*';
ご覧のとおり、2番目のパラメーターは
'*'
設定されているため、すべてのメッセージは時間を考慮せずに通常のキューに配信されます(これは通常のキューに必要なものです)。
送信者
次に、producer_dlx.plスクリプトを検討します。
#!/usr/bin/perl use strict; use warnings; use Net::AMQP::RabbitMQ; my $mq = Net::AMQP::RabbitMQ->new(); my $user = 'guest'; my $password = 'guest'; my $message = $ARGV[0] || 'mymessage'; my $exchange = $ARGV[1] || 'myexchange1'; my $queue = $ARGV[2] || 'myqueue1'; my $delay = $ARGV[3] || 0; # $mq->connect("localhost", {user => $user, password => $password}); # $mq->channel_open(1); # $mq->exchange_declare(1, $exchange, {exchange_type => 'topic'}); # dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'}); # dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000}); # my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key); # $mq->publish(1, $routing_key, $message, {exchange => $exchange_dlx});
ここでは次の点が重要です。
# dlx my $exchange_dlx = $exchange . '.dlx'; $mq->exchange_declare(1, $exchange_dlx, {exchange_type => 'topic'});
通常の交換機と同様に、一時メッセージの交換機は「トピック」タイプでなければなりません。
# dlx my $endtime = time() + $delay; my $queue_full = "$exchange.$queue.$endtime"; $mq->queue_declare(1, $queue_full, {}, {'x-message-ttl' => $delay * 1000, 'x-dead-letter-exchange' => $exchange, 'x-expires' => $delay * 1000 + 10000});
現在の時間に指定された遅延を追加することにより、
$endtime
の希望する配信時間を計算します(秒単位の値、unixtimeが使用されます)。
次に、指定された遅延の個人用キューを作成し、キュー名に配信時間を直接入力します。 キュー名に時間を挿入することは技術的な必要性ではありません。キュー名は好きなように指定できますが、わかりやすく、混乱を避けるために、それを行うのが最も便利です。
キューを作成するとき、次の3つの引数を渡す必要があります。
-
'x-message-ttl'
はメッセージの有効期間です。 お気づきのように、遅延ごとに個別のキューがあるため、個々のメッセージに同じ'expiration'
値を指定する代わりに、キュー内のすべてのメッセージの遅延を一度に設定できます。 -
'x-dead-letter-exchange'
は、このキューからのメッセージが転送される交換機の名前です。 -
'x-expires'
はキュー自体のライフタイムです。 遅延ごとに新しい保留キューを作成するため、これらのキューは常に蓄積されます。 彼らが無駄にどれだけ邪魔しないように、私たちは彼らに寿命を与え、その後自動的に削除されます。 重要! キューの有効期間は、メッセージの有効期間よりも長くする必要があります。 キューの有効期間をメッセージの有効期間と同じに設定した場合、メッセージの配信は保証されません。キューからのメッセージが配信される前にキューがバングされる可能性があります。 この例では、キューの有効期間はメッセージの有効期間よりも10秒長く設定されています。
# my $routing_key = "$queue.$endtime"; $mq->queue_bind(1, $queue_full, $exchange_dlx, $routing_key);
routing_keyは、「queue_name。Wish_time_delivery」として指定されます。 中央のポイントはキーを2つのパラメーターに分割します。これらは「トピック」エクスチェンジャーが解析するパラメーターです。
ご覧のとおり、両方のパラメーターがバインディングに含まれています。 したがって、一時メッセージの交換機は、特定の配信時間で特定の1つのキューにこのキーを持つメッセージを送信します。
打ち上げ
まず、consumer_dlx.plを実行する必要があります。次に、異なるパラメーターでproducer_dlx.plを実行できます。
パラメーター:[メッセージ] [交換機] [キュー] [秒単位の遅延]。
ご覧のとおり、最初はメッセージはより長い遅延で送信されましたが、メッセージは正しい順序で配信されました。まず、遅延の少ないメッセージです。