ã¯ããã«
éåžžããã«ãã³ã¢ã³ã³ãã¥ãŒã¿ãŒçšã«ããã°ã©ã ãæé©åããå Žåãæåã®ã¹ãããã¯ãã¢ã«ãŽãªãºã ã䞊åå®è¡ããéšåã«åå²ããå¯èœæ§ãèŠã€ããããšã§ãã åé¡ã解決ããããã«ã倧ããªããŒã¿ã»ããããåå¥ã®èŠçŽ ã䞊ååŠçããå¿ èŠãããå Žåãæåã®åè£ã¯ã.NET Framework 4ïŒ Parallel.ForEachããã³Parallel LINQïŒ PLINQ ïŒã®æ°ãã䞊ååŠçæ©èœã§ãã
Parallel.ForEach
Parallelã¯ã©ã¹ã«ã¯ã ForEachã¡ãœãããå«ãŸããŠããŸããããã¯ãCïŒã®éåžžã®foreachã«ãŒãã®ãã«ãã¹ã¬ããããŒãžã§ã³ã§ãã éåžžã®foreachãšåæ§ã«ãParallel.ForEachã¯åæå¯èœãªããŒã¿ãç¹°ãè¿ãåŠçããŸãããè€æ°ã®ã¹ã¬ããã䜿çšããŸãã ããäžè¬çã«äœ¿çšãããParallel.ForEachãªãŒããŒããŒãã®1ã€ã¯æ¬¡ã®ãšããã§ãã
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
Ienumerableã¯å埩ãããã·ãŒã±ã³ã¹ã瀺ããActionæ¬äœã¯åèŠçŽ ã«å¯ŸããŠåŒã³åºãããªã²ãŒããèšå®ããŸãã Parallel.ForEachãªãŒããŒããŒãã®å®å šãªãªã¹ãã¯ã ããã«ãããŸã ã
Plinq
Parallel.ForEachã«é¢é£ããPLINQã¯ã䞊åããŒã¿æäœã®ããã°ã©ãã³ã°ã¢ãã«ã§ãã ãŠãŒã¶ãŒã¯ããããžã§ã¯ã·ã§ã³ããã£ã«ã¿ãŒãéèšãªã©ãå«ãæšæºã®ãªãã¬ãŒã¿ãŒã»ããããæäœãå®çŸ©ããŸãã Parallel.ForEachãšåæ§ã«ã PLINQã¯å ¥åã·ãŒã±ã³ã¹ãç°ãªãã¹ã¬ããã®ããŒããšåŠçèŠçŽ ã«åå²ããããšã«ããã䞊ååŠçãå®çŸããŸãã
ãã®èšäºã§ã¯ã䞊ååŠçã«å¯Ÿããããã2ã€ã®ã¢ãããŒãã®éãã匷調ããŠããŸãã PLINQã®ä»£ããã«Parallel.ForEachã䜿çšããã®ãæé©ãªäœ¿çšã·ããªãªãçè§£ãããã®éãåæ§ã§ãã
ç¬ç«ããæäœ
ã·ãŒã±ã³ã¹ã®èŠçŽ ã§é·æéã®èšç®ãå®è¡ããå¿ èŠããããçµæãç¬ç«ããŠããå ŽåãParallel.ForEachã䜿çšããããšããå§ãããŸãã PLinqã¯ããã®ãããªæäœã«ã¯éãããŸãã ããã«ã Parallel.ForEachã®æå€§ã¹ã¬ããæ°ã瀺ãããŸããã€ãŸãã ThreadPoolã®ãªãœãŒã¹ãå°ãªãã ParallelOptions.MaxDegreeOfParallelismã§æå®ãããŠãããããå°ãªãã¹ã¬ããã䜿çšå¯èœãªå Žåãæé©ãªã¹ã¬ããæ°ã䜿çšãããå®è¡æã«å¢å ããŸãã PLINQã®å Žåãå®è¡ããã¹ã¬ããã®æ°ã¯å³å¯ã«æå®ãããŠããŸãã
ããŒã¿é åºã®ä¿åã䌎ã䞊åæäœ
é åºãç¶æããPLINQ
倿ã§å ¥åé åºãä¿æããå¿ èŠãããå Žåã¯ã Parallel.ForEachãããPLINQã䜿çšããæ¹ãç°¡åã§ããããšãã»ãšãã©ã§ãã ããšãã°ãåºåæã«RGBã«ã©ãŒãããªãã¬ãŒã ãçœé»ã«å€æããå Žåããã¬ãŒã ã®é åºã¯èªç¶ã«ä¿æãããŸãã ãã®å Žåã PLINQãšAsOrderedïŒïŒé¢æ°ã䜿çšããããšããå§ãããŸããAsOrderedïŒïŒé¢æ°ã¯ãPLINQã®æ·±ãã§å ¥åã·ãŒã±ã³ã¹ãåå²ãã倿ãå®è¡ããŠãããæ£ããé åºã§çµæãé 眮ããŸãã
public static void GrayscaleTransformation(IEnumerable<Frame> Movie) { var ProcessedMovie = Movie .AsParallel() .AsOrdered() .Select(frame => ConvertToGrayscale(frame)); foreach (var grayscaleFrame in ProcessedMovie) { // Movie frames will be evaluated lazily } }
ããã§Parallel.ForEachã䜿çšããªãã®ã¯ãªãã§ããïŒ
äºçްãªå Žåãé€ãã Parallel.ForEachã䜿çšããŠã·ãªã¢ã«ããŒã¿ã«äžŠåæäœãå®è£ ããã«ã¯ã倧éã®ã³ãŒããå¿ èŠã§ãã ãã®å Žåã Foreach颿°ã®ãªãŒããŒããŒãã䜿çšããŠãAsOrderedïŒïŒæŒç®åã®å¹æãç¹°ãè¿ãããšãã§ããŸãã
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState,Int64>body)
Foreachã®ãªãŒããŒããŒãããŒãžã§ã³ã§ã¯ãçŸåšã®èŠçŽ ã®ã€ã³ããã¯ã¹ãã©ã¡ãŒã¿ãŒãããŒã¿ã¢ã¯ã·ã§ã³ããªã²ãŒãã«è¿œå ãããŠããŸãã ããã§ãçµæãåãã€ã³ããã¯ã¹ã®åºåã³ã¬ã¯ã·ã§ã³ã«æžã蟌ã¿ãã³ã¹ãã®ãããèšç®ã䞊è¡ããŠè¡ããæçµçã«æ£ããé åºã§åºåã·ãŒã±ã³ã¹ãååŸã§ããŸãã æ¬¡ã®äŸã¯ã Parallel.ForEachã§é åºãç¶æãã1ã€ã®æ¹æ³ã瀺ããŠããŸãã
public static double [] PairwiseMultiply( double[] v1, double[] v2) { var length = Math.Min(v1.Length, v2.Lenth); double[] result = new double[length]; Parallel.ForEach(v1, (element, loopstate, elementIndex) => result[elementIndex] = element * v2[elementIndex]); return result; }
ãã ãããã®ã¢ãããŒãã®æ¬ ç¹ã¯ããã«çºèŠãããŸãã å ¥åã·ãŒã±ã³ã¹ãé åã§ã¯ãªãIEnumerableåã§ããå Žåãé åºã®ä¿åãå®è£ ããã«ã¯4ã€ã®æ¹æ³ããããŸãã
- æåã®ãªãã·ã§ã³ã¯ãIEnumerable.CountïŒïŒãåŒã³åºãããšã§ãOïŒnïŒããããŸãã èŠçŽ ã®æ°ãããã£ãŠããå Žåã¯ãåºåé åãäœæããŠãæå®ããã€ã³ããã¯ã¹ã«çµæãä¿åã§ããŸã
- 2çªç®ã®ãªãã·ã§ã³ã¯ãã³ã¬ã¯ã·ã§ã³ãå ·äœåããããšã§ãïŒããšãã°ãã³ã¬ã¯ã·ã§ã³ãé åã«å€æããããšã«ããïŒã 倧éã®ããŒã¿ãããå Žåããã®æ¹æ³ã¯ããŸãé©ããŠããŸããã
- 3çªç®ã®ãªãã·ã§ã³ã¯ãåºåã³ã¬ã¯ã·ã§ã³ã«ã€ããŠæ éã«æ€èšããããšã§ãã åºåã³ã¬ã¯ã·ã§ã³ã¯ããã·ã¥ã§ããå Žåãããããã®å Žåãåºåå€ãæ ŒçŽããããã«å¿ èŠãªã¡ã¢ãªã®éã¯ãããã·ã¥ã®è¡çªãé¿ããããã«å ¥åã¡ã¢ãªã®å°ãªããšã2åã«ãªããŸãã 倧éã®ããŒã¿ãããå Žåãããã·ã¥ã®ããŒã¿æ§é ãéåžžã«å€§ãããªããããã«ã誀ã£ãå ±æãšã¬ããŒãžã³ã¬ã¯ã¿ãŒã®ããã«ããã©ãŒãã³ã¹ãäœäžããå¯èœæ§ããããŸãã
- æåŸã®ãªãã·ã§ã³ã¯ãå ã®ã€ã³ããã¯ã¹ã§çµæãä¿åããåºåã³ã¬ã¯ã·ã§ã³ããœãŒãããããã®ç¬èªã®ã¢ã«ãŽãªãºã ãé©çšããããšã§ãã
PLINQã§ã¯ããŠãŒã¶ãŒã¯åã«é åºã®ä¿åãèŠæ±ããã¯ãšãªãšã³ãžã³ã¯çµæã®æ£ããé åºã確ä¿ããããã®ãã¹ãŠã®ã«ãŒãã³ã®è©³çްã管çããŸãã PLINQ ãã¬ãŒã ã¯ãŒã¯ã䜿çšãããšã AsOrderedïŒïŒæŒç®åã§ã¹ããªãŒãã³ã°ããŒã¿ãåŠçã§ããŸããã€ãŸããPLINQã¯é å»¶ãããªã¢ã©ã€ãŒãŒã·ã§ã³ããµããŒãããŸãã PLINQã§ã¯ãã·ãŒã±ã³ã¹å šäœãå ·äœåããããšãææªã®ãœãªã¥ãŒã·ã§ã³ã§ããäžèšã®åé¡ãç°¡åã«åé¿ãã AsOrderedïŒïŒæŒç®åã䜿çšããŠããŒã¿ã®äžŠåæäœãå®è¡ã§ããŸãã
䞊åã¹ããªãŒãã³ã°
PLINQã䜿çšããŠã¹ããªãŒã ãåŠçãã
PLINQã¯ãèŠæ±ãã¹ããªãŒã äžã®èŠæ±ãšããŠåŠçããæ©èœãæäŸããŸãã ãã®æ©èœã¯ã次ã®çç±ã§éåžžã«äŸ¡å€ããããŸãã
- 1.çµæã¯ã¢ã¬ã€ã§å ·äœåãããªããããã¡ã¢ãªã«ããŒã¿ãä¿åããéã®åé·æ§ã¯ãããŸããã
- 2.æ°ããããŒã¿ãåä¿¡ãããšãèšç®ã®åäžã¹ããªãŒã ã§çµæãåæã§ããŸãã
蚌åžã®åæã®äŸãç¶ããŠã蚌åžã®ããŒããã©ãªãªããåããŒããŒã®ãªã¹ã¯ãèšç®ãããªã¹ã¯åæã®åºæºãæºãã蚌åžã®ã¿ãæäŸãããã£ã«ã¿ãªã³ã°çµæã«å¯ŸããŠããã€ãã®èšç®ãå®è¡ãããšããŸãã PLINQã§ã¯ãã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { var StockRiskPortfolio = Stocks .AsParallel() .AsOrdered() .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); foreach (var stockRisk in StockRiskPortfolio) { SomeStockComputation(stockRisk.Risk); // StockRiskPortfolio will be a stream of results } }
ãã®äŸã§ã¯ãèŠçŽ ã¯ããŒãïŒ ããŒãã£ã·ã§ã³ ïŒã«åæ£ãããè€æ°ã®ã¹ã¬ããã§åŠçãããŠããäžŠã¹æ¿ããããŸãã ãããã®æé ã¯äžŠè¡ããŠå®è¡ãããããšãçè§£ããããšãéèŠã§ãããã£ã«ã¿ãªã³ã°çµæã衚瀺ããããšã foreachã«ãŒãã®ã·ã³ã°ã«ã¹ã¬ããã³ã³ã·ã¥ãŒããèšç®ãå®è¡ã§ããŸãã PLINQã¯ãåŸ ã¡æéã§ã¯ãªãããã©ãŒãã³ã¹çšã«æé©åãããŠãããå éšã§ãããã¡ãŒã䜿çšããŸãã éšåçãªçµæããã§ã«ååŸãããŠããŠããåºåãããã¡ãå®å šã«é£œåãããã以äžã®åŠçãèš±å¯ãããªããªããŸã§åºåãããã¡ã«æ®ãããšããããŸãã ãã®ç¶æ³ã¯ãåºåãããã¡ãªã³ã°ãæå®ã§ããPLINQ WithMergeOptionsæ¡åŒµã¡ãœããã䜿çšããŠä¿®æ£ã§ããŸãã WithMergeOptionsã¡ãœããã¯ã ParallelMergeOptionsåæããã©ã¡ãŒã¿ãŒãšããŠåãåããåäžã®ã¹ããªãŒã ã§äœ¿çšãããæçµçµæãã¯ãšãªãè¿ãæ¹æ³ãæå®ã§ããŸãã æ¬¡ã®ãªãã·ã§ã³ãæäŸãããŸãã
- ParallelMergeOptions.NotBuffered-åŠçãããåã¢ã€ãã ã¯ãåŠçããããšããã«åã¹ã¬ããããè¿ãããããšã瀺ããŸãã
- ParallelMergeOptions.AutoBuffered-èŠçŽ ããããã¡ã«åéãããããšã瀺ããŸãããããã¡ã¯å®æçã«ã³ã³ã·ã¥ãŒãã¹ããªãŒã ã«è¿ãããŸã
- ParallelMergeOptions.FullyBuffered-åºåã·ãŒã±ã³ã¹ãå®å šã«ãããã¡ãªã³ã°ãããŠããããšã瀺ããŸããããã«ãããä»ã®ãªãã·ã§ã³ã䜿çšãããããé«éã«çµæãååŸã§ããŸãããã³ã³ã·ã¥ãŒãã¹ã¬ããã¯åŠçã®ããã«æåã®èŠçŽ ãåä¿¡ãããŸã§é·ãæéåŸ ããªããã°ãªããŸããã
MSDNã§å ¥æå¯èœãªUsingMergeOptionsã®äŸ
Parallel.ForEachãéžã°ãªãçç±
Parallel.ForEachã®æ¬ ç¹ãèã«çœ®ããŠãã·ãŒã±ã³ã¹ã®é åºãä¿æããŸãã Parallel.ForEachã䜿çšããã¹ãââãªãŒã ã§ã®é åºãªãèšç®ã®å Žåãã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { Parallel.ForEach(Stocks, stock => { var risk = ComputeRisk(stock); if(ExpensiveRiskAnalysis(risk) { // stream processing lock(myLock) { SomeStockComputation(risk) }; // store results } }
ãã®ã³ãŒãã¯PLINQã®äŸãšã»ãŒåãã§ãããæç€ºçãªãããã¯ãšããŸãæŽç·ŽãããŠããªãã³ãŒããé€ããŸãã ãã®ç¶æ³ã§ã¯ã Parallel.ForeEachã¯çµæãã¹ã¬ããã»ãŒãã¹ã¿ã€ã«ã§ä¿åããããšãæå³ããŸãããPLINQã¯çµæãä¿åããŸãã
çµæãä¿åããã«ã¯ã3ã€ã®æ¹æ³ããããŸããæåã®æ¹æ³ã¯ãã¹ããªãŒã ã»ãŒãã§ãªãã³ã¬ã¯ã·ã§ã³ã«å€ãä¿åããåã¬ã³ãŒãã«ããã¯ãèŠæ±ããããšã§ãã 2ã€ç®ã¯ãã¹ã¬ããã»ãŒããªã³ã¬ã¯ã·ã§ã³ã«ä¿åããããšã§ãã幞ããªããšã«ã.NET Framework 4ã¯ã System.Collections.Concurrentåå空éã«ãã®ãããªã³ã¬ã¯ã·ã§ã³ã®ã»ãããæäŸãããããèªåã§å®è£ ããå¿ èŠã¯ãããŸããã 3çªç®ã®æ¹æ³ã¯ã ã¹ã¬ããããŒã«ã«ã¹ãã¬ãŒãžã§Parallel.ForEachã䜿çšããããšã§ããããã«ã€ããŠã¯åŸã§èª¬æããŸãã ãããã®åã¡ãœããã§ã¯ãã³ã¬ã¯ã·ã§ã³ãžã®æžã蟌ã¿ã®ãµãŒãããŒãã£ã®åœ±é¿ãæç€ºçã«å¶åŸ¡ããå¿ èŠããããŸãããPLINQã§ã¯ãããã®æäœããæœè±¡åããããšãã§ããŸãã
2ã€ã®ã³ã¬ã¯ã·ã§ã³ã®æäœ
2ã€ã®ã³ã¬ã¯ã·ã§ã³ã®æäœã«PLINQã䜿çšãã
PLINQ ZIPæŒç®åã¯ãç¹ã«2ã€ã®ç°ãªãã³ã¬ã¯ã·ã§ã³ã§äžŠåèšç®ãå®è¡ããŸãã ä»ã®ã¯ãšãªãšçµã¿åãããããšãã§ããããã2ã€ã®ã³ã¬ã¯ã·ã§ã³ãçµã¿åãããåã«ãåã³ã¬ã¯ã·ã§ã³ã§è€éãªæäœãåæã«å®è¡ã§ããŸãã äŸïŒ
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { return a .AsParallel() .AsOrdered() .Select(element => ExpensiveComputation(element)) .Zip( b .AsParallel() .AsOrdered() .Select(element => DifferentExpensiveComputation(element)), (a_element, b_element) => Combine(a_element,b_element)); }
äžèšã®äŸã¯ãåããŒã¿ãœãŒã¹ãç°ãªãæäœã«ãã£ãŠäžŠåã«åŠçãããäž¡æ¹ã®ãœãŒã¹ããã®çµæãZipæŒç®åã«ãã£ãŠçµåãããæ¹æ³ã瀺ããŠããŸãã
Parallel.ForEachãéžã°ãªãçç±
åæ§ã®æäœããã€ã³ããã¯ã¹ã䜿çšããŠParallel.ForEachãªãŒããŒããŒãã䜿çšããŠå®è¡ã§ããŸããæ¬¡ã«äŸã瀺ããŸãã
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { var numElements = Math.Min(a.Count(), b.Count()); var result = new T[numElements]; Parallel.ForEach(a, (element, loopstate, index) => { var a_element = ExpensiveComputation(element); var b_element = DifferentExpensiveComputation(b.ElementAt(index)); result[index] = Combine(a_element, b_element); }); return result; }
ãã ããParallel.ForEachã®ã¢ããªã±ãŒã·ã§ã³ã«ã¯ãããŒã¿ã®é åºãä¿æããããšã§èª¬æãããŠããæœåšçãªãã©ãããšæ¬ ç¹ããããŸããæ¬ ç¹ã®1ã€ã¯ãã³ã¬ã¯ã·ã§ã³å šäœãæåŸãŸã§è¡šç€ºããæç€ºçãªã€ã³ããã¯ã¹ç®¡çãè¡ãããšã§ãã
ã¹ã¬ããããŒã«ã«ç¶æ
Parallel.ForEachã䜿çšããŠã¹ããªãŒã ã®ããŒã«ã«ç¶æ ã«ã¢ã¯ã»ã¹ãã
PLINQã¯ããŒã¿ã®äžŠåæäœã«å¯ŸããŠããç°¡æœãªææ®µãæäŸããŸãããäžéšã®åŠçã·ããªãªã¯Parallel.ForEachã®äœ¿çšã«ããé©ããŠããŸããããšãã°ãã¹ããªãŒã ã®ããŒã«ã«ç¶æ ããµããŒãããæäœãªã©ã§ãã 察å¿ããParallel.ForEachã¡ãœããã®ã·ã°ããã£ã¯æ¬¡ã®ããã«ãªããŸãã
public static ParallelLoopResult ForEach<TSource,TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
AggregateæŒç®åã®ãªãŒããŒããŒãããããã¹ããªãŒã ã®ããŒã«ã«ç¶æ ãžã®ã¢ã¯ã»ã¹ãèš±å¯ããããŒã¿åŠçãã³ãã¬ãŒããæ¬¡å ã®æžå°ãšããŠèšè¿°ã§ããå Žåã«äœ¿çšã§ããããšã«æ³šæããŠãã ããã æ¬¡ã®äŸã¯ãã·ãŒã±ã³ã¹ããçŽ æ°ã§ãªãæ°ãé€å€ããæ¹æ³ã瀺ããŠããŸãã
public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }
ãã®ãããªæ©èœã¯PLINQã䜿çšãããšã¯ããã«ç°¡åã«å®çŸã§ããŸãããã®äŸã®ç®çã¯ã Parallel.ForEachãšã¹ããªãŒã ã®ããŒã«ã«ç¶æ ã䜿çšãããšãåæã³ã¹ããå€§å¹ ã«åæžã§ããããšã瀺ãããšã§ãã ãã ããä»ã®ã·ããªãªã§ã¯ãããŒã«ã«ãããŒç¶æ ã絶察ã«å¿ èŠã«ãªããŸã;次ã®äŸã¯ãã®ãããªã·ããªãªã瀺ããŠããŸãã
åªç§ãªã³ã³ãã¥ãŒã¿ãŒç§åŠè ããã³æ°åŠè ãšããŠã蚌åžãªã¹ã¯ãåæããããã®çµ±èšã¢ãã«ãéçºãããšæ³åããŠãã ããã ãã®ã¢ãã«ã¯ãä»ã®ãã¹ãŠã®ãªã¹ã¯ã¢ãã«ã9ã«åå²ãããšèããŠããŸãã ããã蚌æããã«ã¯ãæ ªåŒåžå Žã«é¢ããæ å ±ãæã€ãµã€ãã®ããŒã¿ãå¿ èŠã§ãã ãã ããããŒã¿ã·ãŒã±ã³ã¹ã®ããŒãã¯éåžžã«é·ããªãã8ã³ã¢ã³ã³ãã¥ãŒã¿ãŒã®ããã«ããã¯ã«ãªããŸãã Parallel.ForEachã䜿çšãããšã WebClientã䜿çšããŠããŒã¿ã䞊åã«ããŒãããç°¡åãªæ¹æ³ã§ãããããŠã³ããŒãããããã³ã«åã¹ã¬ããããããã¯ãããéåæI / Oã®äœ¿çšãæ¹åã§ããŸãã 詳现ã«ã€ããŠã¯ãã¡ããã芧ãã ãã ã ããã©ãŒãã³ã¹äžã®çç±ããã Parallel.ForEachã䜿çšããŠURLã®ã³ã¬ã¯ã·ã§ã³ãå埩åŠçããããŒã¿ã䞊è¡ããŠã¢ããããŒãããããšã«ããŸããã ã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
é©ããããšã«ãå®è¡æã«äŸå€ãçºçããŸãã ãSystem.NotSupportedException-> WebClientã¯åæI / OæäœããµããŒãããŠããŸããããå€ãã®ã¹ã¬ãããåãWebClientã«åæã«ã¢ã¯ã»ã¹ã§ããªãããšã«æ°ä»ãããã WebClientãäœæããããšã«ããŸããããŠã³ããŒãããšã«ã
public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
ãã®ã³ãŒãã«ãããããã°ã©ã ã¯100ãè¶ ããWebã¯ã©ã€ã¢ã³ããäœæã§ããŸã;ããã°ã©ã ã¯WebClientã§ã¿ã€ã ã¢ãŠãäŸå€ãã¹ããŒããŸãã ã³ã³ãã¥ãŒã¿ãŒã§ãµãŒããŒãªãã¬ãŒãã£ã³ã°ã·ã¹ãã ãå®è¡ãããŠããªããããæ¥ç¶ã®æå€§æ°ãå¶éãããŠããããšãããããŸãã æ¬¡ã«ãã¹ããªãŒã ã®ããŒã«ã«ç¶æ ã§Parallel.ForEachã䜿çšãããšåé¡ã解決ãããšæšæž¬ã§ããŸãã
public static void downloadUrlsSafe() { Parallel.ForEach(urls, () => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); } }
ãã®å®è£ ã§ã¯ãåããŒã¿ã¢ã¯ã»ã¹æäœã¯äºãã«ç¬ç«ããŠããŸãã åæã«ãã¢ã¯ã»ã¹ãã€ã³ãã¯ç¬ç«ããŠããããã¹ã¬ããã»ãŒãã§ããããŸããã ããŒã«ã«ã¹ããªãŒã ã¹ãã¬ãŒãžã䜿çšãããšãäœæãããWebClientã€ã³ã¹ã¿ã³ã¹ã®æ°ãå¿ èŠãªæ°ã«ãªããåWebClientã€ã³ã¹ã¿ã³ã¹ããããäœæããã¹ããªãŒã ã«å±ããŠããããšã確èªã§ããŸãã
ããã§PLINQãæªãã®ã¯ãªãã§ããïŒ
ThreadLocalããã³PLINQãªããžã§ã¯ãã䜿çšããŠåã®äŸãå®è£ ããå Žåãã³ãŒãã¯æ¬¡ã®ãšããã§ãã
public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
å®è£ ã¯åãç®æšãéæããŸãããã©ã®ã·ããªãªã§ãã ThreadLocal <>ã䜿çšããããšã¯ã察å¿ããParallel.ForEachãªãŒããŒããŒããããããªãé«äŸ¡ã§ããããšãçè§£ããããšãéèŠã§ãã ãã®ã·ããªãªã§ã¯ãã€ã³ã¿ãŒããããããã¡ã€ã«ãããŠã³ããŒãããã®ã«ããã£ãæéãšæ¯èŒããŠã ThreadLocal <>ã€ã³ã¹ã¿ã³ã¹ãäœæããã³ã¹ãã¯ç¡èŠã§ããããšã«æ³šæããŠãã ããã
çµäºæäœ
Parallel.ForEachã䜿çšããŠæäœãçµäºãã
æäœã®å®è¡ã®å¶åŸ¡ãäžå¯æ¬ ãªç¶æ³ã§ã¯ã Parallel.ForEachãµã€ã¯ã«ãçµäºãããšããµã€ã¯ã«ã®æ¬äœå ã§èšç®ãç¶ç¶ããå¿ èŠããããã©ããã®æ¡ä»¶ããã§ãã¯ããã®ãšåã广ãåŸãããããšãçè§£ããããšãéèŠã§ãã ParallelLoopStateã远跡ã§ããParallel.ForEachãªãŒããŒããŒãã®1ã€ã¯æ¬¡ã®ããã«ãªããŸãã
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
ParallelLoopStateã¯ã以äžã«èª¬æãã2ã€ã®ç°ãªãæ¹æ³ã§ã«ãŒãå®è¡ã®äžæããµããŒãããŸãã
ParallelLoopState.StopïŒïŒ
StopïŒïŒã¯ãå埩ã忢ããå¿ èŠæ§ã«ã€ããŠã«ãŒãã«éç¥ããŸãã ParallelLoopState.IsStoppedããããã£ã䜿çšãããšãåå埩ã§ä»ã®å埩ãStopïŒïŒã¡ãœãããåŒã³åºãããã©ããã倿ã§ããŸãã éåžžã StopïŒïŒã¡ãœããã¯ãã«ãŒããé åºä»ããããŠããªãæ€çŽ¢ãå®è¡ããã¢ã€ãã ãèŠã€ãã£ããããã«çµäºããå¿ èŠãããå Žåã«åœ¹ç«ã¡ãŸãã ããšãã°ãã³ã¬ã¯ã·ã§ã³ã«ãªããžã§ã¯ããååšãããã©ããã確èªããå Žåãã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }
PLINQã䜿çšããŠæ©èœãå®çŸããããšãã§ããŸãããã®äŸã§ã¯ãParallelLoopState.StopïŒïŒã䜿çšããŠå®è¡ãããŒãå¶åŸ¡ããæ¹æ³ã瀺ããŸãã
ParallelLoopState.BreakïŒïŒ
BreakïŒïŒã¯ãçŸåšã®èŠçŽ ã®åã®èŠçŽ ãåŠçããå¿ èŠãããããšãã«ãŒãã«éç¥ããŸãããå埩ã®åŸç¶ã®èŠçŽ ã«ã€ããŠã¯åæ¢ããå¿ èŠããããŸãã äœãå埩å€ã¯ã ParallelLoopState.LowestBreakIterationããããã£ããååŸã§ããŸãã BreakïŒïŒã¯éåžžãé åºä»ããããããŒã¿ãæ€çŽ¢ããå Žåã«åœ¹ç«ã¡ãŸãã ã€ãŸããããŒã¿åŠçã®å¿ èŠæ§ã«ã¯äžå®ã®åºæºããããŸãã ããšãã°ãäžèŽãããªããžã§ã¯ãã®äžäœã€ã³ããã¯ã¹ãèŠã€ããå¿ èŠãããéäžæã®èŠçŽ ãå«ãã·ãŒã±ã³ã¹ã®å Žåãã³ãŒãã¯æ¬¡ã®ããã«ãªããŸãã
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }
ãã®äŸã§ã¯ããªããžã§ã¯ããèŠã€ãããŸã§ã«ãŒããå®è¡ãããŸããBreakïŒïŒã·ã°ãã«ã¯ãèŠã€ãã£ããªããžã§ã¯ããããäœãã€ã³ããã¯ã¹ãæã€èŠçŽ ã®ã¿ãåŠçããããšãæå³ããŸãã å¥ã®äžèŽããã€ã³ã¹ã¿ã³ã¹ãèŠã€ãã£ãå ŽåãBreakïŒïŒä¿¡å·ãåã³åä¿¡ãããŸããèŠçŽ ãèŠã€ãããŸã§ç¹°ãè¿ãããŸãããªããžã§ã¯ããèŠã€ãã£ãå ŽåãLowestBreakIterationãã£ãŒã«ãã¯äžèŽãããªããžã§ã¯ãã®æåã®ã€ã³ããã¯ã¹ãæããŸãã
PLINQã䜿çšããªãçç±
PLINQã¯ã¯ãšãªå®è¡ã®çµäºããµããŒãããŠããŸãããPLINQãšParallel.ForEachã®çµäºã¡ã«ããºã ã®éãã¯éèŠã§ãã PLINQãªã¯ãšã¹ããçµäºããã«ã¯ã ããã§èª¬æããããã«ããªã¯ãšã¹ãã«ãã£ã³ã»ã«ããŒã¯ã³ãæäŸããå¿ èŠããããŸã ã C Parallel.ForEachçµäºãã©ã°ã¯ãåå埩ã§ããŒãªã³ã°ãããŸãã PLINQã®å Žåããã£ã³ã»ã«ããããªã¯ãšã¹ãã«äŸåããŠããã«åæ¢ããããšã¯ã§ããŸããã
ãããã«
Parallel.ForEachãšPLINQã¯ãäœæ¥ã®ã¡ã«ããºã ã«æ·±ã浞ãå¿ èŠãªããã¢ããªã±ãŒã·ã§ã³ã«äžŠè¡æ§ããã°ããå°å ¥ããããã®åŒ·åãªããŒã«ã§ãã ãã ããç¹å®ã®åé¡ã解決ããããã®é©åãªããŒã«ãéžæããã«ã¯ããã®èšäºã§èª¬æããéããšãã³ããèŠããŠãããŠãã ããã
䟿å©ãªãªã³ã¯ïŒ
CïŒã®ã¹ã¬ããå
RSDNïŒCïŒã§ã¹ããªãŒã ãæäœããŸãã 䞊è¡ããã°ã©ãã³ã°
.NET Frameworkã䜿çšãã䞊åããã°ã©ãã³ã°ã®Microsoftãµã³ãã«