JavaとProject Reactor







みなさんこんにちは! 私の名前はリョーカで、FunCorpのバックエンド開発者として働いています。 今日は、リアクティブプログラミング、Reactorライブラリ、およびWebについて少し説明します。







リアクティブプログラミングはしばしば「言及」されますが、(記事の著者のように)それでもそれが何であるかわからない場合は、快適になって、一緒に考えてみてください。







リアクティブプログラミングとは何ですか?



リアクティブプログラミングは、非同期データストリームの管理です。 とても簡単です。 私たちはせっかちな人々であり、 あなたのマニフェストのすべてを詳細に掘り下げることはしませんが、それは価値があるでしょう。







そして、ウェブはどこにありますか?



うわさによると、HTTPサーバーからデータベースドライバーで終わるReactive Manifestoのすべての標準に従って、リアクティブにシステムを構築すると、再来する可能性があります。 さて、または少なくとも高品質のバックエンドを構築します。







これは、もちろん、簡単なguです。 ただし、ユーザーケースが複数のリクエストを処理し、必ずしも高速ではない場合、サーブレットコンテナが対応できなくなった場合は、リアクティブの美しい世界にようこそ!







128の連続した並列リクエストがある場合、サーブレットコンテナはおそらくジョブに適したツールではありません。

そして、 Nettyでない場合は、リアクティブに何を書くべきですか? 裸のNettyでバックエンドを書くのは骨が折れることは注目に値しますが、抽象化を行うと便利です。







Nettyに適したサーバー抽象化はそれほど多くないため、Pivo​​talのスタッフは、 Spring Boot 2でNettyのサポートを追加しました。 2018年3月1日に、これらすべてが開始されました 。 非常に満足させるために、彼らはWebFluxモジュールを作成しました。これは、 Spring MVCに代わるものであり、Webサービスを記述するための事後対応的なアプローチです。







WebFluxは自分自身をマイクロフレームワーク(マイクロフレームワークとSpring、haha)として位置付け、これらの(私たちの)ファッショナブルなマイクロサービスに適合することを約束し、APIを機能的なスタイルで提示し、 すでにHabréで言及されています 。 詳細(Spring MVCとの違いを含む)は、 ここにあります 。 しかし、今日は何か他のものについて。 WebFluxは、Reactorライブラリに基づいています。 彼女について話しましょう。







Reactorは、Pivo​​talが開発したリアクティブ(突然)オープンソースプラットフォームです。 私は、 この素晴らしい図書館の紹介を無料で(コメント付きで)語り直すことにしました。







行こう







ブロックコード(小さなコード用)



Javaコードは通常ブロックしています。 たとえば、HTTP経由の呼び出しやデータベースへのクエリは、サードパーティのサービスが応答するまで現在のスレッドをハングさせます。 サービスが許容可能な時間を担当している場合、これは通常の方法です。 それ以外の場合、このケースはボトルネックになります。 並列化を余儀なくされ、同じブロッキングコードを実行するスレッドをさらに実行します。 その過程で、競合と競争力の問題を解決する必要があります。







特にI / Oが原因で頻繁にブロックされる(そして、多くのモバイルクライアントがある場合、 I / Oがまったく高速はない )ため、多数のスレッドがデータを待機し、コンテキストの切り替えなどに貴重なリソースを費やしています。







並列化は、すべての問題を解決する魔法の杖ではありません。 これは、オーバーヘッドを伴う複雑なツールです。







非同期&&ノンブロッキング



これらの用語は見つけやすく、理解しにくく、忘れることができません。 しかし、それらは反応性に関してしばしば現れるので、それらを理解してみましょう。







上記のテキストから、ブロッキングコードがすべての原因であると結論付けることができます。 OK、ノンブロッキングを書き始めましょう。 これはどういう意味ですか? 結果を提供する準備がまだ整っていない場合は、それを待つ代わりに、後でリクエストを繰り返すように要求するなど、何らかのエラーを発生させます。 もちろんクールですが、この間違いで何をしますか? したがって、後で答えに応答するために非同期処理を取得します。すべて準備ができました!







非同期でノンブロッキングのコードを書く必要がありますが、すべてうまくいきますか? いいえ、彼はしません。 しかし、それは人生を楽にすることができます。 これを行うために、親切で賢い人々があらゆる種類の仕様(リアクティブな仕様を含む)を発明し、これらの仕様を尊重するライブラリーを見つけました。







それで、リアクター。 非常に短い場合



実際、Reactor(少なくともコア部分)は、 Reactive Streams仕様の実装であり、 ReactiveX演算子の一部です。 しかし、それについては後で。







