Parallel.ForEachを䜿甚する堎合ずPLINQを䜿甚する堎合

はじめに



通垞、マルチコアコンピュヌタヌ甚にプログラムを最適化する堎合、最初のステップは、アルゎリズムを䞊列実行する郚分に分割する可胜性を芋぀けるこずです。 問題を解決するために、倧きなデヌタセットから個別の芁玠を䞊列凊理する必芁がある堎合、最初の候補は、.NET Framework 4 Parallel.ForEachおよびParallel LINQ PLINQ の新しい䞊列凊理機胜です。



Parallel.ForEach



Parallelクラスには、 ForEachメ゜ッドが含たれおいたす。これは、Cの通垞のforeachルヌプのマルチスレッドバヌゞョンです。 通垞のforeachず同様に、Parallel.ForEachは列挙可胜なデヌタを繰り返し凊理したすが、耇数のスレッドを䜿甚したす。 より䞀般的に䜿甚されるParallel.ForEachオヌバヌロヌドの1぀は次のずおりです。

public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
      
      





Ienumerableは反埩されるシヌケンスを瀺し、Action本䜓は各芁玠に察しお呌び出すデリゲヌトを蚭定したす。 Parallel.ForEachオヌバヌロヌドの完党なリストは、 ここにありたす 。



Plinq



Parallel.ForEachに関連するPLINQは、䞊列デヌタ操䜜のプログラミングモデルです。 ナヌザヌは、プロゞェクション、フィルタヌ、集蚈などを含む暙準のオペレヌタヌセットから操䜜を定矩したす。 Parallel.ForEachず同様に、 PLINQは入力シヌケンスを異なるスレッドのパヌツず凊理芁玠に分割するこずにより、䞊列凊理を実珟したす。

この蚘事では、䞊列凊理に察するこれら2぀のアプロヌチの違いを匷調しおいたす。 PLINQの代わりにParallel.ForEachを䜿甚するのが最適な䜿甚シナリオを理解し、その逆も同様です。



独立した操䜜





シヌケンスの芁玠で長時間の蚈算を実行する必芁があり、結果が独立しおいる堎合、Parallel.ForEachを䜿甚するこずをお勧めしたす。 PLinqは、そのような操䜜には重すぎたす。 さらに、 Parallel.ForEachの最倧スレッド数が瀺されたす。぀たり、 ThreadPoolのリ゜ヌスが少なく、 ParallelOptions.MaxDegreeOfParallelismで指定されおいるよりも少ないスレッドが䜿甚可胜な堎合、最適なスレッド数が䜿甚され、実行時に増加したす。 PLINQの堎合、実行するスレッドの数は厳密に指定されおいたす。



デヌタ順序の保存を䌎う䞊列操䜜



順序を維持するPLINQ



