CompletableFuture
新しいクラスが導入さ
CompletableFuture
、これにより非同期コードを簡単に記述できます。
複数のスレッドから
CompletableFuture
を使用するとき、その非自明な動作に遭遇しました。つまり、そのスレッドでのコールバックは、それらのスレッドではまったく期待どおりに実行できないということです。 このことと、どうやって問題を解決したかについて-この記事で説明します。
スレッドセーフなデータ構造を使用するサーバーに対して、非同期の非ブロッキングシングルスレッドクライアントを開発しました。 テストは問題なく合格しましたが、ベンチマークは、シングルスレッドクライアントの内部構造で
ConcurrentModificationException
が発生することがありました。
クライアントの非同期は
CompletableFuture
を使用して実装され、クライアント内のすべての操作は1つのスレッド(以降、コード
singleThreadExecutor
)で実行されました。
ユーザーが使用できる
get
メソッドを使用したクライアントコードのフラグメント:
// private final Set<CompletableFuture> pendingFutures = Collections.newSetFromMap(new IdentityHashMap<>()); public CompletableFuture<String> get(String key) { CompletableFuture<String> future = new CompletableFuture<>(); // singleThreadExecutor.execute(() -> { // future pendingFutures.add(future); future.whenComplete((v, e) -> { // future pendingFutures.remove(future); }); // // future.complete(data); singleThreadExecutor }); return future; }
これは行われるべきではないことが判明しました。
CompletableFuture
のjavadocを注意深く読んでいれば、おそらくこれについては以前に知っていただろう。
javadocを表示
非非同期メソッドの依存する完了のために提供されるアクションは、現在のCompletableFutureを完了するスレッドによって、または完了メソッドのその他の呼び出し元によって実行されます。
このアーキテクチャを使用する場合、
CompletableFuture
すべてのコールバックは、
CompletableFuture
と同じスレッドで呼び出される必要があります。
上記のコードによると、それは起こっているようです。 ただし、ベンチマークは、同じクライアントスレッド(
singleThreadExecutor
)の
pendingFutures
を
pendingFutures
するコードの
ConcurrentModificationException
で終了する場合がありました。
実際、
future.whenComplete
(
future.whenComplete
を呼び出す)に渡されるコールバックは、完全に異なるスレッドで実行される場合があります。 むしろ、私のクライアントが使用するアプリケーションのスレッドで:
Client client = new Client("127.0.0.1", 8080); CompletableFuture<String> result = client.get(key); result.thenAccept(data -> { System.out.println(data); });
このアプリケーションで
result.thenAccept
を呼び出すと、クライアントコード自体の内部に追加された将来のコールバックの残りを呼び出すことがあります。
問題を簡単な例で見てみましょう。
Thread mainThread = Thread.currentThread(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); future.complete(null);
コールバックはcompleteメソッドと同じスレッドで実行されるため、このようなコードは常に
true
を表示し
true
。
ただし、他のスレッドから
CompletableFuture
への呼び出しが少なくとも1つある場合、動作が変わる可能性があります。
// Thread mainThread = Thread.currentThread(); // Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread) }); // callback future executor.execute(() -> { future.thenRun(() -> { //nop }); }); // future future.complete(null);
そのようなコードは時々
false
返すことがあり
false
。
実際には、同じ未来で
thenRun
を呼び出し
thenRun
が、2番目のスレッドで、最初の
thenRun
コールバックが発生する可能性があります。 この場合、最初の
thenRun
コールバックは2番目のスレッドで呼び出されます。
これは、
future.complete(null)
が実行を開始した時点で発生しますが、まだコールバックを呼び出すことができず、
thenRun
2番目のスレッドで
thenRun
れました。
問題は簡単に解決されます。
// Thread mainThread = Thread.currentThread(); // Executor executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> secondThreadFuture = future.thenRun(() -> { System.out.println(Thread.currentThread() == mainThread); }); // callback future executor.execute(() -> { secondThreadFuture.thenRun(() -> { //nop }); }); // future future.complete(null);
最初のfutureの結果に依存するsecondThreadFutureを追加しました。 そして、2番目のスレッドで
thenRun
を呼び出しても、元のfutureでコールバックがトリガーされる可能性はありません。
ユーザー定義のスレッドでのコールバックの呼び出しを保証するために、
CompletableFuture
は、
thenRunAsync
に渡す必要のある
thenRunAsync
などの非同期メソッドの実装があります。 ただし、メソッドの非同期バージョンは、通常のバージョンよりも実行速度が遅くなる場合があります。 したがって、私はそれらを再び使用したくありませんでした。
おわりに
私が自分で
CompletableFuture
結論:複数のスレッドで1つの
CompletableFuture
オブジェクトを使用しないで
CompletableFuture
オブジェクトのすべてのコールバックが特定のスレッドで実行されることを確認する必要がある場合。 また、1つのCompletableFutureで複数のストリームを使用する必要がある場合、元の
CompletableFuture
ではなく、元の
CompletableFuture
に依存する新しいストリームに転送するだけで十分です。 たとえば、次のように:
CompletableFuture<Void> secondThreadFuture = firstThreadFuture.whenComplete((v, e) -> { //nop });