RxJavaに精通している、または聞いている場合、ReactorはRxJavaのアプローチと哲学を共有していますが、多くのセマンティックの違いがあります(RxJavaとAndroid開発の機能との後方互換性のために大きくなります)。







Javaのリアクティブストリームとは何ですか?



失礼な場合、 reactive-streams-jvmライブラリに表示される4つのインターフェイスがあります。









それらの正確なコピーは、Flowクラス9に存在します。







さらに失礼な場合、彼らはすべて次の要件を考え出す:









JDK 9のFlowクラスのコードを見てみましょう(簡潔にするためJavadocコメントは削除されています)。







public final class Flow { public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } public static interface Subscription { public void request(long n); public void cancel(); } public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } }
      
      





これまでのところ、これはすべてJDKレベルの反応性サポートです。 インキュベーターモジュールのどこかで、HTTP / 2クライアントが成熟しており、Flowがアクティブに使用されています。 JDK 9内で他の用途は見つかりませんでした。







統合



Reactorは、 CompletableFuture、Stream、Durationなど、お気に入りのJava 8 Pribludaに統合されています IPCモジュールをサポートしますAkkaおよびRxJava用のアダプターテストモジュール( テストの作成用)、および追加 (ユーティリティクラス)があります。







Redisファンの場合、 レタス/ redissonクライアントには、ReactorをサポートするリアクティブAPIがあります。

MongoDBのファンには、Reactive Streamsを実装する公式のジェットドライバーがあります。これが、Reactorがそれを簡単に選択できる理由です。







さて、どのようにすべてを始めましたか?



これらはすべてJDK8以降で実行できます。 ただし、Androidと自分のもの(minSdk <26)を使用している場合は、RxJava 2を確認することをお勧めします。







Mavenがある場合
 <dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> </dependencies>
      
      





「これら」の場合
 plugins { id "io.spring.dependency-management" version "1.0.1.RELEASE" } dependencyManagement { imports { mavenBom "io.projectreactor:reactor-bom:Bismuth-RELEASE" } } dependencies { compile 'io.projectreactor:reactor-core' }
      
      





BOMは、Reactorのさまざまな部分の間の互換性を高めるために使用されます。 GradleにはネイティブのBOMサポートがないため、プラグインが必要です







ReactorはKotlinをサポートしています









そのため、非同期の非ブロッキングコードを記述する必要があります。 言い換えると、現在の実行スレッドがロックして待機するのではなく、有用なものに切り替えて、非同期処理が完了したときに現在のプロセスに戻ることを許可します。







Javaと呼ばれる日当たりの良い島では、これには主に2つの方法があります。









これらはよく知られたツールですが、ある時点で不十分になります。







コールバックの問題



コールバックは作成が難しく、すぐに「コールバック地獄」と呼ばれるハッシュに変わります。







例を見てみましょう



ユーザーに上位5つのミームを表示する必要があります。そうでない場合は、オファーサービスに移動して、そこから5つのミームを取得します。







合計で3つのサービスが関係しています。1つ目はユーザーのお気に入りのミームのIDを提供し、2つ目はミーム自体を取得し、3つ目はお気に入りのミームがない場合にオファーを提供します。







 //    userService.getFavoriteMemes(userId, new Callback<>() { //  public void onSuccess(List<String> userFavoriteMemes) { if (userFavoriteMemes.isEmpty()) { //   ,    suggestionService.getSuggestedMemes(new Callback<>() { public void onSuccess(List<Meme> suggestedMemes) { uiUtils.submitOnUiThread(() -> { suggestedMemes.stream().limit(5).forEach(meme -> { //   UI })); } } public void onError(Throwable error) { uiUtils.errorPopup(error); //   UI } }); } else { //   userFavoriteMemes.stream() .limit(5) //  5  .forEach(favId -> memeService.getMemes(favId, new Callback<Favorite>() { //  public void onSuccess(Meme loadedMeme) { uiUtils.submitOnUiThread(() -> { //   UI }); } public void onError(Throwable error) { uiUtils.errorPopup(error); } })); } } public void onError(Throwable error) { uiUtils.errorPopup(error); } });
      
      





どうやらクールではないようです。







それでは、Reactorでどのように行うかを見てみましょう。



 //   userService.getFavoriteMemes(userId) .flatMap(memeService.getMemes) //   ID // ,      .switchIfEmpty(suggestionService.getSuggestedMemes()) .take(5) //     5  .publishOn(UiUtils.uiThreadScheduler()) //   UI- .subscribe(favorites -> { uiList.show(favorites); //  UI- }, UiUtils::errorPopup); //   
      
      





