保証された非同期メッセージ配信の実装

統合ソリューションの開発者であるAlexander Romanovが執筆した記事。


システムを統合するプロセスでは、システム間でメッセージを確実に配信する必要性にしばしば直面します。 そのような場合、キューが役立ちます。 しかし、すべてのタスクがシステムAからシステムBにメッセージを配信するほど単純なわけではありません。統合に関与する隣接システムからのデータで配信メッセージを充実させる必要がある場合があります。 常にキューを介して統合できるわけではありませんが、同期サービスしかありません。 そして今、私たちの統合では、「同期」を使用することによるアクセス不能、失敗、およびその他の「楽しい」機能などの現象が発生します。 中間障害の処理をソースシステムにシフトすることは可能ですが、これは文化的ではなく、複数のシステムのイベントを一度に公開することは不可能です(トピック内)。







私たちの観点から見ると、この問題に対する便利で実用的な解決策は、すべてのステップで外部サービス呼び出しを伴う内部キューを介した非同期の段階的なメッセージ処理です。 誤って、または一時的な操作不能によるサービス障害が発生した場合、メッセージは内部障害キューに分類され、サービスで発生した問題を解析した後に再送信されます。







また、このソリューションにより、外部サービスを操作する際にトランザクションがロールバックできないという問題もなくなります。 呼び出しは2回通過しません-処理は、障害が発生したステップから正確に始まります。



上記のすべては、内部バスを介したコンポーネント間の非同期相互作用がすぐに使用できる統合バスでの実装が非常に簡単です。 しかし、「ボックス」の価格が高すぎると、統合バスの使用が大幅に複雑になる可能性があります。 Spring Integration(以降SI)+ Rabbit MQで簡単なアプリケーションを実装する例を示します。 XAを使用することは不可能であるため、本番環境ではRabbit MQを使用しないように予約します。



アプリケーション全体の中核spring-integration-context.xmlです。 コンポーネントモデルについて説明し、リソースBeanとMQを操作するためのトランザクションマネージャーを初期化します。 詳細に説明します。



SIに組み込まれたドライバーを接続し、リソースを規定します。



<rabbit:queue name="si.test.queue.to"/> <rabbit:queue name="si.test.queue.from"/>
      
      





リソースと対話するための低レベルのamqpTemplate Beanが必要です。 このビンはテストで直接使用します。RabbitMQで動作するSIコンポーネントに必要です。 リソースへの接続に必要なConnectionFactoryは、application.ymlの設定に従ってSpring Bootを設定します( org.springframework.boot.autoconfigure.amqp.RabbitAutoConfigurationを参照 )。



  <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory" mandatory="true"/>
      
      





Rabbit MQでのトランザクション作業にはTransactionManagerが必要です(操作中にエラーが発生した場合、メッセージをキューにロールバックする必要があります)。 残念ながら、Rabbit MQはXAトランザクションをサポートしていません。そうでない場合、トランザクションマネージャーはSpring Bootを構成します。 Springによって手動で構成可能。



  <bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <constructor-arg name="connectionFactory" ref="rabbitConnectionFactory"/> </bean>
      
      





そして今、最良の部分。 「描く」フロー! 引用符で囲んでいるのは、XMLの形式で記述しているためです。



流れ



必要なもの:





このアプリケーションで保証付き配信を編成するには、内部キューを介した非同期呼び出しを使用します。 多くのコンポーネントがあるため、いくつかのキューが必要であり、開発の観点からも管理者にとっても不便です。 私たちはそれを試みます。



2つのコンポーネント間の相互作用のシナリオを考えます。 SomeComponentOneは、チャネルからメッセージを受信し、特定の同期RESTサービスを呼び出し(データベースと連携、ファイルへの書き込みなど)、メッセージを送信して、SomeComponentTwoが処理する必要があるさらなる処理を行います。 SomeComponentOneが委託された作業を完了できなかった場合、トランザクションをロールバックし、受信したメッセージを元の場所に戻す必要があります。 すべてが正常であれば、メッセージを内部キューに送信し、トランザクションを完了します。 SomeComponentOneは、内部キューからメッセージを取得して送信しますが、必ずしも受信した形式とは限りません。 メッセージは強化または変更される場合がありますが、私たちにとっては重要ではありません。 SomeComponentTwoコンポーネントと連携するように設計されています。 ルーティングの問題があります。 メッセージは内部キューに分類され、指定された時間に必要なコンポーネントによってそこから取得される必要があります。 つまり、ルーティングが必要です。



