ExecutorServiceを使用するための10のヒント

Habrahabrの読者に出版物ExecutorService-10のヒントとコツの翻訳を提供します。







ExecutorServiceの抽象化はJava 5で導入されました。2004年は庭にありました。 また、多くのJavaプログラマーは、ExecutorServiceがどのように機能するかを完全には理解していません。 あなたの自由には多くのソースがありますが、今ではあまり知られていない微妙さとそれを扱うための実践についてお話したいと思います。



1.スレッドプールに名前を付ける



私はこれを言及せずにはいられません。 ダンプ時またはデバッグ中に、標準のスレッド命名スキームが次のようになっていることがわかります。プール内のスレッド番号。 たとえば、 pool-2-thread-3は、JVMライフサイクルの2番目のプールの3番目のスレッドを意味します。 参照: Executors.defaultThreadFactory() あまり有益ではありませんか? JDKでは、スレッドに適切な名前を付けることが少し難しくなります。 命名戦略はThreadFactory内に隠されています 。 幸いなことに、Google Guavaにはこのための組み込みクラスがあります。

import com.google.common.util.concurrent.ThreadFactoryBuilder; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("-%d") .setDaemon(true) .build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
      
      





デフォルトでは、 非デーモンスレッドプールが作成されます。どちらが適切かを自分で決定してください。



2.コンテキストに応じて名前を変更します



このテクニックについては、記事「Supercharged jstack:How to Debug your Servers at 100mph」から学びました。 ストリーム名について知っているので、実行時にいつでも変更できます! ストリームダンプには、パラメーターとローカル変数のないクラスとメソッドの名前が含まれているため、これは理にかなっています。 ストリーム名に重要な情報を含めることで、どのメッセージ/レコード/リクエストなどを簡単に追跡できます。 システムの速度を落とすか、デッドロックを引き起こします。

 private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("-" + messageId); try { // ... } finally { currentThread.setName(oldName); } }); }
      
      





try-finallyブロック内では、現在のスレッドはProcessing-ID-of-current-messageと呼ばれ、システム内のメッセージの流れを監視するときに役立ちます。



3.明示的かつ安全な終了



クライアントスレッドとスレッドプールの間には、ジョブキューがあります。 アプリケーションがシャットダウンするとき、2つのことを心配する必要があります。キューで待機しているタスクに何が起こるかと、すでに実行中のタスクがどのように動作するかです(詳細は後述)。 驚くべきことに、多くの開発者はスレッドプールを適切に閉じません。 2つの方法があります。特定のケースに応じて、キュー内のすべてのタスクを動作させる( shutdown() )か、削除する( shutdownNow() )かのいずれかです。 たとえば、一連のタスクをキューに入れ、それらがすべて完了するとすぐに制御を返したい場合は、 shutdown()を使用します。

 private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug("    ? {}", done); }
      
      





この例では、文字のパケットを送信します。各パケットは、スレッドプールの個別のタスクとして送信されます。 これらのタスクをキューに入れた後、新しいタスクを受け入れないようにプールを閉じます。 次に、すべてのタスクが完了するまで最大1分間待機します。 ただし、一部のタスクがまだ完了していない場合、 awaitTermination()は単にfalseを返します 。 さらに、残りのタスクは引き続き完了します。 私は流行に敏感な人が行く準備ができていることを知っています:

 emails.parallelStream().forEach(this::sendEmail);
      
      





昔ながらの私を呼び出しますが、並列スレッドの数を制御するのが好きです。 段階的なシャットダウン()完了の代替手段はshutdownNow()です。

 final List<Runnable> rejected = executorService.shutdownNow(); log.debug(" : {}", rejected.size());
      
      





今回は、キュー内のすべてのタスクが破棄されて返されます。 すでに実行中のタスクは続行できます。



4.中断の取り扱いには注意してください。



Futureインターフェースのあまり知られていない機能は、キャンセルする機能です。 以下は、以前の記事の1つです: InterruptedExceptionおよび割り込みスレッドの説明

