.NetでのConcurrentBagの動作

並行コレクションの中で、ConcurrentDictionaryが最も人気があります。 ConcurrentQueueとConcurrentStackも一般的に使用されています。



一般に、スレッドセーフハッシュテーブルのコレクションの一部をロックするソリューションは、非常に単純で論理的であり、したがって、さらに美しくなります。



ConcurrentDictionaryの構造は、DictionaryおよびConcurrentDictionaryの内部のハーバーに関する記事でも説明されていました。 ConcurrentBagは、Producer-Consumerパターンが実装されている場所で主に使用されるため、あまり人気がありません。 さらに、この構造は、同じストリームがコレクションへのデータの追加と削除に従事しているときに最も最適に機能します。 これが発生する理由については、後で説明します。



はじめに



この構造の基本は、ストリーム間ですべてのデータを共有する単純なアイデアです。そのため、各ストリームはストレージの「仮想」部分で可能な限り頻繁に機能します。



.Netでは、ご存じのとおり、すべてのスレッドの1つのインスタンスではなく、各スレッドの独自のインスタンスで変数を作成するために、 [ThreadStatic]属性が使用されます。 .Net Framework 4.0では、 ThreadLocalクラスが追加されました。これは、このようなデータを操作するための便利なラッパーです。







ConcurrentBagでは、データはThreadLocal m_localsに保存されます。 つまり、この構造で動作する各スレッドには、独自のThreadLocalListインスタンスがあります。 揮発性変数m_headListとm_tailListもあり、それぞれm_localsの最初と最後の要素を示します。 これは、コレクション全体を取得する必要があるときにIEnumeratorを取得するために必要です。



ストレージは単方向リンクリストを介して実装されるため、m_localsの「head」および「tail」への参照が存在します 。 つまり、スレッドにはThreadLocalListのインスタンスがあり、このクラスには、別のスレッドのThreadLocalListの次のインスタンスを指すThreadLocalList m_nextListフィールドがあります。 これは、1つのスレッドから、すべてのスレッドでこの変数のすべてのインスタンスにアクセスでき、m_nextListで「歩いている」ことを意味します。



次に、ThreadLocalListクラスの構造を扱います。 また、双方向リンクリストも表します。 要素は通常のNodeクラスで表されます。 最初と最後の要素へのポインタは、それぞれm_headとm_tailです。 また、現在のインスタンス所有者へのリンクを格納するThread m_ownerThreadフィールドがあることにも注意してください。 なぜ作成者ではなく現在であるかについては、後で説明します。 結果は次の構造になります。







アイテムを追加



ThreadLocalListの取得


まず、現在のスレッドのThreadLocalListを取得または作成します(作成されていない場合)。m_headListおよびm_tailListポインターが更新されます。 さらに、ロックはGlobalListLock(同じm_locals)にある同期コードで作成されます。 これは、m_tailListを更新するために必要です。 また、名前が示すとおり、このロックは、コレクション全体、つまり、CopyTo、ToArray、GetEnumerator、Count、IsEmptyでFreezeBagおよびUnFreezeBagメソッドを介してロックが必要な場合に使用されます。



また、作成時には、最初に所有者のないThreadLocalList、つまり、このコレクションを使用して勇敢に死んだスレッドを見つけようとします。 そのようなリストがあれば、それを見つけて、現在のスレッドへのリンクをm_ownerThreadフィールドに割り当てます。



未使用のリストを検索する
private ThreadLocalList GetUnownedList() { ThreadLocalList currentList = m_headList; while (currentList != null) { if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped) { currentList.m_ownerThread = Thread.CurrentThread; return currentList; } currentList = currentList.m_nextList; } return null; }
      
      













ThreadLocalListへのアイテムの追加


2番目のステップは、要素をThreadLinkedListに追加することです。 ブロックせずに「ヘッド」に標準的に追加されます。 ただし、ThreadLocalListの要素の数が2未満の場合、要素の追加時に現在のシートインスタンスにロックがかけられます。この場合、データが失われる可能性があるためです。 これは、現時点で別のスレッドが現在のスレッド(スチールスレッド)のThreadLocalListからデータを取得できるためです。



コレクションからアイテムを取得する



