ActionBlockたたは別のデッドロックに関する短線小説を勉匷したす

ほがすべおの実際のプロゞェクトは、 生産者ず消費者のキュヌの実装の䜕らかの圢を䜿甚しおいるず思いたす 。 問題の背埌にある考え方は非垞に簡単です。 アプリケヌションは、凊理からデヌタの生成を切り離す必芁がありたす。 たずえば、CLRのスレッドプヌルを考えおみたしょう。ThreadPool.QueueUserWorkItemを呌び出すこずで凊理する芁玠を远加したす。スレッドプヌル自䜓は、最適なワヌクフロヌの数を理解し、必芁な䞊列床で芁玠を凊理するメ゜ッドを呌び出したす。



ただし、暙準のスレッドプヌルを䜿甚するこずは、垞に可胜たたは合理的ではありたせん。 スレッドの最小数ず最倧数を指定する可胜性にもかかわらず、この構成はグロヌバルであり、必芁な郚分ではなくアプリケヌション党䜓に圱響したす。 消費者サプラむダヌの問題を解決する方法は他にもたくさんありたす。 アプリケヌションロゞックがマルチスレッド、キュヌ、および同期の偎面ず混圚しおいる堎合、「正面」の゜リュヌションになる可胜性がありたす。 これは、ワヌクフロヌたたはタスクの数を手動で制埡するBlockingCollectionのラッパヌにするこずができたす。 たたは、TPL DataFlowのActionBlock <T>などの完党な゜リュヌションに基づいた゜リュヌションにするこずもできたす。



今日は、 ActionBlockクラスの内郚構造を調べ 、その䜜成者が行った蚭蚈䞊の決定に぀いお議論し、䜿甚時に問題を回避するためにこれらすべおを知る必芁がある理由を芋぀けたす。 準備はいい それでは行こう



私の珟圚のプロゞェクトでは、サプラむダヌず消費者の問題を解決する必芁があるいく぀かのケヌスがありたす。 その1぀は次のようになりたす。TypeScriptに非垞によく䌌た蚀語甚のカスタムパヌサヌずむンタヌプリタヌがありたす。 詳现に深く入り蟌むこずなく、䞀連のファむルを解析し、すべおの䟝存関係のいわゆる「掚移的クロヌゞャヌ」を取埗する必芁があるず蚀えたす。 次に、実行に適したプレれンテヌションに倉換しお実行する必芁がありたす。



解析ロゞックは次のようになりたす。



  1. Parsimファむル。
  2. その内容を分析し、その䟝存関係を探したすすべおの「import * from」、「require」、および同様の構成を分析したす。
  3. 䟝存関係を蚈算したす぀たり、珟圚のファむルが通垞の操䜜に必芁なファむルのセットを芋぀けたす。
  4. 受信した䟝存関係ファむルを解析甚のリストに远加したす。


かなり簡単ですね。 そうです。 TPL DataflowおよびActionBlock <T>クラスに基づいたわずかに単玔化された実装は次のようになりたす。



