約1年前、経験を交換するために私たちのオフィスに来た元同僚から、 MassTransit (MT)ライブラリについて初めて聞きました。 彼が採用した会社は、MTを使用して、開発したサービスのモジュール間の接続性を削減しました。高い接続性が問題になり始めたため、他の人の経験が非常に役に立ちました。 モジュール間の相互作用のイベントベースのモデルに切り替えることで接続性を削減することに加えて、MTは複数のプロセス間でリソースを集中的に使用するタスクを分散するためにも役立ちました。
MassTransitとは何ですか。
MassTransitは、よく知られているDataBusパターンの実装です。 このパターンの主な目的は、オブジェクト間のメッセージ交換を通じて、互いの存在を認識していない複数のオブジェクトの相互作用を整理することです。 ライブラリは、 NServiceBusプロジェクトの無料の類似物としてDru SellersとChris Pattersonによって作成され、RabbitMQまたはMSMQを選択用のメッセージサーバートランスポートとして使用できます。 このプロジェクトでは、RabbitMQを使用することを選択したため、ここでは、このキューサーバーでのバス構成を待っている経験と落とし穴について説明します。 MassTransitはAMQPプロトコルの抽象化レイヤーであり、開発者は実装の詳細を隠そうとしたため、ライブラリを使用し、記事を理解し、バスを構成するときにレーキを正常にトラバースするために、キューサーバーデバイスとAMQPプロトコルの知識は実際には必要ありません、デバイスの一般的な考え方RabbitMQサーバーが必要です。 これは悪いニュースですが、良いニュースがあります-最低限の知識が必要です 。 ここから最初の4つのレッスンを読むだけで十分です 。 レッスンは小さく理解しやすいものであり、RabbitMQを使用した作業の基本を学習するのにそれほど時間はかかりませんが、多くの利点をもたらすことができます。 ちなみに、最初の2、3のレッスンはhabrで翻訳されました。 レッスン1とレッスン2 。
ビジネスに。
理論から実践に移り、MassTransitライブラリを使用して、機能がチュートリアルからRabbitMQまでの最初の例に似たタスクを実行してみます。 2つのPublisherおよびSubscriberオブジェクトが相互作用する単純なコンソールアプリケーションを作成します。 パブリッシャーは、キーを押すと、メッセージ「KeyWasPressed」と押されたキーのコードをバスに送信します。 サブスクライバーは、バスからこのメッセージをキャプチャし、画面に表示します。
最初に
コードに直接進みます。 バスに送信されるメッセージは、通常のDTOオブジェクトです。 まず、パブリッシャーからサブスクライバーに送信されるメッセージ自体のクラスを作成する必要があります。 KeyWasPressedと呼びましょう。
public class KeyWasPressed { // public ConsoleKey PressedKey { get; set; } }
それでは、単純なパブリッシャー(パブリッシャー)とサブスクライバー(サブスクライバー)の作成に移りましょう。 ライブラリの重要な要素はServiceBusです。 MassTransitのServiceBusは、RabbitMQ(またはMSMQ)キューサーバーがメッセージ転送を担当するメッセージング環境です。 サブスクライバーおよびパブリッシャーは、まさにこのタイプのオブジェクトです-ServiceBus。
加入者
IServiceBus subscriber = ServiceBusFactory.New(sbc => { // rabbitMq sbc.UseRabbitMq(); // sbc.ReceiveFrom("rabbitmq://localhost/subscriber"); // KeyWasPressed. // sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg => Console.WriteLine("{0}{1}{2}{3}",Environment.NewLine,"Key '", msg.PressedKey, "' was pressed") )); });
出版社
IServiceBus publisher = ServiceBusFactory.New(sbc => { //, rabbitMq sbc.UseRabbitMq(); // , sbc.ReceiveFrom("rabbitmq://localhost/publisher"); });
MassTransitは、メッセージサーバーに接続されているServiceBusインスタンスをパブリッシャーとサブスクライバーに分割しません;接続された各インスタンスを通じて、メッセージをパブリッシュおよび処理できます。 したがって、メッセージを受信するためのキューを常に指定する必要がありますが、パブリッシャーオブジェクトの場合のように、何も受信しない場合もあります。
次に、押された各キーがバスに送信される無限ループを作成します。
while (true) { publisher.Publish(new KeyWasPressed() { PressedKey = Console.ReadKey().Key }); }
すべての準備が整ったら、アプリケーションを実行してキーを押します。
キューサーバーで何が起こるか。
アプリケーションの起動時にキューサーバーで何が起こるか見てみましょう。 デフォルトでは、RabbitMQインストーラーはRabbitMQをWindowsサービスとして登録するため、コマンドラインユーティリティを使用して、キューサーバーで特定の時間に何が起こっているかを常に確認できます。 しかし、標準の配布キットの一部でもあるWebプラグインを使用する方が便利です。
それをインストールするには、次のいくつかの手順に従う必要があります
1)コマンドプロンプトで、サーバーインストールディレクトリからsbinフォルダーに移動します(たとえば、%PROGRAMFILES%\ RabbitMQ Server \ rabbitmq_server_2.7.1 \ sbin \)
2)次に、次のコマンドを実行します。
rabbitmq-plugins.bat enable rabbitmq_management
3)最後に、管理プラグインを有効にするには、RabbitMQサービスを再インストールする必要があります。 次の一連のコマンドを実行して、サービスをインストールします。
rabbitmq-service.bat stop
rabbitmq-service.batインストール
rabbitmq-service.bat start
RabbitMQサーバー管理プラグインがインストールされ、実行されていることを確認するには、ブラウザーを起動して次のページに進みます (バージョン3.0の場合、デフォルトのポートは55672です)。 すべてがうまくいった場合、次のような画面が表示されます。
デフォルトのユーザー名/パスワードはguest / guestです。 中に入り、交換ポイントのリストをクリックします。
2)次に、次のコマンドを実行します。
rabbitmq-plugins.bat enable rabbitmq_management
3)最後に、管理プラグインを有効にするには、RabbitMQサービスを再インストールする必要があります。 次の一連のコマンドを実行して、サービスをインストールします。
rabbitmq-service.bat stop
rabbitmq-service.batインストール
rabbitmq-service.bat start
RabbitMQサーバー管理プラグインがインストールされ、実行されていることを確認するには、ブラウザーを起動して次のページに進みます (バージョン3.0の場合、デフォルトのポートは55672です)。 すべてがうまくいった場合、次のような画面が表示されます。
デフォルトのユーザー名/パスワードはguest / guestです。 中に入り、交換ポイントのリストをクリックします。
少なくとも1つのサブスクライバーが存在する各メッセージに対して、MTはテンプレートNamespace:ClassNameに基づいた名前で交換ポイント(exchange)を作成し、サブスクライバーキューをそれにバインドします。 このアプリケーションでは、MTという名前の交換ポイントが1つだけあります:KeyWasPressed。この交換ポイントは1つのキュー(サブスクライバー)に関連付けられています。
MTの新しいバージョンでは、キューと交換ポイントはデフォルトで永続的(耐久性)になりましたが、このバグや機能はまだ理解していませんでしたが、各サブスクリプションを作成する前に、Permanent()メソッドを明示的に呼び出して、対応する交換ポイントをキューサーバーに修正する必要がありました。 安定した交換ポイントとキューは、パブリッシャーによるメッセージのパブリッシュ時にサブスクライバーがキューサーバーから切断された場合、パブリッシュされたメッセージはまだサブスクライバーを通過しませんが、キュー内に立ち、接続するのを静かに待ちます(サブスクライバー)という点で便利です。
新しいサブスクライバーを追加します。
KeyWasPressedメッセージに別のサブスクライバーを追加して、アプリケーションを変更しましょう。 最初とは異なり、anothersubscriberと呼ばれるキューにサブスクライブされ、ユーザーが押した各キーの数値表現を表示します。
IServiceBus anotherSubscriber = ServiceBusFactory.New(sbc => { sbc.UseRabbitMq(); sbc.ReceiveFrom("rabbitmq://localhost/anothersubscriber"); sbc.Subscribe(subs => subs.Handler<KeyWasPressed>(msg => Console.WriteLine("{0}{1}{2}{3}", Environment.NewLine, "Key with code ", (int) msg.PressedKey, " was pressed") )); });
アプリケーションを起動し、キーを押します。
これで、画面上の各キーを押すと、2つの行が表示されます-各キーのデジタルコードとシンボルコードが押されました。 キューサーバーのコントロールパネルに移動すると、MT:KeyWasPressed交換ポイントに既に2つのサブスクライバーと別のサブスクライバーのキューが接続されていることがわかります。 また、MT.KeyWasPressedタイプのメッセージを受信すると、RabbitMQキューサーバーは両方のキューに送信します。
リソースを集中的に使用するタスクの分散。
次に、MassTransit + RabbitMQバンドルを使用して、リソースを大量に消費するタスクを複数のプロセスに分散する方法を見てみましょう。
ビデオファイルを変換するサービスを作成するタスクに直面していることを想像してください。 このタスクには、2つのサーバーがあります。 経験的に、サーバー1の最適な負荷は3つの並列変換可能なビデオファイルであり、サーバー2の場合、同時に変換されるビデオファイルの数は5を超えてはならないことがわかりました。 もちろん、変換プロセスはエミュレートします。 変換用のファイルを受け取るfilesToConvertというキューがあるとします。 各ファイルは、VideoFileタイプのオブジェクトを表します。
public class VideoFile { public int Num { get; set; } //, public int TimeToConvert { get; set; } }
そのようなメッセージを受信した加入者は、ゲームのルールに従って、受信したメッセージのTimeToConvertフィールドで指定されたミリ秒数だけスリープする必要があります。
伝説によれば、最初のサーバーで実行されるコード。
int firstServerFilesCount = 0; IServiceBus firstServer = ServiceBusFactory.New(sbc => { sbc.UseRabbitMq(); // , sbc.SetConcurrentConsumerLimit(3); sbc.Subscribe(subs => subs.Handler<VideoFile>(msg => { firstServerFilesCount++; Thread.Sleep(msg.TimeToConvert); Console.WriteLine(" 1. {0} {1} {2} . : {3} 3. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, firstServerFilesCount, Thread.CurrentThread.ManagedThreadId); firstServerFilesCount--; })); //prefetch=3. , sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=3"); });
伝説によると、最初のサーバーでは、同時に解析されるメッセージの数を3つに制限することにしました。 したがって、引数3を指定してSetConcurrentConsumerLimitメソッドを呼び出します。つまり、firstServerオブジェクトをメッセージサーバーに接続すると、MassTransitはサーバーからのメッセージを処理するために3つのスレッドのプールを保持します。 ただし、RabbitMQはメッセージの配信に従事していることを覚えておく必要があり、firstServerオブジェクトが一度に最大3つのメッセージを解析する準備ができているという事実を知ることはできません。 firstServerがメッセージサーバーに接続するUriのprefetchパラメーターを指定することで、この情報を彼に渡すことができます。
伝説によれば、このコードは2番目のサーバーで実行されています。
int secondServerFilesCount = 0; IServiceBus secondServer = ServiceBusFactory.New(sbc => { sbc.UseRabbitMq(); // , sbc.SetConcurrentConsumerLimit(5); sbc.Subscribe(subs => subs.Handler<VideoFile>(msg => { secondServerFilesCount++; Thread.Sleep(msg.TimeToConvert); Console.WriteLine(" 2. {0} {1} {2} . : {3} 5. ThreadId - {4}", Environment.NewLine, msg.Num, msg.TimeToConvert, secondServerFilesCount, Thread.CurrentThread.ManagedThreadId); secondServerFilesCount--; })); //prefetch=3. , sbc.ReceiveFrom("rabbitmq://localhost/filesToConvert?prefetch=5"); });
ご想像のとおり、違いは、メッセージを解析するために設計されたプール内のスレッドの数と、Uriのプリフェッチの値のみです。 さらに重要なのは、firstServerが接続されているのと同じキューにsecondServerを接続し、それによってこのキューに表示されるメッセージについてサブスクライバー間の競合が発生するという事実です。 firstServerオブジェクトとsecondServerオブジェクトが異なるキューに接続されている場合、各サーバーで1回ずつ、各ファイルが2回変換されるという事実に直面します。
次に、filesToConvertキューを100個の「ビデオファイル」で埋めるコードを作成し、ランダムな変換時間をrandomで指定します。
Random rnd = new Random(); for (int i = 1; i <= 100; i++) { publisher.Publish(new VideoFile() {Num = i, TimeToConvert = rnd.Next(100, 5000)}); }
割り当てたスレッドの数を使用して、サブスクライブするサーバーが並行して動作することを確認します。
MassTransitが提供できる他の可能性
1つの記事内で、MassTransitライブラリが提供するすべての可能性を考慮することは不可能です。 しかし、あなたはリストすることができます(私はそれをします)
- サガ。 分散プロセスを調整するためのメカニズム
- スケジューリング。 Quartz.netライブラリとの統合により、スケジュールされたキューにメッセージを送信できます
- 診断 ControlBus 、 PerformanceCountersのサポート
- Rx統合。 Rx拡張機能のサポート
- 暗号化 送信メッセージの暗号化。 Rijndaelブロック暗号は暗号化に使用されます。
- ユニットテスト可能性。 テストの目的で、MassTransitは外部のMQサーバーを必要としない組み込みトランスポート(ループバック)を使用できます
この記事の例のコードは、 ここで入手できます 。