透過遷移PgQ-> RabbitMQ





habrachitatel様、PgQキューからamqpへのアプリケーション移行の透過性に関する経験を共有したいと思います。 たぶん、これは自転車のように見えるかもしれません。おそらく、いくつかの考えが役に立つでしょう。 この記事では、PgQとrabbitmqの基本を紹介します。



背景



私たちのプロジェクトでは、歴史的にPgQが非常に積極的に使用されています。 すべての欠点について、PgQには否定できない利点があります。これは、コードで積極的に使用されているデータベースとのトランザクションです。 つまり、イベントがキュー内にあり、データベースが更新されることを確認できます。 または、これらのどちらも発生しません。 そして、この利点を新しいキューエンジンに転送する必要があります。



PgQを離れる理由については詳しく説明しません。これは別の記事のトピックです。 遷移自体にのみ焦点を当てます。



思考線、pg_amqp



グーグルは、PostgreSQL- pg_amqpの拡張につながります。 amqpに送信するためのPostgreSQLのストアドプロシージャを提供します。 拡張機能は、アプリケーションロジックレベルで完全に機能します。PostgreSQLでトランザクションをロールバックすることにより、データはamqpに入りません。 そして、彼らがコミットすれば、彼らはそうするでしょう。

BEGIN; INSERT INTO some_table (...) VALUES (...); SELECT amqp.publish(broker_id, 'amqp.direct', 'foo', 'bar'); ROLLBACK; //     ,   amqp  
      
      





実際、拡張機能はメッセージがamqpに送信されることを保証しません。 内部では、最初にPostgreSQLで、次にamqpで順次トランザクションコミットのみがあります。 また、2つのコミット間のamqpとの接続が失われると、メッセージは失われます。 そのようなイベントの可能性は小さいという事実にもかかわらず、パケットが失われます。 そして、実際のお金と取引口座で作業することを考えると、これは受け入れられません。



パッケージの0.01%の損失が許容される場合-記事の残りの部分は省略できます。 PgQからamqpに移動する場合は、pg_amqpを使用します。



自転車を作り始めます



*次に、抽象的なamqpの代わりに、特定のrabbitmqがあります。



しかし、PostgreSQLはまだ残っており、内部ではトランザクションがいっぱいです。 そして、トランザクションですべてのパッケージを何らかの種類のテーブルに挿入し、そこに到達しなかったものを何らかの方法でamqpに送信できます。



すぐに言ってやった。



アプリケーションでのPgQのすべての作業は、1つのストアドプロシージャを使用して行われましたが、これは自由に変更できます。



テーブルを作成しました

 amqp.message( id bigint default nextval('amqp_message_id_sequence') primary key, pid bigint, queue varchar(128), message text )
      
      





そして、テーブルに挿入されると、このデータをamqpに送信するトリガー。 また、pgqの挿入ストレージは、このテーブルへのデータ挿入に置き換えられました。 この場合、オーバーヘッドはデータをamqpに送信するだけです。これは、PgQでもイベントごとにテーブルへの挿入が発生するためです。 なぜpidが必要なのか、後で説明します。



これで、テーブルとrabbitmqからの受信者にメッセージがあります。 メッセージは、PostgreSQLトランザクションの一部として保証付きでテーブルに書き込まれ、 ほとんどすべてのメッセージはpg_amqpを使用してamqpに送信されます。 しかし、どのメッセージが来て、どのメッセージが来なかったかをどうやって理解するのでしょうか? そして、パフォーマンスを失わないように、このテーブルを同じサイズ(できれば数十または数百行)に保つ方法は?



ここでrabbitmqが助けになります。 結局のところ、彼はいくつかのキューでメッセージを複製する方法を知っています







それでは、ビジネスコードを1回転させ、2番目をパッケージの受領確認に使用しますか?

すぐに言ってやった。 交換、2つのキュー、メッセンジャーを作成します。これにより、受信したメッセージがamqp.messageテーブルから単純に削除されます。



その結果、「送信中」のメッセージのみが保存されるテーブルがあります。 メッセージは挿入後すぐに削除されるため、テーブルのサイズは常に小さくなります。 テーブルのサイズを監視できます。 また、アプリケーションのビジネスコードはrabbitmqでのみ機能し、内部の魔法については何も知りません。



これが最終レイアウトです







しかし、今、重要な質問が発生します:いくつかのパッケージが到着していないことをどのように理解するか? 結局のところ、amqp.messageテーブルの行は、メッセージが失われることを保証しません-それは単に「途中」である可能性があります。 パッケージを送信するには、これを確認する必要があります。そうしないと、パッケージを2倍作成でき、誰かが100ドルではなく200ドルをクレジットします:)同時に、パッケージが来なかったと判断し、できるだけ早く送信して、最小限に抑える必要がありますキュー内のパケットの順序を乱します。



これが基本的なシャーマニズムの始まりです



すべてのパッケージには昇順で番号が付けられていますが、システムはマルチスレッドであり、パッケージがテーブルにある順序でrabbitmqに到着する必要はありません。 ただし、amqpにメッセージを送信する1つのプロセスのフレームワーク内では、メッセージを厳密に順序付ける必要があります。 PostgreSQLは、現在のプロセスのpidを表示する機能を提供します(pg_backend_pid())。 そして、pg_backend_pid()のフレームワーク内では、パッケージは厳密に昇順で並べられます(nextvalを使用してパッケージIDを生成することを思い出してください)。 したがって、ID Nのパケットを受信すると、IDがN以下の同じpg_backend_pidからのすべてのパケットは配信されず、送信する必要があります。



合計で、キューメッセンジャーを作成する必要がありますが、このメッセンジャーは2つのことしか行いません。





利益! すべてのメッセージが受信者に届くと同時に、PgQを完全に取り除きました。 メインアプリケーションコードはあまり変更されていません。



すべてのオーバーヘッド:





私はメッセンジャーの論理が転倒に対して完全に抵抗力があるという事実に注意を引きます。 あなたはいつでも彼を殺すことができ、彼は再び起動し、問題なく仕事を続けます。



システムは、postgresプロセスがamqpにパケットを送信した場合、そのパケットは到達せず、パケットを送信しなくなった場合を考慮しません。 誰かがこの状況を自動的に処理する方法を教えてくれるとありがたいです。 現在、これは単に監視することで解決されていますが、そのようなイベントはこれまで発生していません。 一般に、メッセージを送信するという事実は非常にまれなイベントです。 多くの異なるpg_backend_pidを削減するpgbouncerを使用します。



All Articles