当初は、各要素にソースまたはコンシューマが属するかどうかを決定するビットが含まれる静的な循環バッファを作成することになっていた。 アルゴリズムは非常に単純で、ソースはビットがゼロのセルにデータを書き込み、変更を「公開」して、このビットにユニットを書き込み、次の要素に進みます。 一方、このビットが1に等しい要素を持つコンシューマは、メッセージを読み取り、ビットがゼロにリセットされます。 競合状態はありません。すべてが順調です。 しかし、最初のトラフィックテストでは、ソースが1つのスライスで約3万〜4万要素を理論的に生成できることが明らかになりました。 もちろん、これらの行の生成を除いて、彼はまだ何かをしているので、実際にはもっと少なくなりますが、バッファーのサイズを決定することはできません。これで十分です。 この理由の1つは、ファイルへの書き込み速度が不安定であることです。一部のシステムでは、ハードドライブの代わりにCFカードが使用されます。 そして、私は本当にメッセージを失いたくありません。
インターネットで大騒ぎして、次のソリューションに出くわしました。これはタスクで実装しました: drdobbs.com/architecture-and-design/210604448
アルゴリズムの詳細は十分に説明されているため、ここでは繰り返しません。
私が行った2つの変更:
1)消費者ではなく、ソースが要素をリリースする理由を理解できませんでした。 消費者が要素をリリースしても、競合状態は発生しません(ところで、このフレーズをロシア語に翻訳するにはどうすればよいですか?)。 これにより、消費された要素がすぐに削除されるため、消費者から負荷の一部が削除され、使用されるメモリが削減されます。
2)また、トラフィックテスト、またはプロファイラーは、mallocが比較的高価な操作であることを明らかにしました。 ソースメッセージの最大サイズがわかっているため、1つの操作でメモリ割り当てをすぐに8つの要素にグループ化することが決定されました。 これにより、特にプロセッサの負荷が半分になり、元のタスクに追加されて、速度が2倍以上向上しました。
ここでは、ソースコードがプラス以外のSにあったことを直ちに予約する必要があります。これは、秘密保持契約のため、公開できません。 MallockはC-Sharpには適用されないため、2番目のポイントは適用されません。 そして、私はすでに私の自由時間にs-sharpeを勉強していて、自分で書いています。 彼らは私に仕事を提供しましたが、私はこの魔法の言語の経験不足だけを経験したわけではなく、それ以来ずっとやっています。 まあ、ポイントに近い。
Cでの非ブロッキングキューの実装#
最初のステップは、キューの要素を記述することです。
クラスqueItem { パブリックオブジェクトメッセージ。 public queItem next; public queItem(オブジェクトメッセージ= null) { this.message = message; next = null; } }
そして実際にはターン自体:
class locklessQueue //スレッド間ロックレスキュー { queItem first; queItem last; }
ここで、最初はコンシューマーに属し、最後の要素の次はソースに属します。 firstもlastもnullであってはならないため、コンストラクタは「すでに消費された」状態で空の要素を作成します。
public locklessQueue() { 最初=新しいqueItem(); last = first; }
キューに追加し、それに応じてキューから抽出するその他の方法。
public voidproduce(オブジェクトメッセージ) { last.next = new queItem(メッセージ); last = last.next; } パブリックブール消費(オブジェクトメッセージの出力) { if(first == last || first.next == null) { メッセージ= null; falseを返します。 } message = first.next.message; first = first.next; trueを返します。 } }
ConcurrentQueueクラスはすでに4.0に含まれているため、結果のクラスだけでは十分な基盤がありません。これは完全にスレッドセーフであるだけでなく、結果のクラスとは異なり、同時に複数のスレッドをキューに追加および削除できます。 また、ブロックオプションと比較して、キューの処理が1.5〜3倍速くなります。 pruflink
ログコレクターの場合、ConcurrentQueueクラスで十分です。 しかし、アプリケーションのタスクを拡張しましたが、ConcurrentQueueは私に合わず、アドレスレスです。
スレッド間メッセージング
![画像](http://dl.dropbox.com/u/26488961/messenger.png)
各スレッドは、名前で別のスレッドにメッセージを送信できる必要があります。 私の場合、これらはtcpソケットハンドラー(クライアントまたはサーバー)とスレッドハンドラー自体です。 どのプロセッサを送信する必要があるかを見つける方法は、このノートの外に残します。
残念ながら、サブ問題の1つを解決できませんでした-ブロックせずに、交換の参加者として新しいスレッドを追加します。 ConcurrentQueueのソースコードを見たいのですが、その解決策が答えを見つけるのに役立つかもしれません。 明確に、真実は開始メッセージを送信するために、または非同期メソッドからのメッセージのために使用することができますが、今のところ、古典的な場合とほぼ同じブロッキングソリューションを提供します。
先を見据えて、この決定には明らかな欠点があると思います。別のスレッドを実行する必要があるキューを処理する場合、これはブロックの欠如に対する支払いです。 個別のスレッドは明らかにプロセッサの負荷を追加しますが、ブロッキングがないため、各メッセージの処理が高速化されます。 生産性がどれだけ向上/悪化し、どのように拡大するかを評価することは困難です。おそらく、今後このような研究を行うでしょう。
そのため、参加者スレッドごとに2つのキューを作成する必要があります。1つのキューではメッセージを送信し、他のキューから読み取ります。 これら2つのキューのプロキシコンテナ:
クラスthreadNode { パブリック文字列tName; public int tid; locklessQueue outgoing = new locklessQueue(); //メッセンジャーからノードへ locklessQueue incoming = new locklessQueue(); //ノードからメッセンジャーへ パブリックthreadNode(文字列tName、int tid) { this.tid = tid; this.tName = tName; } public void enqueue(messengerItem message)// Nodeによって呼び出されます { incoming.produce(メッセージ); } public bool dequeue(out messengerItem message)// Nodeによって呼び出されます { オブジェクトメッセージ; bool result = outgoing.consume(out msg); message = messengerItemとしてのmsg; 結果を返す; } public void transmit(messengerItem message)// Messengerによって呼び出されます { outgoing.produce(メッセージ); } public bool retrieve(out messengerItemメッセージ)//メッセンジャーによって呼び出されます { オブジェクトメッセージ; bool result = incoming.consume(out msg); message = messengerItemとしてのmsg; 結果を返す; } }
ご覧のとおり、次のクラスで表されるmessengerItem型のオブジェクトはキューに入れられています:
クラスmessengerItem { からの公開文字列。 パブリック文字列 パブリックオブジェクトメッセージ。 public messengerItem(string from、string to、オブジェクトメッセージ) { this.from = from; this.to = to; this.message = message; } }
Messenger.send(...);を記述して、コードのどこからでもメッセージを送信できるように、メインクラスを静的にしました。
パブリックスタティッククラスメッセンジャー { static Dictionary <int、threadNode> byTID = new myDictionary <int、threadNode>(); static辞書<string、threadNode> byRegName = new myDictionary <string、threadNode>(); static Mutex regMutex = new Mutex(); //一度に登録できるタスクは1つだけです
ストリームからメッセージを送信するときに必要なノードを検索するには、辞書を使用します。キーは管理対象スレッドIDで、キーは登録時に指定された名前です。 あるノードから別のノードにメッセージを転送するために、私は自分のスレッドを開始します。ここではシェルを提供しません。つまり、無限ループで記述されたmessengerFunctionをプルし、戻り値がfalseの場合にThread.Sleepを呼び出してスライスを提供します。
静的boolメッセンジャー関数() { bool acted = false; messengerItemアイテム; threadNode dst; 辞書<string、threadNode> tmp = byRegName; foreach(tmp.ValuesのthreadNodeノード) { if(tmp!= byRegName) 休憩; if(node.retrieve(out item)) if(item!= null) { acted = true; if(tmp.TryGetValue(item.to、out dst)) { dst.transmit(アイテム); sent = true; } //そうでなければ破棄 } } リターンが行動した。 }
メッセンジャーにフローを登録するには、現在ブロックしている次の関数が使用されます。
static public void register(string tName){if(tName == null || tName == "")return; int tid = Thread.CurrentThread.ManagedThreadId; myDictionary <int、threadNode> newbyTID = new myDictionary <int、threadNode>(); myDictionary <string、threadNode> newbyRegName = new myDictionary <string、threadNode>(); threadNode newnode =新しいthreadNode(tName、tid); newbyTID.Add(tid、newnode); newbyRegName.Add(tName、newnode); regMutex.WaitOne(); foreach(byTID.ValuesのthreadNodeノード){newbyTID.Add(node.tid、node); newbyRegName.Add(node.tName、node); } byTID = newbyTID; byRegName = newbyRegName; regMutex.ReleaseMutex(); }
また、同様の関数を使用して、ストリームの最後で登録を解除します。そのコードは省略します。 残っているのは、ストリームでメッセージを送受信する機能だけです。
static public void send(文字列宛先、オブジェクトメッセージ) { int tid = Thread.CurrentThread.ManagedThreadId; threadNodeノード。 if(byTID.TryGetValue(tid、out node)) node.enqueue(新しいmessengerItem(node.tName、宛先、メッセージ)); } 静的なパブリックブール受信(オブジェクトメッセージの送信、文字列送信者の送信) { int tid = Thread.CurrentThread.ManagedThreadId; threadNodeノード。 if(!byTID.TryGetValue(tid、out node)) { 送信者= null; メッセージ= null; falseを返します。 } 他に { messengerItemアイテム; bool result = node.dequeue(out item); if(!結果|| item == null) { 送信者= null; メッセージ= null; } 他に { メッセージ= item.message; 送信者= item.from; } 結果を返す; } }
最後の関数は、ループ内のハンドラースレッド、メッセージの処理、または受信されない場合はThread.Sleep()の実行によって使用されます。