非同期および待機に基づくイベントモデル

2012年、原油価格がまだ3桁で草が緑であったとき、Microsoftは.NET 4.5をリリースし、それとともに非同期/待機デザインをリリースしました。 かなりの数の記事が既にそれについて書かれており( C#の非同期 )、ほとんどのC#開発者はそれをよく研究しています。 しかし、すべてのユースケースが考慮されていますが、もう少し待ってから絞ることは可能ですか?



この構成の最も明らかな使用法は、非同期操作が完了するのを待つことです。 最初に頭に浮かぶのは、I / Oを待つことです。 たとえば、クライアントにリクエストを送信してレスポンスを期待し、awaitを使用して、レスポンスを受信した後もコードの実行を継続できます。コード自体は同期しているように見えます。 しかし、待機中にこの操作を中断する必要が生じた場合はどうなりますか? 次に、CancellationTokenを使用する必要があります。そのような操作が複数ある場合は、トークンをリンクするか、1つの共通トークンを使用する必要があります。 この場合、キャンセルの理由は、このCancellationTokenを使用してコードから隠されます。 キャンセルに加えて、コードは接続損失、タイムアウト、返されたエラーなどの処理をサポートする必要があります。



クラシックバージョンでは、キャンセルを処理するCancellationTokenの使用、切断を処理するtry catch、およびリクエストの結果を評価するために返されたデータの分析コードが使用されます。 しかし、これらすべてを単一のパラダイムに適合させることは可能ですか? この記事では、async / await構文シュガーを使用した別のイベントベースのアプローチを検討することを提案します。



イベントライブラリ。



async / awaitのイベントモデルに必要なものはすべて、イベントライブラリとして設計され、MITライセンスの下でGitHubに投稿さました。



ライブラリは2年以上にわたってテストされ、戦闘システムで使用されています。



使用する



Eventingを使用して最初に説明した例は次のようになります。



var @event = await this.EventManager.WaitFor<MessageReceived, CancelRequested>(TimeSpan.FromSeconds(50)); if (@event == null) Log.Info("timeout"); else if (@event is CancelRequested) Log.Info("Cancelled, reason: {0}", ((CancelRequested) @event).Reason); else Log.Info("Message received");
      
      





ここでは、EventManager-IEventManagerインターフェイスを実装するイベントマネージャーを使用して、タイムアウトが50秒のMessageReceivedおよびCancelRequestedイベントを待機します。 WaitFor呼び出しを使用して、指定されたイベントにサブスクライブし、await呼び出しはコードのさらなる実行をブロックします(ストリームはブロックしません)。 指定されたイベントのいずれかが発生するか、タイムアウトが期限切れになるまでロックされたままになり、その後、現在の同期コンテキストで実行が継続されます。 しかし、サブスクリプションの形成中にクライアントとの接続が失われた場合はどうなりますか? この場合、クライアント切断イベントが失われるため、コードは50秒間フリーズします。 これを修正しましょう:



 //   var eventAwait = this.EventManager.WaitFor<MessageReceived, ClientDisconnected, CancelRequested>(TimeSpan.FromSeconds(50), e => !(e is ClientDisconnected) || ((ClientDisconnected)e).id == client.Id); //   if (!client.Connected || cancelRequested) { //            Log.Info("Client disconnected or cancel requested"); return; } //      var @event = await eventAwait; ...
      
      





ここで、ClientDisconnectedイベントを追加し、待機可能なeventAwait変数の作成を分離し、イベントを直接待機します。 それらを分離しなかった場合、クライアントはclient.Connectedを確認してイベントを待機した後に切断する可能性があり、これはイベントの損失につながります。 現在のクライアントに関連しないClientDisconnectedイベントを除外するイベントフィルターも追加されました。



イベントを作成する方法は?



これを行うには、IEventを実装するクラスを作成します。



 class CancelRequested : IEvent { public string Reason { get; set; } }
      
      





次に、たとえば、IEventManager.RaiseEventを呼び出します。



 this.EventManager.RaiseEvent(new CancelRequested()).
      
      







IEventから継承すると、イベントが他のクラスから分離され、RaiseEventメソッドで不適切なインスタンスが使用されなくなります。 継承もサポートされています:



 class UserCancelRequested : CancelRequested { } class SystemCancelRequested : CancelRequested { } var @event = await this.EventManager.WaitFor<CancelRequested>(); if (@event is UserCancelRequested) ...
      
      





同時に多くの予想されるイベントがある複雑なシステムがある場合、キャンセルトークンの代わりにCancelRequestedイベントを使用すると、グローバルおよびローカルのCancellationTokenの詐欺やリンクを回避できます。 複雑なリンクは、トークンの保持によるメモリリークをスキップする可能性を高めるため、これは重要です。



イベントを購読するには?



一部のイベントは定期的であり、そのようなイベントはIEventManager.StartReceivingメソッドを使用して受信できます。



 void StartReceiving<T>(Action<T> handler, object listener, Func<T, bool> filter = null, SynchronizationContext context = null) where T : IEvent;
      
      





ハンドラーは、フィルターフィルター(指定されている場合)に一致する各Tイベントのコンテキスト同期コンテキストで呼び出されます。 同期コンテキストが指定されていない場合、SynchronizationContext.Currentが使用されます。



どのように機能しますか?



async / awaitのベースとなる同じタスクメカニズムを使用します。 WaitForが呼び出されると、イベントマネージャーはTaskCompletionSourceを使用してタスクを作成し、メッセージバスで選択されたイベントタイプをサブスクライブします。



 // EventManager.cs,   var taskCompletionSource = new TaskCompletionSource<IEvent>(); var subscription = new MessageSubscription( subscriptionId, message => { var @event = message as IEvent; if (filter != null && !filter(@event)) return; //     if (taskCompletionSource.TrySetResult(@event)) this.trace.TraceEvent(TraceEventType.Information, 0, "Wait ended: '{0}' - '{1}'", subscriptionId, message.GetType()); }, this, UnsubscribePolicy.Auto, this.defaultContext, eventTypes); this.messageBus.Subscribe(subscription); ... return taskCompletionSource.Task;
      
      





イベントが生成されると、RaiseEventメソッドが呼び出され、イベントがバスに渡され、eventTypesにこのタイプが含まれるイベントのタイプに従ってサブスクリプションが選択されます。 次に、サブスクリプションハンドラーが呼び出され、フィルターが満たされると、タスク実行の結果が設定され、await呼び出しのロックが解除されます。



 // EventManager.cs,   public void RaiseEvent(IEvent @event) { this.trace.TraceEvent(TraceEventType.Information, 0, "Event: {0}", @event); this.messageBus.Send(@event); } // MessageBus.cs,   public void Send(object message) { var messageType = message.GetType(); IMessageSubscription[] subscriptionsForMessage; lock (this.subscriptions) { subscriptionsForMessage = this.subscriptions .Where(s => s.MessagesTypes.Any(type => messageType == type || type.IsAssignableFrom(messageType))) .ToArray(); } ... foreach (var subscription in subscriptionsForMessage) subscription.ProccessMessage(message); this.UnsubscribeAutoSubscriptions(subscriptionsForMessage); ... // MessageSubscription.cs public void ProccessMessage(object message) { var messageHandler = this.handler; this.SynchronizationContext.Post(o => messageHandler(message), null); }
      
      





MessageSubscription.ProccessMessageでは、メッセージはユーザー定義の同期コンテキストに渡されるため、メッセージ送信の遅延が回避されます。



私のクラスをマルチスレッドからSpaします!



async / awaitを使用したことのある人は誰でも、awaitが終了した後、現在のスレッドではなく現在の同期コンテキストでコードが実行を継続することを知っています。 StartReceivingを使用してイベントをサブスクライブしてからWaitForを呼び出すと、これが問題になる可能性があります。これにより、クラスコードが異なるスレッドで同時に実行されます(StartReceivingのイベントハンドラーと、await //後のコードは非常に怖い!) これは、ライブラリに含まれるシングルスレッドの同期コンテキストで簡単に修正できます。



 this.serverSynchronizationContext = new SingleThreadSynchronizationContext("Server thread"); this.clientSynchronizationContext = new SingleThreadSynchronizationContext("Client thread"); this.serverSynchronizationContext.Post(async o => await this.RunServer(), null); this.clientSynchronizationContext.Post(async o => await this.RunClient(), null);
      
      





したがって、クライアントは常に「クライアントスレッド」スレッドで実行され、サーバーは「サーバースレッド」で実行されます。 競合状態を考慮することなく、マルチスレッドコードを記述できます。 ボーナスとして、これは単一のストリームの使用率を最大化します。



利点は何ですか?



主な利点は、コードのシンプルさとテスト容易性です。 前者について議論することができれば、誰もが独自の方法で単純さを理解し、2番目の段落ではすべてが明らかです。 マルチスレッドアプリケーションは、イベントのシーケンスをエミュレートする単一のスレッドでテストできます。そのため、モックオブジェクトを作成する必要はなく、相互作用をイベントに減らし、RaiseEvent呼び出しに対する検証を行うことができます。 NUnitの例:



 /// <summary> /// This test demonstrates how to test application that uses Eventing /// All code executes sequently in one thread /// </summary> [TestFixture] public class TestCase : TestBase { [Test] public async Task ClientDoesWork() { var client = new Client(this.EventManager); var doWorkAwaitable = client.DoWork(); this.EventManager.RaiseEvent(new Connected()); // We can debug and step into this.React(); await doWorkAwaitable; Assert.AreEqual(true, client.Connected); } }
      
      





これはどのように使用できますか?



リストで記事を圧倒しないように、イベンティングが使用されているシステムの1つについての短いテキスト説明のみを提供します。 これは、4つのタイプのノードで構成される水平方向にスケーラブルな分散システムであり、そのうちの1つはマスターです。 マスターはすべてのノードと継続的に通信し、それらに対するさまざまな操作の実行を制御します。 各操作は有限状態マシンの形式で表すことができます。遷移は、イベントの発生(タイムアウトまたはキャンセルを含む)です。 各操作では、オートマトンを(最初に行った)古典的な形式で実装することができましたが、現在の状態は個別の変数ではなくコード実行ポイントによって決定されるイベンティングを使用して表現する方がはるかに簡単であることがわかりました。 さらに、各ステップで予想されるすべてのイベントが明示的にリストされ、ホワイトボックスのテストが簡略化されました。



おわりに



この記事では、Eventingライブラリを使用するための主要な機能とオプションについて説明します。 ライブラリは普遍的なふりをせず、高負荷のシステムをサポートしますが、使い慣れたものを少し異なる外観で呼び出す必要があり、安全でマルチスレッドの観点から簡単にテストできるコードを作成できます。



All Articles