Reaction.jpeg

微妙な英語のユーモア







しかし、800ミリ秒のタイムアウトで突然落ち、キャッシュされたデータをロードしたい場合はどうでしょうか?







 userService.getFavoriteMemes(userId) .timeout(Duration.ofMillis(800)) // - //   .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(memeService.getMemes) //   ID .switchIfEmpty(suggestionService.getSuggestedMemes()) .take(5) //   5  .publishOn(UiUtils.uiThreadScheduler()) .subscribe(favorites -> { uiList.show(favorites); }, UiUtils::errorPopup);
      
      





Reactorでは、コールチェーンにタイムアウトステートメントを追加するだけです。 タイムアウトは例外をスローします。 onErrorResume演算子を使用して、エラーが発生した場合にデータを取得する代替(フォールバック)ソースを指定します。







20のコールバック!8、しかしCompletableFutureがあります



名前と統計を要求し、それらをキーと値のペアの形式で結合するIDのリストがあります。これはすべて非同期です。







 CompletableFuture<List<String>> ids = ifhIds(); //   CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip = l.stream().map(i -> { //  () CompletableFuture<String> nameTask = ifhName(i); //  () CompletableFuture<Integer> statTask = ifhStat(i); //  return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); }); //  CompletableFuture List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray( new CompletableFuture[combinationList.size()]); //   Feature   allOf CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); //  ,   ,  allOf  Feauture<Void> return allDone.thenApply(v -> combinationList.stream() .map(CompletableFuture::join) .collect(Collectors.toList())); }); List<String> results = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
      
      





Reactorでこれを行うにはどうすればよいですか?







 Flux<String> ids = ifhrIds(); Flux<String> combinations = ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); //zipWith-   return nameTask.zipWith( statTask, (name, stat) -> "Name " + name + " has stats " + stat ); }); Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); // ..   ,    assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" );
      
      





その結果、構成可能で読み取り可能な高レベルAPI( 実際には、同じスタイルで非同期コードを記述する方法が必要だったため、最初はReactorを使用しました )、およびその他の利点:遅延実行、backPressure管理、さまざまなスケジューラ(スケジューラー)および統合。







さて、他のFluxとMonoは何ですか?



FluxとMonoは、2つの主要なReactorデータ構造です。







フラックス



画像







FluxPublisherインターフェースの実装であり、0からN個の要素のシーケンスであり、終了する場合があります(エラーがある場合を含む)。







Fluxシーケンスには3つの有効な値があります:シーケンスオブジェクト、完了信号、またはエラー信号(それぞれonNextonComplete、およびonErrorメソッドの呼び出し)。







3つの値はそれぞれオプションです。 たとえば、Fluxは無限の空のシーケンスにすることができます(メソッドは呼び出されません)。 または、最後の空のシーケンス(onCompleteのみが呼び出されます)。 または、値の無限シーケンス(onNextのみが呼び出されます)。 等







たとえば、 Flux.interval()は、 Flux <Long>タイプのティックの無限シーケンスを返します







デザインを見る:







 Flux .interval(Duration.ofSeconds(1)) .doOnEach(signal -> logger.info("{}", signal.get())) .blockLast();
      
      





次のテキストが表示されます。







 12:24:42.698 [parallel-1] INFO - 0 12:24:43.697 [parallel-1] INFO - 1 12:24:44.698 [parallel-1] INFO - 2 12:24:45.698 [parallel-1] INFO - 3 12:24:46.698 [parallel-1] INFO - 4 12:24:47.699 [parallel-1] INFO - 5 12:24:48.696 [parallel-1] INFO - 6 12:24:49.696 [parallel-1] INFO - 7 12:24:50.698 [parallel-1] INFO - 8 12:24:51.699 [parallel-1] INFO - 9 12:24:52.699 [parallel-1] INFO - 10
      
      





doOnEach(Consumer <T>)メソッドは、シーケンス内の各要素に副作用を適用します。これは、ロギングに便利です。







blockLast()に注意してください :as シーケンスは無限であり、呼び出しが発生するフローは無限に待機します。







RxJavaに精通している場合、FluxはObservableに非常に似ています。







モノ



MonoはPublisherインターフェースの実装であり、何らかの非同期要素またはその不在Mono.empty()です。







モノ







Fluxとは異なり、Monoは1つしかアイテムを返せません。 Fluxと同様に、 onComplete()およびonError()の呼び出しはオプションです。