InterruptedExceptionは明確にチェック(チェック)されているため、ここ数年で抑制されたエラーの数について誰も考えなかったでしょう。 そして、それを処理する必要があるので、多くの人がそれを間違っているか、考えずにしています。 定期的に何らかのクリーニングを実行し、ほとんどの時間の間にスリープするスレッドの簡単な例を見てみましょう。

 class Cleaner implements Runnable { Cleaner() { final Thread cleanerThread = new Thread(this, ""); cleanerThread.start(); } @Override public void run() { while(true) { cleanUp(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } private void cleanUp() { //... } }
      
      





とにかくこのコードはひどいです!

  1. 多くの場合、コンストラクターからスレッドを開始するのは悪い考えです。 たとえば、Springなどの一部のフレームワークは、メソッドフックをサポートするために動的サブクラスを作成することを好みます。 最終的に、2つのインスタンスから2つのスレッドを実行します。
  2. InterruptedExceptionは飲み込まれ、適切に処理されません。
  3. このクラスは、各インスタンスで新しいスレッドを開始します。 代わりに、多くのオブジェクトに対して同じスレッドを生成するScheduledThreadPoolExecutorを使用する必要があります。これは、より信頼性が高く効率的です。
  4. さらに、 ScheduledThreadPoolExecutorの助けを借りて、スリープ/ワークサイクルの書き込みを回避し実際にスケジュールすることができます。
  5. 最後になりましたが。 他の誰もCleanerインスタンスを参照していない場合でも、このスレッドを削除する方法はありません。


これらの問題はすべて重要ですが、 InterruptedExceptionを抑制することは最大の罪です。 理由を理解する前に、この例外が必要な理由と、その利点を利用してスレッドを適切に中断する方法について考えてみましょう。 JDKの多くのブロッキング操作では、たとえば次のようにInterruptedExceptionを処理する必要があります。



I / Oのブロックは、InterruptedException(残念なことですが)をスローしないことに注意してください。 これらすべてのクラスがInterruptedExceptionを宣言している場合、これらの例外がスローされると驚くかもしれません。



InterruptedExceptionが実際に何であるかを知っていれば、正しく処理できます。 誰かがスレッドに割り込もうとすると、 InterruptedExceptionを処理してこれを見つけた場合、すぐに終了できるようにするのが賢明でしょう。たとえば:

 class Cleaner implements Runnable, AutoCloseable { private final Thread cleanerThread; Cleaner() { cleanerThread = new Thread(this, "Cleaner"); cleanerThread.start(); } @Override public void run() { try { while (true) { cleanUp(); TimeUnit.SECONDS.sleep(1); } } catch (InterruptedException ignored) { log.debug("Interrupted, closing"); } } //... @Override public void close() { cleanerThread.interrupt(); } }
      
      





この例のtry-catchブロックは、 whileループを囲んでいることに注意してください 。 したがって、 sleep()InterruptedExceptionをスローした場合、このループを中止します。 InterruptedException例外スタックを記録する必要があると主張するかもしれません。 それは状況次第です。 この場合、フローの中断は予想される動作であり、落下ではありません。 一般的に、あなたの裁量で。 ほとんどの場合、スリープ()の間にスレッドが中断され、 run()メソッドをすぐに完了します 。 あなたが非常に注意しているなら、おそらくあなたは尋ねる-cleanUp()の間にストリームが中断されたらどうなるか? 多くの場合、次のようにフラグを手動で設定する決定を思い付きます。

 private volatile boolean stop = false; @Override public void run() { while (!stop) { cleanUp(); TimeUnit.SECONDS.sleep(1); } } @Override public void close() { stop = true; }
      
      





停止フラグ(揮発性である必要があります!)がブロッキング操作を中断することはないことを忘れないでください。 一方、この明示的なフラグを使用すると、より適切な制御が可能になります。 いつでも監視できます。 スレッド割り込みがまったく同じように機能することが判明しました。 非ブロッキング計算(たとえば、 cleanUp() )を実行中に誰かがスレッドを中断した場合、そのような計算はすぐには中断されません。 ただし、スレッドは既に割り込み済みとしてマークされているため、 sleep()などの次のブロック操作はすぐに割り込みされ、 InterruptedExceptionがスローされるため、このシグナルは失われません。



スレッド割り込みメカニズムを利用したい非ブロッキングスレッドを実装する場合にも、この事実を利用できます。 InterruptedExceptionに依存する代わりに、定期的にThread.isInterrupted()をチェックする必要があります。

 public void run() { while (Thread.currentThread().isInterrupted()) { someHeavyComputations(); } }
      
      





ご覧のとおり、誰かがフローを中断した場合、 someHeavyComputations ()の前回の反復が許可するとすぐに計算をキャンセルします 。 非常に長い時間または無限に実行される場合、割り込みフラグに到達することはありません。 このフラグが1回だけではないことは注目に値します。 isInterrupted()の代わりにThread.interrupted()呼び出すフラグがクリアされ、続行できます。 割り込みフラグを無視して実行を継続したい場合があります。 この場合、 中断された()が役に立つかもしれません。



あなたが昔ながらのプログラマーなら、おそらく10年前に廃止されたThread.stop()メソッドを覚えているでしょう。 Java 8には「実装解除」の計画がありましたが、1.8u5ではまだあります。 ただし、それを使用せず、 Thread.interrupt()を使用して発生するコードをリファクタリングします。



InterruptedExceptionを完全に無視したい場合があります。 この場合、グアバのUninterruptiblesクラスに注意してください。 sleepUninterruptibly()awaitUninterruptibly(CountDownLatch)などの多くのメソッドが含まれています 気をつけてください。 InterruptedExceptionをスローしませんが、スレッドを割り込みから完全に排除します。これはかなり珍しいことです。



これで、一部のメソッドがInterruptedExceptionをスローする理由を理解できました。





5.キューの長さを監視し、境界を決定する



間違ったスレッドプールは、パフォーマンスの低下、不安定性、メモリリークを引き起こす可能性があります。 指定するスレッドが少なすぎると、キューが大きくなり、大量のメモリを消費します。 一方、スレッドの数が多すぎると、頻繁なコンテキストの切り替えによりシステム全体の速度が低下し、同じ症状が発生します。 キューの深さを維持し、その境界を決定することが重要です。 また、過負荷のプールは、単に新しいタスクを一時的に中止する場合があります。

 final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
      
      





上記のコードはExecutors.newFixedThreadPool(n)と同等ですが、デフォルトの無制限のLinkedBlockingQueueを使用する代わりに、固定容量100のArrayBlockingQueueを使用します。これは、100個のタスクが既に完了している場合、次のタスクはRejectedExecutionException例外で拒否されることを意味します さらに、キューは外部からアクセスできるようになったため、ログに記録したり、JMXに送信したりするために、定期的にキューのサイズを照会できます。



6.例外を処理することを忘れないでください。



次のコードを実行した結果はどうなりますか?

 executorService.submit(() -> { System.out.println(1 / 0); });
      
      





私は彼が何も印刷しなかった回数に戸惑いました。 java.lang.ArithmeticExceptionの兆候なし:/ by zero 、なし。 スレッドプールは、例外がスローされたことがないかのように、単に例外を飲み込みました。 プールラッパーを使用せずにゼロから作成されたスレッドの場合、 UncaughtExceptionHandlerが発生した可能性があります。 ただし、スレッドプールでは、さらに注意する必要があります。 Runnableを実行のために送信した場合(上記のように結果なしで)、メソッド本体全体をtry-catch内に配置する必要があります。 Callableをキューに入れる場合は、ブロッキングget()で常に結果を取得して、例外を再度スローするようにしてください。

 final Future<Integer> division = executorService.submit(() -> 1 / 0); //   ExecutionException,  ArithmeticException division.get();
      
      





Springフレームワークでも@Asyncでこの間違いを犯したことは注目に値します。SPR -8995およびSPR-12090を参照してください。



7.順番に待機時間を追跡する



作業キューの深さの監視は一方向です。 単一のトランザクション/タスクで問題を解決する場合、タスクの設定からその実行の開始までにどれだけ時間が経過したかを確認することは理にかなっています。 この時間は理想的にはゼロになります(プールにアイドルスレッドがある場合)が、タスクがキューに入れられると増加します。 さらに、プールのスレッド数が固定されていない場合、新しいタスクを開始するには新しいスレッドの作成が必要になる場合があり、これにも時間がかかります。 これを明確に測定するには、元のExecutorServiceを類似したものでラップします。

 public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug(" {}    {} ", task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }); } //... }
      
      





これは完全な実装ではありませんが、ポイントは明確です。 タスクをスレッドプールに入れた瞬間、すぐにその時間に気付きました。 その後、タスクが削除され、実行のために送信されるとすぐにストップウォッチが停止されました。 ソースコードのstartTimequeueDurationの近くに惑わされないでください。 実際、これらの2行は、異なるスレッドで実行されます。ミリ秒単位、または数秒単位で実行されます。

8.クライアントスタックトレースを保存する



最近、 リアクティブプログラミングに注目が集まっています: リアクティブマニフェストリアクティブストリームRxJava (既に1.0!)、 Clojureエージェントscala.rx ...これはすべて素晴らしいように見えますが、stackraceはもはやあなたの友人ではありません。 たとえば、スレッドプールでのジョブの実行中に発生する次の例外を考えてください。

  java.lang.NullPointerException:null
     com.nurkiewicz.MyTask.call(Main.java:76)〜[クラス/:na]
     com.nurkiewicz.MyTask.call(Main.java:72)〜[クラス/:na]
     java.util.concurrent.FutureTask.run(FutureTask.java:266)〜[na:1.8.0]で
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)〜[na:1.8.0]
     java.util.concurrent.ThreadPoolExecutorで$ Worker.run(ThreadPoolExecutor.java:617)〜[na:1.8.0]
     at java.lang.Thread.run(Thread.java:744)〜[na:1.8.0] 


MyTaskが76行目にNPEをスローしたことは簡単にわかります。しかし、スタックがThreadThreadPoolExecutorのみを参照しているため、誰がこのタスクを承認したのかわかりません。 技術的には、 MyTaskがキューに入れられている領域を1つだけ見つけることを期待して、コードを簡単にナビゲートできます。 しかし、個別のフロー(イベント指向、リアクティブなどのプログラミングは言うまでもありません)がなければ、常に全体像を一度に見ることができます。 (タスクを開始する)クライアントコードのスタックトレースを保存して、エラーが発生した場合などに表示できるとしたらどうでしょうか。 この考え方は新しいものではありません。たとえば、 Hazelcastは所有者ノードからクライアントコードに例外を伝播します。 以下は、これを行う方法の簡単な例です。

 public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error(" {}     {}:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception(" "); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); } //... }
      
      





今回は、失敗した場合に、完全なスタックトレースと、ジョブがキューに入れられたスレッドの名前を取得します。 前述の標準的な例外と比較して、はるかに価値のある情報:

メインスレッドからのジョブのJava.lang.NullPointerException例外:

  java.lang.Exception:クライアントスタックトレース
     com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43)〜[クラス/:na]で
     com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28)〜[クラス/:na]で
     com.nurkiewicz.Main.main(Main.java:31)〜[クラス/:na]
     at sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)〜[na:1.8.0]
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)〜[na:1.8.0]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)〜[na:1.8.0]
     java.lang.reflect.Method.invoke(Method.java:483)〜[na:1.8.0]
     com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)〜[idea_rt.jar:na]で 




