Java 8、RxJava、Reactorを比較する

翻訳者から:

わずかな追加と修正を加えて、適応した翻訳を準備しました。 私は元の記事のややプロパガンダのスタイルを保持しましたが、それ自体、その中の情報は興味深いので、それにもかかわらず、それを翻訳することにしました。

人々はしばしば私に尋ねます:

Streams、CompletableFutures、Optionalsで同じことができるのに、なぜRxJavaまたはReactorを使用する必要があるのですか?







画像







実際、問題は、ほとんどの場合、単純なタスクを処理しており、これらのライブラリは本当に必要ないということです。 しかし、事態が複雑になると、,いコードを書かなければなりません。 その後、このコードはますます複雑になり、保守が難しくなります。 RxJavaとReactorには、今後何年もあなたのニーズを満たす多くの便利な機能があります。







これらのライブラリと標準のJava機能の違いを理解するのに役立つ8つの基準を定義しましょう。







  1. コンポーザブル
  2. レイジー(遅延/レイジー)
  3. 再利用可能
  4. 非同期
  5. キャッシュ可能
  6. プッシュまたはプル(受信者または受信者)
  7. 背圧
  8. 演算子融合


そして、比較するクラスを選択しましょう。







  1. CompletableFuture
  2. ストリーム
  3. オプショナル
  4. 観測可能(RxJava 1)
  5. 観測可能(RxJava 2)
  6. 流動性(RxJava 2)
  7. フラックス(リアクターコア)


準備はいい? 集まって運転した!







コンポーザブル