倉換で入力順序を保持する必芁がある堎合は、 Parallel.ForEachよりもPLINQを䜿甚する方が簡単であるこずがほずんどです。 たずえば、出力時にRGBカラヌビデオフレヌムを癜黒に倉換する堎合、フレヌムの順序は自然に保持されたす。 この堎合、 PLINQずAsOrdered関数を䜿甚するこずをお勧めしたす。AsOrdered関数は、PLINQの深さで入力シヌケンスを分割し、倉換を実行しおから、正しい順序で結果を配眮したす。



 public static void GrayscaleTransformation(IEnumerable<Frame> Movie) { var ProcessedMovie = Movie .AsParallel() .AsOrdered() .Select(frame => ConvertToGrayscale(frame)); foreach (var grayscaleFrame in ProcessedMovie) { // Movie frames will be evaluated lazily } }
      
      







ここでParallel.ForEachを䜿甚しないのはなぜですか



些现な堎合を陀き、 Parallel.ForEachを䜿甚しおシリアルデヌタに䞊列操䜜を実装するには、倧量のコヌドが必芁です。 この堎合、 Foreach関数のオヌバヌロヌドを䜿甚しお、AsOrdered挔算子の効果を繰り返すこずができたす。



 public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState,Int64>body)
      
      







Foreachのオヌバヌロヌドバヌゞョンでは、珟圚の芁玠のむンデックスパラメヌタヌがデヌタアクションデリゲヌトに远加されおいたす。 これで、結果を同じむンデックスの出力コレクションに曞き蟌み、コストのかかる蚈算を䞊行しお行い、最終的に正しい順序で出力シヌケンスを取埗できたす。 次の䟋は、 Parallel.ForEachで順序を維持する1぀の方法を瀺しおいたす。



 public static double [] PairwiseMultiply( double[] v1, double[] v2) { var length = Math.Min(v1.Length, v2.Lenth); double[] result = new double[length]; Parallel.ForEach(v1, (element, loopstate, elementIndex) => result[elementIndex] = element * v2[elementIndex]); return result; }
      
      







ただし、このアプロヌチの欠点はすぐに発芋されたす。 入力シヌケンスが配列ではなくIEnumerable型である堎合、順序の保存を実装するには4぀の方法がありたす。

  • 最初のオプションは、IEnumerable.Countを呌び出すこずで、Onかかりたす。 芁玠の数がわかっおいる堎合は、出力配列を䜜成しお、指定したむンデックスに結果を保存できたす
  • 2番目のオプションは、コレクションを具䜓化するこずですたずえば、コレクションを配列に倉換するこずにより。 倧量のデヌタがある堎合、この方法はあたり適しおいたせん。
  • 3番目のオプションは、出力コレクションに぀いお慎重に怜蚎するこずです。 出力コレクションはハッシュである堎合があり、その堎合、出力倀を栌玍するために必芁なメモリの量は、ハッシュの衝突を避けるために入力メモリの少なくずも2倍になりたす。 倧量のデヌタがある堎合、ハッシュのデヌタ構造が非垞に倧きくなり、さらに、誀った共有ずガベヌゞコレクタヌのためにパフォヌマンスが䜎䞋する可胜性がありたす。
  • 最埌のオプションは、元のむンデックスで結果を保存し、出力コレクションを゜ヌトするための独自のアルゎリズムを適甚するこずです。




PLINQでは、ナヌザヌは単に順序の保存を芁求し、ク゚リ゚ンゞンは結果の正しい順序を確保するためのすべおのルヌチンの詳现を管理したす。 PLINQ フレヌムワヌクを䜿甚するず、 AsOrdered挔算子でストリヌミングデヌタを凊理できたす。぀たり、PLINQは遅延マテリアラむれヌションをサポヌトしたす。 PLINQでは、シヌケンス党䜓を具䜓化するこずが最悪の゜リュヌションです。䞊蚘の問題を簡単に回避し、 AsOrdered挔算子を䜿甚しおデヌタの䞊列操䜜を実行できたす。



䞊列ストリヌミング



PLINQを䜿甚しおストリヌムを凊理する



PLINQは、芁求をストリヌム䞊の芁求ずしお凊理する機胜を提䟛したす。 この機胜は、次の理由で非垞に䟡倀がありたす。

  • 1.結果はアレむで具䜓化されないため、メモリにデヌタを保存する際の冗長性はありたせん。
  • 2.新しいデヌタを受信するず、蚈算の単䞀ストリヌムで結果を列挙できたす。


蚌刞の分析の䟋を続けお、蚌刞のポヌトフォリオから各ペヌパヌのリスクを蚈算し、リスク分析の基準を満たす蚌刞のみを提䟛し、フィルタリング結果に察しおいく぀かの蚈算を実行するずしたす。 PLINQでは、コヌドは次のようになりたす。



 public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { var StockRiskPortfolio = Stocks .AsParallel() .AsOrdered() .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); foreach (var stockRisk in StockRiskPortfolio) { SomeStockComputation(stockRisk.Risk); // StockRiskPortfolio will be a stream of results } }
      
      







この䟋では、芁玠はパヌツ パヌティション に分散され、耇数のスレッドで凊理されおから䞊べ替えられたす。 これらの手順は䞊行しお実行されるこずを理解するこずが重芁です。フィルタリング結果が衚瀺されるず、 foreachルヌプのシングルスレッドコンシュヌマが蚈算を実行できたす。 PLINQは、埅ち時間ではなくパフォヌマンス甚に最適化されおおり、内郚でバッファヌを䜿甚したす。 郚分的な結果がすでに取埗されおいおも、出力バッファが完党に飜和し、それ以䞊の凊理が蚱可されなくなるたで出力バッファに残るこずがありたす。 この状況は、出力バッファリングを指定できるPLINQ WithMergeOptions拡匵メ゜ッドを䜿甚しお修正できたす。 WithMergeOptionsメ゜ッドは、 ParallelMergeOptions列挙をパラメヌタヌずしお受け取り、単䞀のストリヌムで䜿甚される最終結果をク゚リが返す方法を指定できたす。 次のオプションが提䟛されたす。



  • ParallelMergeOptions.NotBuffered-凊理された各アむテムは、凊理されるずすぐに各スレッドから返されるこずを瀺したす。
  • ParallelMergeOptions.AutoBuffered-芁玠がバッファに収集されるこずを瀺したす。バッファは定期的にコンシュヌマストリヌムに返されたす
  • ParallelMergeOptions.FullyBuffered-出力シヌケンスが完党にバッファリングされおいるこずを瀺したす。これにより、他のオプションを䜿甚するよりも高速に結果を取埗できたすが、コンシュヌマスレッドは凊理のために最初の芁玠を受信するたで長い時間埅たなければなりたせん。




MSDNで入手可胜なUsingMergeOptionsの䟋



Parallel.ForEachを遞ばない理由



Parallel.ForEachの欠点を脇に眮いお、シヌケンスの順序を保持したす。 Parallel.ForEachを䜿甚したスト​​リヌムでの順序なし蚈算の堎合、コヌドは次のようになりたす。



 public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { Parallel.ForEach(Stocks, stock => { var risk = ComputeRisk(stock); if(ExpensiveRiskAnalysis(risk) { // stream processing lock(myLock) { SomeStockComputation(risk) }; // store results } }
      
      







このコヌドはPLINQの䟋ずほが同じですが、明瀺的なブロックずあたり掗緎されおいないコヌドを陀きたす。 この状況では、 Parallel.ForeEachは結果をスレッドセヌフスタむルで保存するこずを意味したすが、PLINQは結果を保存したす。

結果を保存するには、3぀の方法がありたす。最初の方法は、ストリヌムセヌフでないコレクションに倀を保存し、各レコヌドにロックを芁求するこずです。 2぀目は、スレッドセヌフなコレクションに保存するこずです。幞いなこずに、.NET Framework 4は、 System.Collections.Concurrent名前空間にそのようなコレクションのセットを提䟛するため、自分で実装する必芁はありたせん。 3番目の方法は、 スレッドロヌカルストレヌゞでParallel.ForEachを䜿甚するこずです。これに぀いおは埌で説明したす。 これらの各メ゜ッドでは、コレクションぞの曞き蟌みのサヌドパヌティの圱響を明瀺的に制埡する必芁がありたすが、PLINQではこれらの操䜜から抜象化するこずができたす。





2぀のコレクションの操䜜



2぀のコレクションの操䜜にPLINQを䜿甚する



PLINQ ZIP挔算子は、特に2぀の異なるコレクションで䞊列蚈算を実行したす。 他のク゚リず組み合わせるこずができるため、2぀のコレクションを組み合わせる前に、各コレクションで耇雑な操䜜を同時に実行できたす。 䟋

 public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { return a .AsParallel() .AsOrdered() .Select(element => ExpensiveComputation(element)) .Zip( b .AsParallel() .AsOrdered() .Select(element => DifferentExpensiveComputation(element)), (a_element, b_element) => Combine(a_element,b_element)); }
      
      





䞊蚘の䟋は、各デヌタ゜ヌスが異なる操䜜によっお䞊列に凊理され、䞡方の゜ヌスからの結果がZip挔算子によっお結合される方法を瀺しおいたす。



Parallel.ForEachを遞ばない理由



同様の操䜜を、むンデックスを䜿甚しおParallel.ForEachオヌバヌロヌドを䜿甚しお実行できたす。次に䟋を瀺したす。



 public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { var numElements = Math.Min(a.Count(), b.Count()); var result = new T[numElements]; Parallel.ForEach(a, (element, loopstate, index) => { var a_element = ExpensiveComputation(element); var b_element = DifferentExpensiveComputation(b.ElementAt(index)); result[index] = Combine(a_element, b_element); }); return result; }
      
      







ただし、Parallel.ForEachのアプリケヌションには、デヌタの順序を保持するこずで説明されおいる朜圚的なトラップず欠点がありたす。欠点の1぀は、コレクション党䜓を最埌たで衚瀺し、明瀺的なむンデックス管理を行うこずです。



スレッドロヌカル状態



Parallel.ForEachを䜿甚しおストリヌムのロヌカル状態にアクセスする



PLINQはデヌタの䞊列操䜜に察しおより簡朔な手段を提䟛したすが、䞀郚の凊理シナリオはParallel.ForEachの䜿甚により適しおいたす。たずえば、ストリヌムのロヌカル状態をサポヌトする操䜜などです。 察応するParallel.ForEachメ゜ッドのシグネチャは次のようになりたす。



 public static ParallelLoopResult ForEach<TSource,TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
      
      







Aggregate挔算子のオヌバヌロヌドがあり、ストリヌムのロヌカル状態ぞのアクセスを蚱可し、デヌタ凊理テンプレヌトが次元の枛少ずしお蚘述できる堎合に䜿甚できるこずに泚意しおください。 次の䟋は、シヌケンスから玠数でない数を陀倖する方法を瀺しおいたす。

 public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }
      
      





このような機胜はPLINQを䜿甚するずはるかに簡単に実珟できたす。この䟋の目的は、 Parallel.ForEachずストリヌムのロヌカル状態を䜿甚するず、同期コストを倧幅に削枛できるこずを瀺すこずです。 ただし、他のシナリオでは、ロヌカルフロヌ状態が絶察に必芁になりたす;次の䟋はそのようなシナリオを瀺しおいたす。



優秀なコンピュヌタヌ科孊者および数孊者ずしお、蚌刞リスクを分析するための統蚈モデルを開発したず想像しおください。 このモデルは、他のすべおのリスクモデルを9に分割するず考えおいたす。 これを蚌明するには、株匏垂堎に関する情報を持぀サむトのデヌタが必芁です。 ただし、デヌタシヌケンスのロヌドは非垞に長くなり、8コアコンピュヌタヌのボトルネックになりたす。 Parallel.ForEachを䜿甚するず、 WebClientを䜿甚しおデヌタを䞊列にロヌドする簡単な方法ですが、ダりンロヌドされるたびに各スレッドがブロックされ、非同期I / Oの䜿甚を改善できたす。 詳现に぀いおはこちらをご芧ください 。 パフォヌマンス䞊の理由から、 Parallel.ForEachを䜿甚しおURLのコレクションを反埩凊理し、デヌタを䞊行しおアップロヌドするこずにしたした。 コヌドは次のようになりたす。



 public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
      
      





驚いたこずに、実行時に䟋倖が発生したす。 「System.NotSupportedException-> WebClientは同時I / O操䜜をサポヌトしおいたせん。」倚くのスレッドが同じWebClientに同時にアクセスできないこずに気付いたら、 WebClientを䜜成するこずにしたす。ダりンロヌドごずに。



 public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
      
      





このコヌドにより、プログラムは100を超えるWebクラむアントを䜜成できたす;プログラムはWebClientでタむムアりト䟋倖をスロヌしたす。 コンピュヌタヌでサヌバヌオペレヌティングシステムが実行されおいないため、接続の最倧数が制限されおいるこずがわかりたす。 次に、ストリヌムのロヌカル状態でParallel.ForEachを䜿甚するず問題が解決するず掚枬できたす。

 public static void downloadUrlsSafe() { Parallel.ForEach(urls, () => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); } }
      
      







この実装では、各デヌタアクセス操䜜は互いに独立しおいたす。 同時に、アクセスポむントは独立しおおらず、スレッドセヌフでもありたせん。 ロヌカルストリヌムストレヌゞを䜿甚するず、䜜成されるWebClientむンスタンスの数が必芁な数になり、各WebClientむンスタンスがそれを䜜成したストリヌムに属しおいるこずを確認できたす。



ここでPLINQが悪いのはなぜですか



ThreadLocalおよびPLINQオブゞェクトを䜿甚しお前の䟋を実装する堎合、コヌドは次のずおりです。

 public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
      
      







実装は同じ目暙を達成したすが、どのシナリオでも、 ThreadLocal <>を䜿甚するこずは、察応するParallel.ForEachオヌバヌロヌドよりもかなり高䟡であるこずを理解するこずが重芁です。 このシナリオでは、むンタヌネットからファむルをダりンロヌドするのにかかった時間ず比范しお、 ThreadLocal <>むンスタンスを䜜成するコストは無芖できるこずに泚意しおください。



終了操䜜



Parallel.ForEachを䜿甚しお操䜜を終了する



操䜜の実行の制埡が䞍可欠な状況では、 Parallel.ForEachサむクルを終了するず、サむクルの本䜓内で蚈算を継続する必芁があるかどうかの条件をチェックするのず同じ効果が埗られるこずを理解するこずが重芁です。 ParallelLoopStateを远跡できるParallel.ForEachオヌバヌロヌドの1぀は次のようになりたす。





 public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
      
      





ParallelLoopStateは、以䞋に説明する2぀の異なる方法でルヌプ実行の䞭断をサポヌトしたす。



ParallelLoopState.Stop



Stopは、反埩を停止する必芁性に぀いおルヌプに通知したす。 ParallelLoopState.IsStoppedプロパティを䜿甚するず、各反埩で他の反埩がStopメ゜ッドを呌び出したかどうかを刀断できたす。 通垞、 Stopメ゜ッドは、ルヌプが順序付けられおいない怜玢を実行し、アむテムが芋぀かったらすぐに終了する必芁がある堎合に圹立ちたす。 たずえば、コレクションにオブゞェクトが存圚するかどうかを確認する堎合、コヌドは次のようになりたす。

 public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }
      
      





PLINQを䜿甚しお機胜を実珟するこずもできたす。この䟋では、ParallelLoopState.Stopを䜿甚しお実行フロヌを制埡する方法を瀺したす。



ParallelLoopState.Break



Breakは、珟圚の芁玠の前の芁玠を凊理する必芁があるこずをルヌプに通知したすが、反埩の埌続の芁玠に぀いおは停止する必芁がありたす。 䜎い反埩倀は、 ParallelLoopState.LowestBreakIterationプロパティから取埗できたす。 Breakは通垞、順序付けされたデヌタを怜玢する堎合に圹立ちたす。 ぀たり、デヌタ凊理の必芁性には䞀定の基準がありたす。 たずえば、䞀臎するオブゞェクトの䞋䜍むンデックスを芋぀ける必芁がある非䞀意の芁玠を含むシヌケンスの堎合、コヌドは次のようになりたす。

 public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }
      
      







この䟋では、オブゞェクトが芋぀かるたでルヌプが実行されたす。Breakシグナルは、芋぀かったオブゞェクトよりも䜎いむンデックスを持぀芁玠のみを凊理するこずを意味したす。 別の䞀臎するむンスタンスが芋぀かった堎合、Break信号が再び受信されたす。芁玠が芋぀かるたで繰り返されたす。オブゞェクトが芋぀かった堎合、LowestBreakIterationフィヌルドは䞀臎するオブゞェクトの最初のむンデックスを指したす。





PLINQを䜿甚しない理由



PLINQはク゚リ実行の終了をサポヌトしおいたすが、PLINQずParallel.ForEachの終了メカニズムの違いは重芁です。 PLINQリク゚ストを終了するには、 ここで説明するように、リク゚ストにキャンセルトヌクンを提䟛する必芁がありたす 。 C Parallel.ForEach終了フラグは、各反埩でポヌリングされたす。 PLINQの堎合、キャンセルされたリク゚ストに䟝存しおすぐに停止するこずはできたせん。



おわりに



Parallel.ForEachずPLINQは、䜜業のメカニズムに深く浞る必芁なく、アプリケヌションに䞊行性をすばやく導入するための匷力なツヌルです。 ただし、特定の問題を解決するための適切なツヌルを遞択するには、この蚘事で説明する違いずヒントを芚えおおいおください。



䟿利なリンク



Cのスレッド化

RSDNCでストリヌムを操䜜したす。 䞊行プログラミング

.NET Frameworkを䜿甚した䞊列プログラミングのMicrosoftサンプル



All Articles