並行してJava 8。 サブタスクを作成して実装を監視することを学ぶ

大量のデータの並列処理に関する記事のシリーズを続けます(美しい言葉、嘘?)。



前回の記事では、処理を複数の部分に分割して並列タスクを実行できるFork / Join Frameworkの興味深いツールキットにも出会いました。 この記事の新機能-あなたは尋ねますか? 私の答えは、より本質的な例と高品質の情報処理のための新しいメカニズムです。 並行して、このモードで動作するリソースおよびその他の機能について説明します。







私は猫の下で興味があるすべての人を招待します:



開始する



うまく分割されているものはすべて割り切れます。 これについては以前の記事で書きました。処理を可能な限りパーツに分割し、プロセッサを可能な限りロードすることを提案しています(もちろん、そうでない場合)。 どういうわけか無作法に聞こえた。



はい、もちろん、長い間、家庭用コンピューターであるマルチコアもあります。 このモードで動作する最初の機能がここにあります。 サブタスクの数とコアの数の間のパリティを観察する必要があります。 多数のテストによると、起動式はおおよそ次のとおりです。サブタスクの数は、(コアの数+0)または(コアの数+1)である必要があります。 これらのオプションは、いくつかの深刻なサーバーといくつかの通常のマシンでテストされました。



制限メカニズム



制限メカニズムにより、エラーを処理およびデバッグするためのあらゆる種類のメカニズム(「カットオフ」)を可能な限り迅速かつ便利に理解します。 私のプロジェクトでは、最大数のコードデバッグメソッドを作成しようとしています。次に例を示します。



a)計算用にシングルタスクおよびマルチタスクモードのメカニズムを実装してみてください。 なんで? これから説明します。 プロジェクトが正常に送信され、おそらくテストされたとします。 非標準的な状況の場合、最初にできることは、エラーをすばやく理解して修正し、アプリケーションサーバーの画面でエラーをすぐに取得するために、シングルタスクモード(強制)にすばやく切り替えることです(これが可能で、アクセスできる場合)。



シングルタスクモードで欠陥を特定する方が便利です-パラレルモードが有効になっている場合、最初のサブタスクが停止すると、2番目の実行の一部が表示されます(これは必ずしも表示に便利ではありません)。



b)常に必要なデータにアクセスしてロードするメカニズムを検討します。



もっとお話しします たとえば、処理を開始する前に、約100個の巨大なテーブルをMap <K、V>に変換します。 はい、それはすぐに便利ですが、いくつかの不快な瞬間があります。



ビッグデータのテストを開始するとしましょう。 3つの契約、ブロック、顧客、3つのポジションに違いはありません(「ポジション」と呼びましょう)。 エラーが何であるかを理解し、修正し、jarを書き直し、再起動すると...何もありません! 私たちは再び座って、数分待っています。 計算を待っています。



この状況では、データの選択的なロード(ダウンロード)のメカニズムが役立ちます。



たとえば、さらに最適なオプションではありません。 実際、すべてのデータからマップを作成します。 時々使用されますが。



public Map<Long, String> getValueInDao(Date date) { Map<Long, String> valueMap = new HashMap<Long, String>(); HashMap map = new LinkedHashMap(); map.put("date", date); List<Values> ValuesList = this.findWithQuery("select c from Values c where c.vf < :date and c.vd >= :date", map); if (ValuesList.size() > 0) { ListIterator<Values> iterValues = ValuesList.listIterator(); while (iterValues.hasNext()) { Values tmpValues = iterValues.next(); valueMap.put(tmpValues.getId,tmpValues.getDescr ()); } } if (valueMap.size() > 0) { return valueMap; } return Collections.emptyMap(); }
      
      





次のオプションが望ましい(IMHO)。 その中で、本当に必要なものだけを取得するため、受信データに限定します。



 public Map<Long,String> getValue(List<Long> idList,Date date) { Map<Long,String> valueMap = new HashMap<Long, String>(); HashMap map = new LinkedHashMap(); List<Value> list = new ArrayList<>(); if (idList.size() > 1000) { int j = 0; for (int i = 0; i < idList.size() / 999; i++) { map.put("idList", idList.subList(j, j + 999)); map.put("date", date); list.addAll(this.findWithQuery("select c from Value c where c.id in (" + ":idList" + ") and c.val < :date and c.val2 >= :date", map)); map.clear(); j += 999; } if (j <= idList.size()-1) { map.put("idList", idList.subList(j, idList.size())); map.put("date", date); list.addAll(this.findWithQuery("select c from Value c where c.id in (" + ":idList" + ") and c.val < :date and c.val2 >= :date", map)); } } else { map.put("idList", idList); map.put("date", date); list = this.findWithQuery("select c from Value c where c.id in (" + ":idList" + ") and c.val < :date and c.val2 >= :date", map); } Iterator<Value> iter = list.iterator(); while(iter.hasNext()) { Value tmpValue = iter.next(); valueMap.put(tmpValue.getId, tmpValue.getValue()); } if (valueMap.size() > 0) { return valueMap; } return Collections.emptyMap(); }
      
      





