CompletableFutureを複数のスレッドで使用する問題とその解決策

画像 Java 8ではCompletableFuture



新しいクラスが導入さCompletableFuture



、これにより非同期コードを簡単に記述できます。

複数のスレッドからCompletableFuture



を使用するとき、その非自明な動作に遭遇しました。つまり、そのスレッドでのコールバックは、それらのスレッドではまったく期待どおりに実行できないということです。 このことと、どうやって問題を解決したかについて-この記事で説明します。



スレッドセーフなデータ構造を使用するサーバーに対して、非同期の非ブロッキングシングルスレッドクライアントを開発しました。 テストは問題なく合格しましたが、ベンチマークは、シングルスレッドクライアントの内部構造でConcurrentModificationException



が発生することがありました。



クライアントの非同期はCompletableFuture



を使用して実装され、クライアント内のすべての操作は1つのスレッド(以降、コードsingleThreadExecutor



)で実行されました。



ユーザーが使用できるget



メソッドを使用したクライアントコードのフラグメント:



 //   private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>()); public CompletableFuture<String> get(String key) { CompletableFuture<String> future = new CompletableFuture<>(); //       singleThreadExecutor.execute(() -> { // future     pendingFutures.add(future); future.whenComplete((v, e) -> { // future       pendingFutures.remove(future); }); //            //     future.complete(data);    singleThreadExecutor }); return future; }
      
      





これは行われるべきではないことが判明しました。



CompletableFuture



のjavadocを注意深く読んでいれば、おそらくこれについては以前に知っていただろう。



javadocを表示
非非同期メソッドの依存する完了のために提供されるアクションは、現在のCompletableFutureを完了するスレッドによって、または完了メソッドのその他の呼び出し元によって実行されます。



このアーキテクチャを使用する場合、 CompletableFuture



すべてのコールバックは、 CompletableFuture



と同じスレッドで呼び出される必要があります。



上記のコードによると、それは起こっているようです。 ただし、ベンチマークは、同じクライアントスレッド( singleThreadExecutor



)のpendingFutures



pendingFutures



するコードのConcurrentModificationException



で終了する場合がありました。



実際、 future.whenComplete



future.whenComplete



を呼び出す)に渡されるコールバックは、完全に異なるスレッドで実行される場合があります。 むしろ、私のクライアントが使用するアプリケーションのスレッドで:



 Client client = new Client("127.0.0.1", 8080); CompletableFuture<String> result = client.get(key); result.thenAccept(data -> { System.out.println(data); });
      
      





このアプリケーションでresult.thenAccept



を呼び出すと、クライアントコード自体の内部に追加された将来のコールバックの残りを呼び出すことがあります。



問題を簡単な例で見てみましょう。



 Thread mainThread = Thread.currentThread(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); future.complete(null);
      
      





コールバックはcompleteメソッドと同じスレッドで実行されるため、このようなコードは常にtrue



を表示しtrue







ただし、他のスレッドからCompletableFuture



への呼び出しが少なくとも1つある場合、動作が変わる可能性があります。



 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread) }); //  callback    future    executor.execute(() -> { future.thenRun(() -> { //nop }); }); // future future.complete(null);
      
      





そのようなコードは時々 false



返すことがありfalse







実際には、同じ未来でthenRun



を呼び出しthenRun



が、2番目のスレッドで、最初のthenRun



コールバックが発生する可能性があります。 この場合、最初のthenRun



コールバックは2番目のスレッドで呼び出されます。



これは、 future.complete(null)



が実行を開始した時点で発生しますが、まだコールバックを呼び出すことができず、 thenRun



2番目のスレッドでthenRun



れました。



問題は簡単に解決されます。



 //  Thread mainThread = Thread.currentThread(); //   Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); //  callback    future    executor.execute(() -> { secondThreadFuture.thenRun(() -> { //nop }); }); // future future.complete(null);
      
      





最初のfutureの結果に依存するsecondThreadFutureを追加しました。 そして、2番目のスレッドでthenRun



を呼び出しても、元のfutureでコールバックがトリガーされる可能性はありません。



ユーザー定義のスレッドでのコールバックの呼び出しを保証するために、 CompletableFuture



は、 thenRunAsync



に渡す必要のあるthenRunAsync



などの非同期メソッドの実装があります。 ただし、メソッドの非同期バージョンは、通常のバージョンよりも実行速度が遅くなる場合があります。 したがって、私はそれらを再び使用したくありませんでした。



おわりに



私が自分でCompletableFuture



結論:複数のスレッドで1つのCompletableFuture



オブジェクトを使用しないでCompletableFuture



オブジェクトのすべてのコールバックが特定のスレッドで実行されることを確認する必要がある場合。 また、1つのCompletableFutureで複数のストリームを使用する必要がある場合、元のCompletableFuture



ではなく、元のCompletableFuture



に依存する新しいストリームに転送するだけで十分です。 たとえば、次のように:



 CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> { //nop });
      
      






All Articles