これらのクラスはすべて構成可能であり、機能的に考えることができます( 著者タイプミスは修正されます-約 。 このために私たちは彼らを愛しています。







CompletableFuture-多くの.then*()



メソッドにより、何もまたは単一の+スロー可能値がステージ間で渡されるチェーンを構築できます。







ストリーム -入力データを変換できる連結演算子の束。 ステージからステージにN値を渡すことができます。







オプション -中間演算子のペア: .map()



.flatMap()



.filter()









Observable、Flowable、Flux - Streamに似ています







怠け者



CompletableFutureは、非同期の結果を単純に保存するため、レイジーではありません。 このようなオブジェクトは、すでに開始されている作業を表すために作成されます。 ( 数値の調整は修正されます-約。 )彼らは仕事について何も知りませんが、結果はわかっています。 したがって、上流に移動してチェーンの実行を上から下に開始する方法はありません。 CompletableFuture



値が設定されると、次のステップが開始されCompletableFuture





結論は正しいが、理由は議論の余地がある。実際、 CompletableFuture



ない。なぜなら、結果を求める前にその値の検索と設定が始まるからである-およそper







ストリーム -すべての中間操作は遅延します。 すべての最終操作により、コンピューティングプロセスが開始されます。







オプション -遅延ではなく、すべての操作がすぐに実行されます。







Observable、Flowable、Flux-サブスクライバー(サブスクライバー)がいるまで何も起こりません。







再利用可能



CompletableFuture-値の単なるラッパーであるため、再利用可能です。 ただし、このラッパーは可変であるため、慎重に使用する必要があります。 誰も.obtrude*()



を呼び出さないことが確実な場合、これは安全です。







ストリーム -再利用不可。 JavaDocで述べられているように







ストリーム(中間または最終)の操作は1回だけ実行する必要があります。 スレッドの実装は、スレッドが再利用されていることを検出した場合、IllegalStateExceptionをスローする場合があります。 ただし、ストリームの一部の操作は、Streamクラスの新しいオブジェクトではなく、受信者に戻る場合があるため、再利用を常に検出できるとは限りません。

Optionalは不変であり、すべての作業がすぐに行われるため、 Optionalは完全に再利用可能です。







観察可能、流動可能、フラックス -再利用可能に設計。 すべてのステージは、サブスクライバーが存在する場合にのみ、開始点から実行され始めます。







非同期



CompletableFuture-まあ、このクラスのポイントは、操作を非同期にバインドすることです。 CompletableFuture



は、一部のExecutor



関連付けられた作業を表します。 タスクを作成するときにExecutor



を明示的に指定しない限り、通常のForkJoinPool



ます。 このプールはForkJoinPool.commonPool()



を使用して取得できます。デフォルトでは、システム内のハードウェアスレッドと同じ数のスレッド(通常はコアの数、コアがハイパースレッディングをサポートしている場合は2倍)を作成します。 ただし、JVMパラメーターを使用してこのプールのスレッド数を設定できます

-Djava.util.concurrent.ForkJoinPool.common.parallelism=?





または、ワークステップを作成するたびに新しいExecutor



使用します。







ストリーム -非同期処理の可能性はありませんが、計算を並列に実行して、並列化されたストリームstream.parallel()



作成できます。







オプション -いいえ、それは単なるコンテナです。







Observable、Flowable、Flux-非同期システムの構築を目的としていますが、デフォルトでは同期です。 observeOn



observeOn



使用すると、サブスクリプションの登録と通知の受信を制御できます(つまり、オブザーバーでonNext



/ OnError



/ OnCompleted



をトリガーするスレッド)。







subscribeOn



Observable.create



を実行するScheduler



を決定Observable.create



ます。 自分でcreate



呼び出さない場合でも、内部的に同等のものがあります。 例:







 Observable .fromCallable(() -> { log.info("Reading on thread: " + currentThread().getName()); return readFile("input.txt"); }) .map(text -> { log.info("Map on thread: " + currentThread().getName()); return text.length(); }) .subscribeOn(Schedulers.io()) // <-- setting scheduler .subscribe(value -> { log.info("Result on thread: " + currentThread().getName()); });
      
      





出力:







 Reading file on thread: RxIoScheduler-2 Map on thread: RxIoScheduler-2 Result on thread: RxIoScheduler-2
      
      





一方、 observeOn()



は、 observeOn()



続く後続のステップを呼び出すために使用されるScheduler



制御します。 例:







 Observable .fromCallable(() -> { log.info("Reading on thread: " + currentThread().getName()); return readFile("input.txt"); }) .observeOn(Schedulers.computation()) // <-- setting scheduler .map(text -> { log.info("Map on thread: " + currentThread().getName()); return text.length(); }) .subscribeOn(Schedulers.io()) // <-- setting scheduler .subscribe(value -> { log.info("Result on thread: " + currentThread().getName()); });
      
      





出力:







 Reading file on thread: RxIoScheduler-2 Map on thread: RxComputationScheduler-1 Result on thread: RxComputationScheduler-1
      
      





キャッシュ可能



再利用とキャッシュの違いは何ですか? チェーンA



、チェーンB = A + O



およびC = A + O



を作成するために2回再利用するとしますC = A + O









B



C



が成功した場合、クラスは再利用可能です。

B



C



が成功し、チェーンA



各ステージが1回だけ呼び出されると、クラスがキャッシュされます。 キャッシュするには、クラスが再利用可能でなければなりません。







CompletableFutureは、再利用性と同じ答えです。







ストリーム -最終ステートメントが呼び出されるまで、中間結果をキャッシュする方法はありません。







すべての作業はすぐに行われるため、 オプションは「キャッシュ」です。







Observable、Flowable、Flux-デフォルトではキャッシュされません。 ただし、 .cache()



呼び出してA



キャッシュ可能にできます。







 Observable<Integer> work = Observable.fromCallable(() -> { System.out.println("Doing some work"); return 10; }); work.subscribe(System.out::println); work.map(i -> i * 2).subscribe(System.out::println);
      
      





出力:







 Doing some work 10 Doing some work 20
      
      





.cache()









 Observable<Integer> work = Observable.fromCallable(() -> { System.out.println("Doing some work"); return 10; }).cache(); // <- apply caching work.subscribe(System.out::println); work.map(i -> i * 2).subscribe(System.out::println);
      
      





出力:







 Doing some work 10 20
      
      





プッシュまたはプル



プルの原則に関するストリームおよびオプションの作業。 結果は、さまざまなメソッド( .get()



.collect()



など)を呼び出すことによってチェーンから取得されます。 プルは、多くの場合、ブロッキング、同期実行に関連付けられます。 メソッドを呼び出すと、スレッドはデータの到着を待機し始めます。 それまで、スレッドはブロックされます。







CompletableFuture、Observable、Flowable、FluxはPushの原則に基づいて動作します。 チェーンにサブスクライブし、処理が必要になると通知されます。 プッシュは、多くの場合、非ブロッキング非同期実行に関連付けられています。 チェーンがスレッドで実行されている間は何でもできます。 実行するコードについては既に説明しているため、次のステップで通知によりこのコードの実行が開始されます。







背圧



流れを抑えるためには、プッシュの原理に基づいてチェーンを構築する必要があります。

スレッドの封じ込めは、非同期ステップの一部が値を十分に速く処理できず、チェーンを上に移動する方法が必要な場合に、それらを遅くするように要求するチェーン内の状況です。 容認できない状況は、データが多すぎるために何らかの段階で拒否が発生する場合です( 著者の言葉遣いのあいまいさが保持されます-約Per。 )。







画像









Observable(RxJava 1)、Flowable、Flux-この問題を解決します。 主な戦略は次のとおりです。









Observable(RxJava 2) -この問題を解決しません。 多くのRxJava 1ユーザーは、不合理なイベントにObservable



を使用したか、戦略を使用しなかったため、予期しない例外が発生しました。 したがって、 RxJava 2では 、保持された( Flowable )クラスと抑制されていない( Observable



)クラスが明確に分離されています。







演算子融合



アイデアは、ライフサイクル全体のさまざまな時点でチェーンを変更して、ライブラリアーキテクチャによって作成される複雑さを軽減することです。 これらの最適化はすべて内部で行われるため、エンドユーザーにはすべてが明確になります。







RxJava 2Reactorのみが演算子のマージをサポートしますが、少し異なります。 一般に、最適化には2つのタイプがあります。









画像









画像







...サブスクライバーは親Observableから値を要求できます。







画像







詳細については、 パート1パート2を参照してください。







おわりに



画像比較







Stream



CompletableFuture



およびOptional



は、特定の問題を解決するために作成されました。 そして、彼らはこれらの問題を解決するのが得意です。 ニーズを満たすのに十分であれば、先に進みます。







ただし、問題ごとに複雑さが異なり、その一部には新しいアプローチが必要です。 RxJava&Reactorは、そのような問題を解決するように設計されていないツールを使用して「ハック」を作成する代わりに、宣言的なスタイルで問題を解決するのに役立つ汎用ツールです。








All Articles