c)エラーを記録し、テーブル(ファイル)およびその他のソースに出力するメカニズムをすぐに作成します。 アルゴリズムが明確に構築されている場合、キーによって実行されるクラスを作成することを妨げるものは何もありません。 たとえば、ダウンロードする前に、エラーのあるテーブルにデータを書き込むことができる「フラグ」があります。これにより、データがどこにあるかを正確に知ることができます。



 if (varKeyError == 1) { --   , ,    err       }
      
      







より速く...



可能性のある機能/エラーについて少し話したので、私たちの当面の目標、つまり大量の情報を並列モードで処理し、既存の例を少し拡張することに進みます(前の記事からそれらを大胆に取り上げます)。



受信データと処理を担当するいくつかのクラスを作成しました。 その後、 RecursiveActionクラスに基づいていました。 例で行われたことをもう一度思い出させてください。 StreamSettingsクラスでは、countValue = 500に設定されたしきい値に達するまでデータを部分に分割します。前に説明したように、制限メカニズムを作成できます。 たとえば、オプション(valueId.size()/ Runtime.getRuntime()。AvailableProcessors()* 2)も機能し、最適な値を見つけるために使用できます。



 public class StreamSettings extends RecursiveAction { final int countValue = 500; final int countProcessors = Runtime.getRuntime().availableProcessors(); List<ValueValue> data; int start, end; StreamSettings(List<ValueValue> valueId,int startNumber,int endNumber) { data = valueId; start = startNumber; end = endNumber; } protected void compute() { if (end - start < countValue || countProcessors < 2) { for(int i = start; i < end; i++) { ValueValue value = data.get(i); try { new CalcGo().calcGo(value); } catch (Exception e) { raiseApplicationError(" " + e.getMessage(), e); } } } else { int middle = (start + end)/ 2; invokeAll(new CalcGo(data, start, middle), new CalcGo(data,middle,end)); } } }
      
      





私たちは研究を続けています。 新しい処理オプションを見てみましょう、 RecursiveTaskクラスで停止します。 主な違いは、compute()メソッドが結果を返すことです(これは非常に頻繁に必要です)。 実際、いくつかのサブタスクが完了するのを待って、計算を実行できます。 以下は、より詳細に説明する例です。

Streamクラスは、サブタスクへの分割を担当します。 この例では、平均値を見つけて、クラスのインスタンス(Stream goVar1 =新しいストリーム(forSplit、start、middle))を0から中間まで作成し、(Stream goVar2 = new Stream(forSplit、middle、end))中間から転送します最後の要素に。



前のバージョンのStreamSettingsクラスとは異なり、invokeAllは使用されませんが、fork()およびjoin()メソッドがそれぞれ呼び出されます。



 public class Stream extends RecursiveTask<Long>{ final int countProcessors = Runtime.getRuntime().availableProcessors(); final int countLimit = 1000; int start; int end; int forSplit; Stream(int componentValue,int startNumber, int endNumber) { forSplit = componentValue; start = startNumber; end = endNumber; } protected Long compute() { Long countSum = 0L; if ( countProcessors == 1 || end - start <= countLimit) { System.out.println("=run="); System.out.println("=start="+start); System.out.println("=end="+end); for(int i = start; i <= end; i++) { countSum += 1 ; } } else { int middle = (start + end)/ 2; /* invokeAll(new Stream(forSplit, start, middle), new Stream(forSplit, middle+1, end));*/ Stream goVar1 = new Stream(forSplit,start, middle); Stream goVar2 = new Stream(forSplit,middle,end); goVar1.fork(); goVar2.fork(); countSum = goVar1.join() + goVar2.join(); } return countSum; } } import java.util.concurrent.ForkJoinPool; public class Start { public static void main(String[] args) { final int componentValue = 2000; Long beginT = System.nanoTime(); ForkJoinPool fjp = new ForkJoinPool(); Stream test = new Stream(componentValue,0,componentValue); Long countSum = fjp.invoke(test); Long endT = System.nanoTime(); Long timebetweenStartEnd = endT - beginT; System.out.println("=====time========" +timebetweenStartEnd); System.out.println("=====countSum========" +countSum); } }
      
      





起動の正しい調整と起動の期待の結果として、計算の非同期実行に便利なシステムを作成できます。



情報処理機能



取り上げるトピックは広範であり、大幅な改善が可能です。 時間の増加は、順次開始と比較して約1.7倍です。 使用可能なリソースをより効率的に使用し、多くの計算を並列モードにすることができます。



皆さん、頑張ってください。 コメントに質問や提案を残してください。 次の記事では、 Heartbeatのような興味深いツールについて説明します。 いや? 知りませんか? それではまた



All Articles