スレッドがコレクションから要素を取得する場合、最初にThreadLocalListに移動し、空でない場合、リンクリストの「ヘッド」から要素を取得します。 ローカルストレージが空の場合、他のスレッドのすべてのストレージでm_nextListを通過し、空でないリストを探します。 見つかったら、そこから要素を「盗み」ます。 さらに、スレッドが所有者に要素を正しく追加することを混乱または防止することなく、要素を「盗む」必要があります。 ここが重要なポイントです。 別のスレッドのリンクリストから要素を取得する場合、「ヘッド」ではなく「テール」から取得します。 つまり、リンクリストに3つ以上の要素がある場合、ストリームはシート全体をブロックせずに要素を盗むことができます。 つまり、この場合、追加された要素と撤回された要素の間に少なくとも1つの中間要素があるため、「競合状態」は不可能です。







要素が2つ未満の場合、ブロックなしで追加することはできません。3つ未満の要素があるように、同期なしで要素を取得することはできません。 以下は、これらの操作が空のストレージで実行され、2つの要素がある場合に要素を追加および削除する例を示します。



ThreadLocalListを使用してスレッドをテストする


テスト用コード(オプション1)
 Task task1, task2, task3; ConcurrentBag<int> bagInt = new ConcurrentBag<int>(); int inputSize = 100 * 1024 * 1024; int[] inputDataInt = new int[inputSize]; for (var i = 0; i < inputSize; i++) { inputDataInt[i] = i; } Stopwatch sw = new Stopwatch(); sw.Start(); task1 = Task.Factory.StartNew(() => { int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); task2 = Task.Factory.StartNew(() => { int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); task3 = Task.Factory.StartNew(() => { int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); Task.WaitAll(task1, task2, task3); sw.Stop();
      
      







テスト用コード(オプション2)
 Task task1, task2, task3; ConcurrentBag<int> bagInt = new ConcurrentBag<int>(); int inputSize = 100 * 1024 * 1024; int[] inputDataInt = new int[inputSize]; for (var i = 0; i < inputSize; i++) { inputDataInt[i] = i; } Stopwatch sw = new Stopwatch(); sw.Start(); task1 = Task.Factory.StartNew(() => { bagInt.Add(-2); bagInt.Add(-1); int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); task2 = Task.Factory.StartNew(() => { bagInt.Add(-2); bagInt.Add(-1); int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); task3 = Task.Factory.StartNew(() => { bagInt.Add(-2); bagInt.Add(-1); int outInt; for (var i = 0; i < inputSize; i++) { bagInt.Add(inputDataInt[i]); bagInt.TryTake(out outInt); } }); Task.WaitAll(task1, task2, task3); sw.Stop();
      
      







この例では、3つのストリームがあり、それぞれがn個の要素を入力してすぐにピックアップします。

この例では、すべてのスレッドがThreadLocalListでのみ機能します。

2番目のケースでは、これらの操作を実行する前に、各ストリームの2つの要素をローカルリストに追加します。 そして、すべてのスレッドがリストのサイズを2から3に、またはその逆に変更することがわかりました。



サイズが100 * 1024 * 1024のint型の配列。

空のコレクション(オプション1)-16秒

ローカルストレージでは、最初に2つの要素が追加されました(オプション2)-12秒。







ThreadLocalListにはm_currentOpプロパティがあり、コレクションで実行される現在の操作(なし、追加、テイク)を示します。 ただし、操作中に、要素の数がそれぞれ追加および取得時に2または3未満の場合は、Noneにリセットされます(リストのロックが実行されます)。

スレッドが別のスレッドのリストから項目を盗もうとするとき、現在の操作がNoneになるまで最初に待機します。 これはSpinWaitを使用して行われます。



 SpinWait spinner = new SpinWait(); while (list.m_currentOp != (int)ListOperation.None) { spinner.SpinOnce(); }
      
      





追加およびテイクのロックは、要素の数が2〜3未満の場合だけでなく、フィールドm_needSync = trueの場合にも発生します。 コレクション全体がブロックされたことを示しています。 コレクション全体がロックされると、すべてのスレッドのすべてのThreadLocalListにもロックが繰り返し適用されます。



おわりに



要約すると、2つの基本原則に注目したいと思います。



1)各スレッドは、ストレージの一部でのみ動作しようとします。

2)ストリームが自宅でデータを見つけられない場合でも、データを「盗む」ときに別のストリームのローカルデータリストをブロックしないようにします。



サイモンクーパーは英語で、この記事のすべての基本原則を簡潔かつ十分に説明しました。

コンカレントコレクション内:ConcurrentBag



All Articles