Cのシグナル#

こんにちは、Habr。 goのフローとQTのシグナルの同期モデルに触発され、c#で同様の何かを実装するというアイデアが浮上しました。



画像



興味があれば、猫の下でお願いします。



現時点では、c#でスレッドを同期すると、特にアプリケーションのオブジェクト間で同期プリミティブを渡し、将来的にこれをサポートするなど、いくつかの問題が発生します。



TaskおよびIAsyncResultを含む現在のモデルとTPL全体は、適切な設計ですべての問題を解決しますが、フローをブロックして信号を送受信できる単純なクラスを作成したかったのです。



一般的に、特定のインターフェイスは私の頭の中で成熟しています。



public interface ISignal<T> : IDisposable { void Send(T signal); T Receive(); T Receive(int timeOut); }
      
      





ここで、Tは受信者に転送する必要があるエンティティです。



呼び出し例:



  [TestMethod] public void ExampleTest() { var signal = SignalFactory.GetInstanse<string>(); var task1 = Task.Factory.StartNew(() => //   { Thread.Sleep(1000); signal.Send("Some message"); }); //    string message = signal.Receive(); Debug.WriteLine(message); }
      
      





信号オブジェクトを取得するには、ファクトリーを作成します。



  public static class SignalFactory { public static ISignal<T> GetInstanse<T>() { return new Signal<T>(); } public static ISignal<T> GetInstanse<T>(string name) { return new CrossProcessSignal<T>(name); } }
      
      





シグナルは、単一プロセス内で同期するための内部クラスです。 同期にはオブジェクト参照が必要です。



CrossProcessSignalは、個別のプロセスでスレッドを同期できる内部クラスです(詳細は後で説明します)。



シグナルの実装のために



Receiveで最初に頭に浮かぶのは、Semaphoreを使用してスレッドの実行をブロックし、Sendメソッドで、ブロックされたスレッドの数でこのセマフォのRelease()を呼び出すことです。

スレッドのロックを解除した後、Tバッファクラスのフィールドから結果を返します。 ただし、Receiveでハングするスレッドの数はわかりません。Release呼び出しまでにさらに2、3のスレッドが実行されないという保証はありません。



AutoResetEventが同期プリミティブとして選択されました。 新しいストリームごとに、独自のAutoResetEventが作成されます。このすべてを辞書Dictionary <int、AutoResetEvent>に格納します。キーはストリームIDです。



実際には、クラスフィールドは次のようになります。



 private T buffer; Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>(); private volatile object sync = new object(); private bool isDisposabled = false;
      
      





Sendを呼び出すときに同期オブジェクトが必要になるため、複数のスレッドがバッファのマッシングを開始しません。



Dispose()が呼び出されたかどうかを示すisDisposabledフラグ;そうでない場合は、デストラクターで呼び出します。



 public void Dispose() { foreach(var resetEvent in events.Values) { resetEvent.Dispose(); } isDisposabled = true; } ~Signal() { if (!isDisposabled) { Dispose(); } }
      
      





次に、Receiveメソッドについて説明します。



  public T Receive() { var waiter = GetEvents(); waiter.WaitOne(); waiter.Reset(); return buffer; }
      
      





GetEvents()は、辞書からAutoResetEventを取得し、そうでない場合は、新しいものを作成して辞書に入れます。



waiter.WaitOne()は、シグナルを待つ前にストリームをロックします。



waiter.Reset()は、現在のAutoResetEvent状態をリセットします。 WaitOneを次に呼び出すと、スレッドがブロックされます。



各AutoResetEventに対してSetメソッドを呼び出すだけです。



 public void Send(T signal) { lock (sync) { buffer = signal; foreach(var autoResetEvent in events.Values) { autoResetEvent.Set(); } } }
      
      





このモデルはテストで確認できます。



テスト
 private void SendTest(string name = "") { ISignal<string> signal; if (string.IsNullOrEmpty(name)) { signal = SignalFactory.GetInstanse<string>(); //    } else { signal = SignalFactory.GetInstanse<string>(name); } var task1 = Task.Factory.StartNew(() => //   { for (int i = 0; i < 10; i++) { //  ,   var message = signal.Receive(); Debug.WriteLine($"Thread 1 {message}"); } }); var task2 = Task.Factory.StartNew(() => //   { for (int i = 0; i < 10; i++) { //  ,   var message = signal.Receive(); Debug.WriteLine($"Thread 2 {message}"); } }); for (int i = 0; i < 10; i++) { //    . signal.Send($"Ping {i}"); Thread.Sleep(50); } }
      
      





信号クラスのリスト
 using System.Collections.Generic; using System.Threading; namespace Signal { internal class Signal<T> : ISignal<T> { private T buffer; Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>(); private volatile object sync = new object(); private bool isDisposabled = false; ~Signal() { if (!isDisposabled) { Dispose(); } } public T Receive() { var waiter = GetEvents(); waiter.WaitOne(); waiter.Reset(); return buffer; } public T Receive(int timeOut) { var waiter = GetEvents(); waiter.WaitOne(timeOut); waiter.Reset(); return buffer; } public void Send(T signal) { lock (sync) { buffer = signal; foreach(var autoResetEvent in events.Values) { autoResetEvent.Set(); } } } private AutoResetEvent GetEvents() { var threadId = Thread.CurrentThread.ManagedThreadId; AutoResetEvent autoResetEvent; if (!events.ContainsKey(threadId)) { autoResetEvent = new AutoResetEvent(false); events.Add(threadId, autoResetEvent); } else { autoResetEvent = events[threadId]; } return autoResetEvent; } public void Dispose() { foreach(var resetEvent in events.Values) { resetEvent.Dispose(); } isDisposabled = true; } } }
      
      





この実装には、信頼性の観点から成長する余地があります。 ソースには、共有メモリを介した信号伝送によるこのアイデアのプロセッサ間実装があります。興味深い場合は、別の記事を書くことができます。



Githubソース



All Articles