9. CompletableFutureを優先



Java 8は、より強力なCompletableFutureクラスを導入しました。 可能な限り使用してください。 ExecutorServiceはこの抽象化をサポートするように拡張されいないため、自分で管理する必要があります。 代わりに:

 final Future<BigDecimal> future = executorService.submit(this::calculate);
      
      







使用:

 final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
      
      







CompletableFutureFutureを拡張し、すべてが以前のように機能するようにします。 しかし、APIの上級ユーザーは、CompletableFutureによって提供される拡張機能を本当に評価するでしょう。



10.同期キュー



SynchronousQueueBlockingQueueの興味深いバリエーションで、実際にはキューではありません。 それ自体はデータ構造ではありません。 容量がゼロのキューとして最も適切に定義できます。

JavaDocの説明は次のとおりです。

追加される各操作は、別のスレッドで対応する削除操作を待機する必要があり、その逆も同様です。 同期キューには内部容量がなく、単一のキューもありません。 要素は削除しようとしたときにのみ表示されるため、同期キューを調べることはできません。 別のスレッドが削除するまで要素を(メソッドを使用して)挿入することはできません。バイパスするものがないため、キューをバイパスすることはできません。

同期キューは、CSPおよびAdaで使用される「ランデブーチャネル」に似ています。


これはすべてスレッドプールにどのように関係していますか? SynchronousQueueThreadPoolExecutorと一緒に使用してみましょう。

 BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);
      
      





その前に、2つのスレッドとSynchronousQueueを持つスレッドプールを作成しました基本的に、SynchronousQueueは容量が0のキューであるため、このようなExecutorServiceは、アイドルスレッドが利用可能な場合にのみ新しいタスクを受け入れます。すべてのスレッドがビジーの場合、新しいタスクはすぐに拒否され、順番に待機することはありません。このモードは、可能な場合、即時のバックグラウンド処理に役立ちます。



以上で、少なくとも1つの興味深い機能を発見できたと思います!



All Articles