UDPおよびC#Reactive Extensions

最近、 UDPとC#async / awaitに関する投稿を読みました。これは、単一のクライアントでUDP経由でデバイスをポーリングする簡単なタスクを解決する方法を説明しています。 async \ awaitを使用して問題を解決すると、非同期呼び出しを手動で実装する場合と比較して、コードの量が実際に削減されます。 一方、タスクの同期、同時データアクセス、および例外処理に関して多くの問題が発生します。 結果のソリューションは非常にエラーが発生しやすくなります。 著者の元のバージョンには、リソースの非リリースのエラーが含まれていました。



簡単で信頼性を高めることは可能ですか?





そして実際に問題は何ですか?



問題はUdpClient.Receive



UdpClient.Receive



)メソッドにあります。 このメソッドはリエントラントではありません。つまり、クライアントがすでにデータグラムの到着を待っている場合、このメソッドを再度呼び出すことはできません。 エラーが発生しなくても、別の「ストリーム」が期待するデータグラムを取得することは非常に可能です。 したがって、ユーザーアクションとUdpClient



の状態を同期する追加のコードを記述する必要があります。



async \ awaitおよびTasks Parallel Libraryには、既製の同期ツールがありません。 元の記事のように、手でコードを記述するか、 TPL Dataflowなどの既製のライブラリを使用する必要があります。 しかし、残念ながら、Dataflowは非常に重いです。



リアクティブ拡張



TPL Dataflowの代わりに、Reactive Extensions(RX)を使用できます。 RXは、非同期データストリーム(非同期シーケンス)を記述します。 RXには、データストリームを作成および操作するための多くの機能があります。 RXを使用すると、IOだけでなく、インターフェイス要素によって生成された「イベントストリーム」も操作できます。 これにより、プログラム全体を一連のデータストリームとして記述することができます。



コード例
最初の問題を解決するには、NuGetからRx-Main



ライブラリをプロジェクトに追加し、いくつかのヘルパーを作成する必要があります。

 public static IObservable<UdpReceiveResult> ReceiveObservable(this UdpClient client) { return client.ReceiveAsync().ToObservable(); } public static IObservable<int> SendObservable(this UdpClient client, byte[] msg, int bytes, string ip, int port) { return client.SendAsync(msg, bytes, ip, port).ToObservable(); } public static IObservable<UdpReceiveResult> ReceiveStream(this UdpClient client) { return Observable.Defer(() => client.ReceiveObservable()).Repeat(); }
      
      





最初の2つのヘルパーは、拡張メソッドを使用してTaskをIObservable(1つの要素の非同期シーケンス)に変換します。

最後のヘルパーは、シーケンス操作の例を示しています。

Observable.Defer



サブスクライバーが表示されるまで、パラメーター内のシーケンスコンストラクターの呼び出しを遅延させます。

拡張メソッド.Repeat()



は、無限に元のシーケンスを繰り返します。

一緒に、2つのメソッドは、ソケットからデータグラムを取得するための無限ループを作成します。



次に、データを送受信する方法:

 public IObservable<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut) { var o = from _ in this.client.SendObservable(msg, msg.Length, ip, port) from r in receiveStream where r.RemoteEndPoint.Address.ToString() == ip && r.RemoteEndPoint.Port == port select r.Buffer; return o.Take(1).Timeout(TimeSpan.FromMilliseconds(timeOut)); }
      
      





はい、はい、RXは非同期シーケンスのLinqをサポートしています。

このLinq式はRXの知識なしでは理解するのがかなり困難ですが、その本質は非常に単純です: SendObservable



ストリームから結果を受信した後、 SendObservable



ストリームSendObservable



サブスクライブし、 where述語を満たす要素のみを取得し、受信したデータグラムからバッファーを返します。 次に、結果のシーケンスの1つの結果が取得され、タイムアウトがハングします。



コードの最も重要な部分はreceiveStream



の定義です:

 receiveStream = client.ReceiveStream().Publish().RefCount();
      
      







ホット、コールド、ウォームシーケンス
RXシーケンスを使用する場合、その「温度」を知ることが重要です。



コールドシーケンスは、シーケンスサブスクライバが表示されたときに表示され、サブスクライバが存在しなくなったときに表示されなくなるものです。

ReceiveStream



拡張ReceiveStream



は、まさにそのようなシーケンスを返します。 これは、各サブスクライバーが独自のシーケンスを持つことを意味します。つまり、いくつかのUdpClient.ReceiveAsync



呼び出しが並行して発生し、冒頭で説明した問題は解決できません。



ホットシーケンス-サブスクライバーとは無関係に存在します。 たとえば、一連のユーザーのマウスの動き。 上記のコードのPublish



機能を使用すると、コールドシーケンスをホットシーケンスに変換できます。 しかし、これには別の問題があります。 UdpClient



コンストラクターでポートを指定UdpClient



ず、 UdpClient



を呼び出す前にReceive



を呼び出すと、エラーがスローされます。



したがって、中間オプションが必要です。シーケンスはすべてのサブスクライバーに共通であり、少なくとも1つのサブスクライバーが存在する限り存在する必要があります。 このシーケンスは「ウォーム」と呼ばれ、 RefCount



を呼び出すことでRefCount



されます。



イベント購読
テストのために、「サーバー」関数も作成しました。

 public IDisposable Listen(Func<UdpReceiveResult, byte[]> process) { return receiveStream.Subscribe(async r => { var msg = process(r); await client.SendObservable(msg, msg.Length, r.RemoteEndPoint); }); }
      
      





Subscribeメソッドを使用すると、非同期シーケンスの各要素で呼び出されるアクションを指定できます。 シーケンスを終了して除外するアクションを設定することもできます。



RXがasync \ awaitをサポートしていることにも注意してください。つまり、非同期シーケンスに基づくコードを使用するためにRXを知る必要はありません。



おわりに



結果のコードには、単一のサイクル、単一の明示的な同期、単一のスレッドまたはタスクの作成は含まれません。 この場合、コードは完全に非同期で安全です。

RXは、たとえ使用しなくても、間違いなく探索する価値があります。 Rxの主要部分は、標準のIEnumerableおよびIEnumeratorインターフェイスにモナド双対性の原理を適用することで発明されたため、RXはコンパクトで強力であることが判明しました。 また、RXはJavaScript、C ++、Java、ScalaおよびPython、Rubyでも使用できます。



ソースコードはクライアントとサーバーとともにgithub-github.com/gandjustas/UdpRxSampleにアップロードされました



All Articles