.netの同時実行構造。 内部からのConcurrentQueue

ConcurrentQueueは、ロックフリーの競合データ構造として分類できます。 その実装にはロック(lock、Mutex ...)はなく、以下を使用して実装されます。

-クラシック関数CompareExchange。

-SpinWait

-volatile(メモリバリアとして使用)

ConcurrentQueueは、リングバッファ構造に基づいています。





リングバッファ



リングバッファは、キューデータ構造(FIFO)の実装に最適です。



データの配列と2つのポインター-開始(開始)と終了(終了)に基づいています。



主に2つの操作があります。

  1. プッシュ -最後に追加します。 バッファに新しい要素を追加すると、終了カウンタが1増加し、新しい要素がその場所に書き込まれます。 配列の上限で「休止」した場合、終了値はリセットされ(配列の先頭に「移動」)、要素は配列の先頭に書き込まれ始めます。 終了インデックスが開始インデックスに達するまで書き込みが可能です。
  2. ポップ -最初にアイテムを選択します。 要素の選択は開始要素から行われ、終了に達するまで値が連続的に増加します。 サンプリングは、開始インデックスが終了インデックスに達するまで可能です。




ブロックリングバッファ



ConcurrentQueueは、従来のリングバッファーよりも少し複雑です。 その実装では、セグメント(セグメント)の概念が使用されます。 ConcurrentQueueは、(単方向の)セグメントのリンクリストで構成されます。 セグメントサイズは32です。

private class Segment { volatile VolatileBool[] m_state; volatile T[] m_array; volatile int m_low; volatile int m_high; volatile Segment m_next; }
      
      





最初に、ConcurrentQueueに1つのセグメントが作成されます



必要に応じて、新しいセグメントが右側に追加されます。





結果は、単方向のリンクリストです。 リンクリストの先頭はm_head、末尾はm_tailです。 制限事項:



アイテムの追加(エンキュー)



以下は、要素をセグメントに追加するためのアルゴリズムの例です。



 index = Interlocked.Increment(ref this.m_high); if (index <= 31) { m_array[index] = value; m_state[index].m_value = true; }
      
      





m_state-セルの状態の配列。trueの場合-要素はセルに書き込まれ、falseの場合-まだではありません。 実際、これは一種の「コミット」レコードです。 Interlocked.Incrementインデックスを増やしてからm_array [index] = valueの値を書き込む操作の間、別のスレッドが要素を読み取らないようにする必要があります。 次に、実行後にデータが読み込まれます。

 while (!this.m_state[index].m_value) spinWait2.SpinOnce();
      
      







新しいセグメントの追加(Segment.Grow)



現在のセグメントのm_highが31になるとすぐに、現在のセグメントへの書き込みが停止され、新しいセグメントが作成されます(現在のセグメントは引き続き有効です)。

 m_next = new ConcurrentQueue<T>.Segment(this.m_index + 1L, this.m_source); m_source.m_tail = this.m_next;
      
      





m_next-次のセグメントへのリンク

m_source.m_tail-セグメントのリストの最後のセグメントをリンクします。



要素の選択(TryDequeue)



キューからの要素の選択は、2つの基本的な機能に基づいています。



サンプル操作の近似アルゴリズム:

  1. m_lowを取得する
  2. CompareExchangeを使用してm_lowを1増やす
  3. m_lowが31より大きい場合-次のセグメントに進みます
  4. インデックスm_lowを持つ要素のコミット(m_state [low] .m_value)を待ちます。


 SpinWait spinWait1 = new SpinWait(); int low = this.Low; if (Interlocked.CompareExchange(ref this.m_low, low + 1, low) == low) { SpinWait spinWait2 = new SpinWait(); while (!this.m_state[low].m_value) spinWait2.SpinOnce(); result = this.m_array[low];
      
      







カウントvs IsEmpty



IsEmptyコード:

 ConcurrentQueue<T>.Segment segment = this.m_head; if (!segment.IsEmpty) return false; if (segment.Next == null) return true; SpinWait spinWait = new SpinWait(); for (; segment.IsEmpty; segment = this.m_head) { if (segment.Next == null) return true; spinWait.SpinOnce(); } return false;
      
      





つまり 実際、これは最初の空でないセグメントを見つけることです。 見つかった場合、キューは空ではありません。



カウントコード:

 ConcurrentQueue<T>.Segment head; ConcurrentQueue<T>.Segment tail; int headLow; int tailHigh; this.GetHeadTailPositions(out head, out tail, out headLow, out tailHigh); if (head == tail) return tailHigh - headLow + 1; return 32 - headLow + 32 * (int) (tail.m_index - head.m_index - 1L) + (tailHigh + 1);
      
      





実際、最初と最後のセグメントを検索し、これら2つのセグメントに基づいて要素の数を計算します。

結論-カウント操作はIsEmptyよりも多くのプロセッサー時間を必要とします。



スナップショットとGetEnumerator



ConcurrentQueueフレームワークは、スナップショットテクノロジーをサポートして、要素の完全なセットを提供します。

整数データは以下を返します:



上記の演算子はロックなしでも機能し、カウンターを導入することで整合性が実現します。
 volatile int m_numSnapshotTakers
      
      



キュー全体内-現在の時点でスナップショットを操作している操作の数。 つまり 完全な画像を取得する各操作では、次のコードを実装する必要があります。

 Interlocked.Increment(ref this.m_numSnapshotTakers); try { ...//    } finally { Interlocked.Decrement(ref this.m_numSnapshotTakers); }
      
      





これに加えて、デキュー操作のみが変更を「書き込む」ため、キュー要素へのリンクを削除する必要性のみをチェックします。

 if (this.m_source.m_numSnapshotTakers <= 0) this.m_array[low] = default (T);
      
      






All Articles