このアプリケーションは、メッセージヘッダーに基づくルーティングを示しています。 メッセージのカスタムPartnerComponentヘッダーが入力され、メッセージで機能するコンポーネントを示します。



フローの技術的な詳細を書き留めます。



入力キューから読み取るためのアダプター。 メッセージを受信し、トランザクションですぐに内部キューにスローします。



 <int-amqp:inbound-channel-adapter channel="innerChannel" queue-names="si.test.queue.to" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/> <int-amqp:channel id="innerChannel" queue-name="si.test.queue.inner" connection-factory="rabbitConnectionFactory" transaction-manager="rabbitTransactionManager"/>
      
      





Springが提供する特殊な非同期キューチャネルを使用しました。 SIチャネルインターフェイスを取得し、メッセージをキュー(アプリケーションの内部mq-queue)に直接格納します。 このキューチャネルからメッセージを受信すると、トランザクションが開かれます。 トランザクションマネージャーを接続しました。



このキューチャネルに、メッセージヘッダーで動作するSIルーターを接続します。



 <int:header-value-router id="wireRouter" input-channel="innerChannel" header-name="PartnerComponent" default-output-channel="component1Channel"> <int:mapping value="ComponentTwo" channel="component2Channel"/> <int:mapping value="ComponentThree" channel="component3Channel"/> <int:mapping value="OutboundComponent" channel="outboundRabbitChannel"/> </int:header-value-router>
      
      





フローの新しいメッセージにはPartnerComponent技術ヘッダーがないため、デフォルトではsomeComponentOneによって処理され、その責任はPartnerComponentメッセージヘッダーの次のコンポーネントを指定して、内部キューにメッセージを送信することです。 ルータは再び内部キューからメッセージを取得し、指定されたコンポーネントに送信して処理します。



ルーターからのメッセージが送信されるコンポーネントの説明。



  <int:channel id="component1Channel"/> <int:service-activator input-channel="component1Channel" ref="someComponentOne" method="process"/> <int:channel id="component2Channel"/> <int:service-activator input-channel="component2Channel" ref="someComponentTwo" method="process"/> <int:channel id="component3Channel"/> <int:service-activator input-channel="component3Channel" ref="someComponentThree" method="process"/> <int:channel id="outboundRabbitChannel"/> <int:service-activator input-channel="outboundRabbitChannel" ref="outboundRabbitComponent" method="process"/
      
      





出力キューに送信するアダプター。

  <int:channel id="toRabbit"/> <int-amqp:outbound-channel-adapter channel="toRabbit" amqp-template="amqpTemplate" routing-key="si.test.queue.from"/>
      
      





アセンブリ(pom.xml)



古き良きMaven。 Spring Bootからの標準ビルド。 SIおよびAMQPの依存関係は、必要なすべてのライブラリを提供します。 また、spring-boot-starter-testを接続して、JUnitにテストケースを実装します。



SomeComponent * .javaの動作



SIをフローするサービスアクティベーターとして接続されたトランザクションBean。 RestTemplateを介してRESTを呼び出し、innerChannelを介して内部キューに送信します。 サービスでの作業とテストでの便利なモックを示すのに十分です。



テスト中



testHappyPathテストは、RESTの呼び出し時にエラーがない場合にフローをチェックしました。 失敗することなくRESTサービスへのすべての呼び出しをモックし、メッセージを入力キューにドロップし、出力を待機し、受信したメッセージの本文の内容に従ってすべてのコンポーネントの通過を確認します。



testGuaranteedDeliveryテストは、RESTの1つで障害が発生した場合の保証付き配信を確認しました。 3番目のコンポーネントからサービスを呼び出すときに1回限りのエラーをエミュレートし、メッセージが出力キューに配信されるのを待って、受信したメッセージの本文を確認します。



おわりに



配信が保証されたアプリケーションを作成しました。 いくつかの小さな未解決の問題があります。メッセージの再送信は無限であり、制御不能です。誰がこのソリューションをサポートしていますか-管理者またはこのサポートを自動化できますか これらの問題は、自己記述型アプリケーションと、メッセージの種類ごとに個別の設定を使用して解決します。 おそらく将来、これらのソリューションについて説明します。



git-hubアプリケーション全体



All Articles