Exploring and testing distributed computing in Hazelcast

We continue testing Hazelcast. In the previous post we got acquainted with its queues. In this one we take a closer look at its ability to run tasks in a distributed way.



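It is far more efficient to work with data as close to it as possible than to pull it over to yourself, read and/or modify it, and send it back to distributed storage. This is what Hazelcast offers as a distributed implementation of ExecutorService. You can also control which server stores the data by grouping it under a common key, and then use the same key to run a task on the right server.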
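The idea of routing both data and tasks by the same key can be illustrated with a toy model. This is only a sketch under stated assumptions: the class PartitionRouting and its methods are hypothetical, hashCode() stands in for Hazelcast's real hashing of the serialized key, and round-robin partition ownership is a simplification. The one figure taken from Hazelcast is its default partition count of 271.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of key-based routing; not Hazelcast's actual implementation. */
final class PartitionRouting {
    // Hazelcast's default partition count; everything else here is illustrative.
    static final int PARTITION_COUNT = 271;

    /** Maps a key to a partition id (hashCode() is a stand-in for the real hash). */
    static int partitionId(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT);
    }

    /** Picks the owning node, assuming partitions are dealt out round-robin. */
    static String owner(Object key, List<String> nodes) {
        return nodes.get(partitionId(key) % nodes.size());
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("hz1", "hz2");
        // A map entry stored under a key and a task submitted with the same key
        // resolve to the same node, so the task can read the data locally.
        String dataNode = owner("customer-42", nodes);
        String taskNode = owner("customer-42", nodes);
        System.out.println(dataNode.equals(taskNode));
    }
}
```

Because the owner is a pure function of the key, any member can work out where a task belongs without asking the others.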



Let's find out: is this really so, and are there any pitfalls?







All tests are available on GitHub.



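So, let's go. We use the latest stable version of Hazelcast, 3.2.3.

(The tests report sample figures for speed and task counts. The configuration of the test machine is not disclosed. The figures are sufficient to compare the tests with each other, which is the goal.)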



Test 1 - Working the old-fashioned way




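The purpose of this test is to measure performance without distributed computing, as a baseline for later comparison.

We use two nodes and two stores (maps). We emulate the hardest case, in which none of the data we work with is on the local node. That is, Hazelcast is forced to fetch the data from the other node and, after the modification, to save it back to the other node as well. In short, we create every problem that distributed task execution is supposed to solve in the later tests.

We expect to drop 100k tasks into the queue quickly and have them completed by 10 threads.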



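Result:

INFO: 100000 713 sec: 0.538

INFO: Done tasks: 100000 sec: 15.470

Expectations met. We submitted 100k tasks quickly, in about half a second. Only 713 tasks had completed by the time submission finished. All tasks were done in 15.5 seconds.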



Test 2 - Distributed computing




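We turn on Hazelcast's distributed computing. Again we run the hardest case, in which the data we need is on the second node. This time, however, Hazelcast should work that out, send the task to the second node, and execute it there.

We expect to throw 100k tasks into the queue quickly and have them executed by 10 threads (we set this in the Hazelcast configuration), only faster than in the first test. In addition, we record the node on which each task was executed.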



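Result:

INFO: 100000 99998 sec: 6.308

INFO: Done tasks: hz1: 0 hz2: 100000 sec: 6.319

We can see that all tasks ran on the second node (hz2: 100000) and that the work finished more than twice as fast (6.3 seconds against 15.5). That is very good. Moreover, the gap will widen as the amount of data actually being processed grows; right now the maps hold only integers.

But something very odd happens with submission. It has become very slow: almost all tasks had already completed by the time submission finished. We will put this mystery aside for a moment and look at more lifelike tests.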



Test 3 - Closer to real life




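In this test we do not artificially create tasks for the second node only. In real life there are bright moments when the data is on the same node where the computation was started.

We expect the speed to be even higher, and the tasks to be distributed roughly evenly between the nodes.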



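Result:

INFO: 100000 99857 sec: 5.241

INFO: Done tasks: hz1: 50818 hz2: 49182 sec: 5.252

Expectations met. We see a roughly even distribution of tasks across the nodes (hz1: 50818, hz2: 49182), and execution is about one second faster.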



Test 4 - Running slow tasks




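Tasks differ. Some use I/O resources while they run. So we try running slow tasks (we add a 10 ms delay to each execution). We are also interested in the effect found earlier, where almost all tasks completed during submission. Perhaps the tasks were simply fast, and what we saw was Hazelcast's overhead compared to the first test?

We do not expect the submission time to get any shorter. We expect the total execution time of all tasks to grow.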



Result:

INFO: 100000 99978 sec: 59.442

INFO: Done tasks: hz1: 49522 hz2: 50478 sec: 59.455
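As a rough sanity check on the Test 4 total, the ideal run time can be estimated from the delay alone. The sketch below assumes that the 10-thread executor from the configuration runs on each of the two nodes (20 workers in total; this is an assumption, since the configuration is not shown) and that the 10 ms delay dominates everything else. The helper name idealSeconds is hypothetical.

```java
/** Back-of-the-envelope estimate for the slow-task test; all inputs are assumptions. */
final class SlowTaskEstimate {
    /** Ideal wall-clock seconds if every worker stays busy and only the delay matters. */
    static double idealSeconds(int tasks, int delayMillis, int nodes, int threadsPerNode) {
        int workers = nodes * threadsPerNode;
        return (double) tasks * delayMillis / workers / 1000.0;
    }

    public static void main(String[] args) {
        // 100k tasks, 10 ms each, 2 nodes with 10 threads each
        System.out.println(idealSeconds(100_000, 10, 2, 10));
    }
}
```

Under these assumptions the floor is about 50 seconds, so the measured 59.455 seconds for completing all tasks is in the expected range. What the estimate does not explain is why submission alone also took 59.442 seconds.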



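Submission has also slowed down catastrophically. That is strange, especially because we submit with executorService.executeOnKeyOwner, which, unlike executorService.submitToKeyOwner, is not expected to wait for the execution result. Fire the task and forget about it.

Let's read the source code. First point (you can skip the code; the conclusions follow):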



    public void executeOnKeyOwner(Runnable command, Object key) {
        Callable<?> callable = createRunnableAdapter(command);
        submitToKeyOwner(callable, key);
    }
      
      







Hazelcast wraps our call in a Callable and hands it to submitToKeyOwner, which returns a Future.
      



Second point (again, you can skip the code):



    public static final int SYNC_FREQUENCY = 100;

    private final AtomicInteger consecutiveSubmits = new AtomicInteger();
    private volatile long lastSubmitTime;
    ...
    ...
    boolean sync = !preventSync && checkSync();
    CallableTaskOperation op = new CallableTaskOperation(name, uuid, task);
    ICompletableFuture future = invoke(partitionId, op);
    if (sync) {
        Object response;
        try {
            response = future.get();
    ...
    ...
    /**
     * This is a hack to prevent overloading the system with unprocessed tasks. Once backpressure is added, this can
     * be removed.
     */
    private boolean checkSync() {
        boolean sync = false;
        long last = lastSubmitTime;
        long now = Clock.currentTimeMillis();
        if (last + 10 < now) {
            consecutiveSubmits.set(0);
        } else if (consecutiveSubmits.incrementAndGet() % SYNC_FREQUENCY == 0) {
            sync = true;
        }
        lastSubmitTime = now;
        return sync;
    }
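To see what this throttle does to a fast submitter, the checkSync() logic can be replayed against a simulated clock. The sketch below is not Hazelcast code: the class name SyncThrottle, the countSyncs helper, and passing the current time in as a parameter are assumptions made for illustration. Only the branch logic is copied from the quoted source.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Replays the quoted checkSync() decision with an injectable clock; for experimentation only. */
final class SyncThrottle {
    static final int SYNC_FREQUENCY = 100;

    private final AtomicInteger consecutiveSubmits = new AtomicInteger();
    private volatile long lastSubmitTime;

    /** Same decision as the quoted checkSync(), but the caller supplies the time. */
    boolean checkSync(long now) {
        boolean sync = false;
        long last = lastSubmitTime;
        if (last + 10 < now) {
            // More than 10 ms since the previous submit: the streak is broken.
            consecutiveSubmits.set(0);
        } else if (consecutiveSubmits.incrementAndGet() % SYNC_FREQUENCY == 0) {
            // Every 100th submit of an unbroken streak must wait for its result.
            sync = true;
        }
        lastSubmitTime = now;
        return sync;
    }

    /** Counts how many of the submits, spaced gapMillis apart, would be made synchronous. */
    static int countSyncs(int submits, long gapMillis) {
        SyncThrottle throttle = new SyncThrottle();
        long now = 1_000; // arbitrary simulated start time
        int syncs = 0;
        for (int i = 0; i < submits; i++) {
            if (throttle.checkSync(now)) {
                syncs++;
            }
            now += gapMillis;
        }
        return syncs;
    }

    public static void main(String[] args) {
        System.out.println(countSyncs(100_000, 0));  // back-to-back submits
        System.out.println(countSyncs(100_000, 11)); // submits spaced 11 ms apart
    }
}
```

With no gap between submits, about one call in a hundred blocks on the result of its task, which ties the submission rate to the execution rate. With gaps of more than 10 ms the counter keeps resetting and no call blocks.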








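In other words, the ExecutorService throttles the submitter. When tasks arrive no more than 10 ms apart, Hazelcast makes every hundredth (100th) submission synchronous and waits for that task's result before returning.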










Test 5 -



, 4. , hazelcast . . 100 .



hazelcast . .



Result:



INFO: 100000 25223 sec: 13.350

INFO: Done tasks: hz1: 49257 hz2: 50743 sec: 52.837



25k . , , - . hazelcast ( ) , .



, 50 OOM



Oh.






Test 6 - OOM



Hazelcast . . 10.



Result:



WARNING: [10.0.0.3]:5701 [dev] [3.2.3] While executing RunnableAdapter{task=Test6$RunnableTask@219307f2} on Executor[exe]
java.util.concurrent.RejectedExecutionException: Executor[exe] is overloaded!
    at com.hazelcast.util.executor.CachedExecutorServiceDelegate.execute(CachedExecutorServiceDelegate.java:98)







:



    public void execute(Runnable command) {
        if (!taskQ.offer(command)) {
            throw new RejectedExecutionException("Executor[" + name + "] is overloaded!");
        }
        ...
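The quoted execute() rejects a task as soon as the queue refuses it, because offer() returns false instead of blocking when a bounded queue is full. The sketch below reproduces that behaviour with the standard library only. The class BoundedExecutorSketch, the use of ArrayBlockingQueue, and the countRejected helper are assumptions for illustration and say nothing about which queue type Hazelcast actually uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;

/** Stand-in for an executor front end with a bounded task queue; not Hazelcast code. */
final class BoundedExecutorSketch {
    private final String name;
    private final BlockingQueue<Runnable> taskQ;

    BoundedExecutorSketch(String name, int queueCapacity) {
        this.name = name;
        this.taskQ = new ArrayBlockingQueue<>(queueCapacity);
    }

    /** Enqueues the task or rejects it immediately; offer() never blocks. */
    void execute(Runnable command) {
        if (!taskQ.offer(command)) {
            throw new RejectedExecutionException("Executor[" + name + "] is overloaded!");
        }
    }

    /** Submits no-op tasks with no consumer draining the queue and counts the rejections. */
    static int countRejected(int tasks, int queueCapacity) {
        BoundedExecutorSketch executor = new BoundedExecutorSketch("exe", queueCapacity);
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                executor.execute(() -> { });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println(countRejected(15, 10));
    }
}
```

In this local sketch the caller sees the exception directly. In the test above the rejection happens on the executing member and is only logged there as a WARNING.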







, - . ( executeOnKeyOwner



), .



Result:



INFO: 100000 32604 sec: 16.931

INFO: Done tasks: hz1: 26304 hz2: 26281 sec: 27.269






Test 7 -



- , , ? executeOnKeyOwner



Future submitToKeyOwner



Callable



.



Result:



INFO: 100000 31790 sec: 16.713

INFO: Done tasks: hz1: 25898 hz2: 25873 sec: 27.046

INFO: Canceled tasks: 0 execution exceptions: 48229 rejected tasks: 48229
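The last log line accounts for every task: 25898 + 25873 = 51771 completed, and 51771 + 48229 rejected = 100000 submitted. One way such counters could be gathered from the returned futures is sketched below. It assumes that a rejection reaches the caller as the cause of an ExecutionException, as the equal "execution exceptions" and "rejected tasks" figures suggest. The class FutureOutcomes is hypothetical, and CompletableFuture stands in for the futures that submitToKeyOwner returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/** Tallies how a batch of futures ended; a stdlib-only illustration. */
final class FutureOutcomes {
    int done;
    int canceled;
    int executionExceptions;
    int rejected;

    /** Waits for each future and classifies its outcome. */
    static FutureOutcomes collect(List<? extends Future<?>> futures) {
        FutureOutcomes out = new FutureOutcomes();
        for (Future<?> future : futures) {
            try {
                future.get();
                out.done++;
            } catch (CancellationException e) {
                out.canceled++;
            } catch (ExecutionException e) {
                out.executionExceptions++;
                // A rejection is counted as one kind of execution failure.
                if (e.getCause() instanceof RejectedExecutionException) {
                    out.rejected++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return out;
    }

    /** Builds a tiny batch: two successes, one rejection, one cancellation. */
    static FutureOutcomes sample() {
        List<Future<?>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("ok"));
        futures.add(CompletableFuture.completedFuture("ok"));
        CompletableFuture<String> rejectedTask = new CompletableFuture<>();
        rejectedTask.completeExceptionally(
                new RejectedExecutionException("Executor[exe] is overloaded!"));
        futures.add(rejectedTask);
        CompletableFuture<String> canceledTask = new CompletableFuture<>();
        canceledTask.cancel(true);
        futures.add(canceledTask);
        return collect(futures);
    }
}
```

Unlike the fire-and-forget call, holding on to the futures lets the submitter find out which tasks never ran and decide whether to retry them.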










hazelcast , 10 99; , ; , . OOM; . ;