Monoは、「Runnableに似た」戻り結果なしで、「完了して忘れた」スタイルの非同期タスクとしても使用できます。 これを行うには、Mono <Void>として宣言し、空の演算子を使用します。







 Mono<Void> asyncCall = Mono.fromRunnable(() -> { // -  // Mono.empty()   }); asyncCall.subscribe();
      
      





RxJavaに精通しているなら、Single + MaybeからカクテルとしてMonoを取りましょう







この分離はなぜですか?



FluxとMonoに分離すると、リアクティブAPIのセマンティクスが向上し、表現力は十分になりますが、冗長ではありません。







理想的には、戻り値を見るだけで、メソッドの動作を理解できます:ある種の呼び出し(Mono <Void>)、要求応答(Mono <T>)、またはデータストリームを返す(Flux <T>)。







FluxとMonoはセマンティクスを使用し、互いに流れます。 FluxにはMono <T>を返す単一の()メソッドがあり、MonoにはすでにFlux <T>を返すconcatWith(Mono <T>)メソッドがあります。







また、独自の演算子もあります。 シーケンス内のN個の要素(Flux)でのみ意味を持つものもあれば、逆に1つの値のみに関連するものもあります。 たとえば、Monoにはor(Mono <T>)があり、Fluxにはlimit / takeステートメントがあります。







その他の例



Flux / Monoを作成する最も簡単な方法は、これらのクラスで提示される多くのファクトリメソッドの1つを使用することです。







準備値でフラックスを初期化する
 Flux<String> sequence = Flux.just("foo", "bar", "foobar");
      
      





反復可能から初期化できます
 List<String> iterable = Arrays.asList("foo", "bar", "foobar"); Flux<String> sequence = Flux.fromIterable(iterable);
      
      





サードパーティのパブリッシャーからできます
 Publisher<String> publisher = redisson.getKeys().getKeys(); Flux<String> from = Flux.from(publisher);
      
      





さて、そうすることができます
 Mono<String> noData = Mono.empty(); // Mono Mono<String> data = Mono.just("foo"); // "foo" //  // - 5,,7 Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
      
      





FluxとMonoは怠け者です。 ある種の処理を開始し、MonoとFluxにあるデータを活用するには、 .subscribe()を使用してそれらをサブスクライブする必要があります。







サブスクライブは、遅延動作を保証すると同時に、データで何を行う必要があるかを示す方法です。 サブスクライブメソッドは、Java 8のラムダ式をパラメーターとして使用します。







購読する方法
 subscribe(); // .. // ..   -     subscribe(Consumer<? super T> consumer); // ..   -    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); // ..   -   subscribe( Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer );
      
      





出力1、2、3
 Flux<Integer> ints = Flux.range(1, 3); ints.subscribe(i -> System.out.println(i));
      
      





次を出力します。







 1 2 3
      
      





出力1、2、3およびエラー
 Flux<Integer> ints = Flux.range(1, 4) .map(i -> { if (i <= 3) { return i; } throw new RuntimeException("Got to 4"); }); ints.subscribe( i -> System.out.println(i), error -> System.err.println("Error: " + error) );
      
      





次を出力します。







 1 2 3 Error: java.lang.RuntimeException: Got to 4
      
      





出力1、2、3、4、および完了
 Flux<Integer> ints = Flux.range(1, 4); ints.subscribe(i -> System.out.println(i), error -> System.err.println("Error " + error), () -> {System.out.println("Done"); });
      
      





次を出力します。







 1 2 3 4 Done
      
      





デフォルトでは、これらはすべて現在のスレッドで機能します。 実行のフローは、たとえば.publishOn()演算子を使用して変更し、そこに関心のあるスケジューラを渡すことができます(SchedulerはExecutorServiceのようなひねりです)。







実行の流れを変える
 Flux<Integer> sequence = Flux.range(0, 100).publishOn(Schedulers.single()); // onNext, onComplete  onError     single. sequence.subscribe(n -> { System.out.println("n = " + n); System.out.println("Thread.currentThread() = " + Thread.currentThread()); }); sequence.blockLast();
      
      





以下を出力します(100回):







 n = 0 Thread.currentThread() = Thread[single-1,5,main]
      
      





どのような結論を導き出すことができますか?





興味深いレビューが判明しました(いいえ)。 あなたが興味を持っていた場合-書いて、私たちは何が起こっているかを掘り下げます。 気軽にコメントしてください!







ご清聴ありがとうございました!







Reactorのドキュメントに基づく







この文書のコピーは、印刷物または電子的に配布されるかどうかに関係なく、そのようなコピーに料金を請求せず、さらに各コピーにこの著作権表示が含まれている場合、ユーザー自身の使用および他者への配布のために作成することができます。

私はここにいませんが、もっとふさわしい男性がいます。 そして貢献者/メンテナー。








All Articles