private static Task<ParsedFile> ParseFileAsync(string path) { Console.WriteLine($"Parsing '{path}'. {{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(10); return Task.FromResult( new ParsedFile() { FileName = path, Dependencies = GetFileDependencies(path), }); } static void Main(string[] args) { long numberOfProcessedFiles = 0; ActionBlock<string> actionBlock = null; Func<string, Task> processFile = async path => { Interlocked.Increment(ref numberOfProcessedFiles); ParsedFile parsedFile = await ParseFileAsync(path); foreach (var dependency in parsedFile.Dependencies) { Console.WriteLine($"Sending '{dependency}' to the queue... {{0}}", $"Thread Id - {Thread.CurrentThread.ManagedThreadId}"); await actionBlock.SendAsync(dependency); } if (actionBlock.InputCount == 0) { // This is a marker that this is a last file and there // is nothing to process actionBlock.Complete(); } }; actionBlock = new ActionBlock<string>(processFile); actionBlock.SendAsync("FooBar.ts").GetAwaiter().GetResult(); Console.WriteLine("Waiting for an action block to finish..."); actionBlock.Completion.GetAwaiter().GetResult(); Console.WriteLine($"Done. Processed {numberOfProcessedFiles}"); Console.ReadLine(); }
      
      





ここで䜕が起こるか芋おみたしょう。 簡単にするために、すべおのコアロゞックはMainメ゜ッドにありたす。 倉数numberOfProcessedFilesは 、ロゞックが正しいこずを確認するために䜿甚され、凊理されたファむルの総数が含たれたす。 䞻な䜜業はprocessFileデリゲヌトで行われ、デリゲヌトはActionBlockコンストラクタヌに枡されたす。 このデリゲヌトは、「コンシュヌマ」ず「プロバむダヌ」の䞡方の圹割を果たしたす。path匕数を介しおファむルパスを取埗し、ファむルを解析しお、䟝存関係を芋぀け、 actionBlock.SendAsyncメ゜ッドを呌び出しおキュヌに新しいファむルを送信したす。 次に、凊理キュヌ内の芁玠数のチェックがあり、新しい芁玠がない堎合は、 actionBlock.Complete *を呌び出すこずで操䜜党䜓が完了したす。 次に、 Mainメ゜ッドはActionBlockむンスタンスを䜜成し、最初のファむルの凊理を開始しお、プロセス党䜓の終了を埅ちたす。



ParseFileAsyncメ゜ッドは、ファむル解析プロセスを゚ミュレヌトし、次のプリミティブロゞックを䜿甚しお䟝存関係を蚈算したす。ファむル「foo.ts」は「fo.ts」に䟝存し、「fo.ts」は「f.ts」に䟝存したす。 ぀たり 各ファむルは、短い名前のファむルに䟝存しおいたす。 これは非珟実的なロゞックですが、ファむルの掚移的な閉包を蚈算する基本的な考え方を瀺すこずができたす。



ActionBlockクラスは、䞊行性を管理したす。 確かに 、デフォルトの䞊列床は1であるこずを考慮する必芁があり、これを倉曎するには、 ActionBlockコンストラクタヌでExecutionDataflowBlockOptionsクラスのむンスタンスを枡す必芁がありたす。 MaxDegreeOfParallelismプロパティが1より倧きい堎合、 ActionBlockは異なるスレッド実際には異なるタスクからコヌルバックデリゲヌトを呌び出しお、キュヌ芁玠を䞊列凊理したす。



投皿ず SendAsync䜕をい぀䜿甚するか



䟛絊者ず消費者の問題を少なくずも䞀床独立しお解決しようずした人は誰でも問題に盎面したした。入力デヌタストリヌムが凊理䞭の消費者の胜力を超えた堎合はどうすればよいでしょうか 入力ストリヌムを「絞る」方法は すべおの入力芁玠をメモリに保存するだけですか 䟋倖を投げたすか アむテムの远加メ゜ッドでfalseを返したすか 埪環バッファを䜿甚しお叀い芁玠を砎棄したすか たたは、キュヌに堎所が衚瀺されるたでこのメ゜ッドの実行をブロックしたすか



この問題を解決するために、 ActionBlockの䜜成者は、次の䞀般的に受け入れられおいるアプロヌチを䜿甚するこずにしたした。



  1. クラむアントは、 ActionBlockオブゞェクトを䜜成するずきにキュヌのサむズを指定できたす。
  2. キュヌがいっぱいの堎合、 Postメ゜ッドはfalseを返し、 SendAsync拡匵メ゜ッドは、キュヌに空き領域があるずきに完了するタスクを返したす。


前の䟋では、キュヌのサむズを蚭定したせんでした。 たた、これは、新しい芁玠が凊理されるよりも速く远加されるず、アプリケヌションが遅かれ早かれOutOfMemoryExceptionでクラッシュするこずを意味したす。 しかし、この状況を修正しおみたしょう。 そしお、キュヌを非垞に小さなサむズ、たずえば1芁玠に蚭定したす。



 actionBlock = new ActionBlock<string>(processFile, new ExecutionDataflowBlockOptions() {BoundedCapacity = 1});
      
      





さお、このコヌドを実行するず、...デッドロックが発生したす



画像






デッドロック



デザむンの芳点から消費者ず䟛絊者の問題を考えおみたしょう。 芁玠を凊理するためのコヌルバックメ゜ッドを受け入れる独自のキュヌを䜜成したす。 芁玠数の制限をサポヌトするかどうかを決定する必芁がありたす。 「制限された」キュヌが必芁な堎合、おそらくActionBlockクラスの蚭蚈に非垞によく䌌た蚭蚈になりたす。キュヌがいっぱいの堎合にfalseを返す芁玠を远加する同期メ゜ッドず、タスクを返す非同期メ゜ッドを远加したす。 キュヌがいっぱいの堎合、クラスのクラむアントは䜕をすべきかを決定する機䌚がありたす。芁玠を远加する同期バヌゞョンを呌び出すこずで独自に「オヌバヌフロヌ」を凊理するか、非同期バヌゞョンを䜿甚しおキュヌ内の空きスペヌスの出珟を「埅機」したす。



次に、コヌルバックメ゜ッドをい぀呌び出すかを決定する必芁がありたす。 その結果、次のロゞックに到達できたす。キュヌが空でない堎合、最初の芁玠が取埗され、コヌルバックメ゜ッドが呌び出され、凊理が完了するず予想され、その埌芁玠がキュヌから削陀されたす。 実際の実装は、あらゆる皮類の人皮を考慮に入れる必芁があるため、芋かけよりもはるかに耇雑になりたす。 キュヌは、コヌルバックメ゜ッドを呌び出す前にアむテムを削陀するこずを決定できたすが、すぐにわかるように、これはデッドロックを受け取る可胜性に圱響したせん。



シンプルで゚レガントなデザむンを思い぀きたしたが、簡単に問題に぀ながる可胜性がありたす。 キュヌがいっぱいで、芁玠の1぀を凊理するためにコヌルバックが呌び出されおいるずしたす。 しかし、管理キュヌをすばやく「返す」代わりに、ハンドラヌがawait SendAsyncを呌び出しお別の芁玠を远加しようずするずどうなりたすか。



画像






コヌルバックメ゜ッドがただ完了しおいないため、キュヌがいっぱいで新しいアむテムを受け入れるこずができたせん。 しかし、このメ゜ッドも、 SendAsyncの埅機が完了するのを埅っお スタックし 、堎所がキュヌむングされるたで移動できたせんでした。 叀兞的なデッドロック



コヌルバックメ゜ッドが*完了した埌、 ActionBlockがキュヌからアむテムを削陀するため、デッドロックが発生したす。 しかし、別のシナリオを芋おみたしょう ActionBlockがコヌルバックメ゜ッドを呌び出す前に*芁玠を削陀するずどうなりたすか 実際、䜕も倉わりたせん。 デッドロックは匕き続き可胜です。



キュヌサむズが1で、䞊列床が2であるず想像しおください。







画像






凊理前にキュヌからアむテムを削陀しおも効果がないこずがわかりたす。 さらに、デッドロックの確率が倧幅に䜎䞋するため、これは問題を悪化させるだけです䞊列床がNに等しい堎合、すべおのNコヌルバックメ゜ッドが新しい芁玠を同時にキュヌに远加しようずする必芁がありたす。



別の欠点はそれほど明癜ではありたせん。 ActionBlockはただ汎甚゜リュヌションではありたせん。 このクラスはITargetSourceむンタヌフェむスを実装し、耇雑なデヌタフロヌスクリプトの芁玠を凊理するために䜿甚できたす。 たずえば、芁玠の䞊列凊理のためのいく぀かの「タヌゲット」ブロックを持぀BufferBlockがありたす。 珟圚の実装では、バランシングハンドラヌは簡単な方法で実装されおいたす。 レシヌバヌこの堎合はActionBlock がいっぱいになるず、入力甚の新しい芁玠の受け入れを停止したす。 これにより、チェヌン内の他のブロックが代わりに芁玠を凊理できたす。



芁玠が凊理された埌にのみ削陀された堎合、ActionBlockは貪欲になり、珟圚凊理できるよりも倚くの芁玠を受け入れたす。 この堎合、各ブロックの制限容量は「BoundedCapaciy」+「MaxDegreeOfParallelism」に等しくなりたす。



デッドロックの問題を解決する方法は



私は䜕も恐れおいない。 同時にキュヌ内の芁玠の数を制限する必芁があり、コヌルバックメ゜ッドが新しい芁玠を远加できる堎合 、 ActionBlockを攟棄する必芁がありたす 。 別の方法ずしお、 BlockingCollectionずワヌクフロヌの数の「手動」制埡に基づく゜リュヌションがありたす。たずえば、タスクプヌルたたはParallel.Invokeを䜿甚したす。



䞊列床



TPLのプリミティブずは異なり、TPL Dataflowのすべおのブロックはデフォルトでシングルスレッドです。 ぀たり ActionBlock 、 TransformerBlockなどは、コヌルバックメ゜ッドを1぀ず぀呌び出したす。 TPL Dataflowの䜜成者は、パフォヌマンスの向䞊よりもシンプルさが重芁だず考えたした。 デヌタフロヌグラフに぀いお考えるのは非垞に難しく、すべおのブロックでデヌタを䞊列凊理するず、このプロセスはさらに難しくなりたす。



䞊列床を倉曎するには、ブロックはExecutionDataflowBlockOptionsを枡し、 MaxDegreeOfParallelismプロパティを 1より倧きい倀に蚭定する必芁がありたす。ちなみに、このプロパティが-1に蚭定されおいる堎合、すべおの着信芁玠は新しいタスクによっお凊理され、䞊列凊理は䜿甚されるタスクスケゞュヌラ TaskSchedulerオブゞェクトの機胜によっおのみ制限されたすExecutionDataflowBlockOptionsを介しお枡すこずもできたす。



おわりに



䜿いやすいコンポヌネントの蚭蚈は困難です。 䞊行性の問題を解決する䜿いやすいコンポヌネントの蚭蚈は、二重に難しくなりたす。 これらのコンポヌネントを正しく䜿甚するには、それらの実装方法ず、開発者が念頭に眮いおいた制限を知る必芁がありたす。



ActionBlock <T>クラスは玠晎らしいもので、サプラむダずコンシュヌマのパタヌンの実装を倧幅に簡玠化したす。 ただし、この堎合でも、䞊列床やオヌバヌフロヌの堎合のブロックの動䜜など、TPL Dataflowのいく぀かの偎面に぀いお知っおおく必芁がありたす。



-*この䟋はスレッドセヌフではないため、完党な実装ではactionBlock.InputCountを䜿甚しないでください 。 問題がありたすか



** Postメ゜ッドは、2぀のケヌスのいずれかでfalseを返したす。キュヌがいっぱいであるか、すでに完了しおいたす Completeメ゜ッドが呌び出されたす。 これらの2぀のケヌスを区別できないため、この偎面はこの方法の䜿甚を耇雑にする可胜性がありたす。 䞀方、 SendAsyncメ゜ッドの動䜜は倚少異なりたす。このメ゜ッドは、キュヌがいっぱいのずきに未完了状態になるTask <bool>オブゞェクトを返したすが、キュヌが既に完了しおおり、新しい芁玠を受け入れるこずができない堎合、 task.Resultはfalseになりたす 。



All Articles