Springでのリアクティブプログラミングの紹介

こんにちは、Habr!



今週、印刷会社からの新しいSpring 5の本を期待しています。









Spring 5の興味深い機能の中で、リアクティブプログラミングは特に言及する価値があります。このフレームワークでの実装は、Matt Raibleの提案記事で簡単に説明されています。 前述の本では、リアクティブパターンについては第11章で説明しています。



マットは昨年夏にリリースされた JavaとSpringに関するもう1つの素晴らしい本「 Java in the Cloud 」の著者であるJosh Longの共著です。



リアクティブプログラミングは、高負荷に耐えるシステムを構築する方法です。 サーバーが非ブロッキングであり、クライアントプロセスが応答を待つ必要がないため、巨大なトラフィックの処理はもはや問題ではありません。 クライアントは、サーバー上でプログラムがどのように実行されるかを直接観察して、それと同期することはできません。 APIが要求の処理が困難であると判断した場合でも、妥当な応答を提供する必要があります。 制御されない方法でメッセージを拒否および破棄しないでください。 上位コンポーネントに負荷の下で動作していることを通知し、この負荷から部分的に解放できるようにする必要があります。 この手法は、リアクティブプログラミングの重要な側面であるバックプレッシャーと呼ばれます。



この記事はJosh Longと共同執筆しました。 JoshはJavaチャンピオンであり、Spring Developer Advocateであり、一般的にはPivotalで働くグローバルな人です。 私はSpringと長い間仕事をしてきましたが、Spring Bootを見せてくれたのはジョシュでした。それはベルギーのDevoxxカンファレンスでのことでした。 それ以来、私たちは強い友人になり、Javaが好きで、クールなアプリケーションを作成しています。



リアクティブプログラミングまたはI / O、I / O、私たちは仕事に行きます...



リアクティブプログラミングは、非同期I / Oをアクティブに使用するソフトウェアを作成する方法です。 非同期I / Oは、プログラミングに大きな変化を伴う小さなアイデアです。 アイデア自体は単純です:リソースの非効率的な割り当てで状況を修正し、私たちの介入なしでアイドル状態になっていたリソースを解放し、I / Oの完了を待ちます。 非同期入力/出力は、I / O処理への通常のアプローチを逆にします。クライアントは解放され、新しい通知を待つ他のタスクを実行できます。



同期入出力と非同期入出力の共通点、およびそれらの違いを検討してください。



ソースからデータを読み取る簡単なプログラムを作成します(具体的には、 java.io.File



リンクについて説明しています)。 古き良きjava.io.InputStream



を使用する実装から始めましょう。



例1.ファイルからデータを同期的に読み取る



 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.function.Consumer; @Log4j2 class Synchronous implements Reader { @Override public void read(File file, Consumer<BytesPayload> consumer) throws IOException { try (FileInputStream in = new FileInputStream(file)) { //1 byte[] data = new byte[FileCopyUtils.BUFFER_SIZE]; int res; while ((res = in.read(data, 0, data.length)) != -1) { //2 consumer.accept(BytesPayload.from(data, res)); //3 } } } }
      
      





  1. 通常のjava.io.File



    で読み取るためのファイルを提供します
  2. 一度に1行ずつソースから結果を取得します...
  3. Consumer<BytesPayloadgt;



    を受け入れるためにこのコードを書きConsumer<BytesPayloadgt;



    新しいデータが到着したときに呼び出されます


簡単です、あなたは何と言いますか? このコードを実行すると、ログ出力(各行の左側)に表示され、すべてのアクションが単一のスレッドで発生することを示します。

ここでは、ソースで取得したデータからバイトを抽出します(この場合、 java.io.FileInputStream



から継承したjava.io.InputStream



サブクラスについて話します)。 この例の何が問題になっていますか? この場合、ファイルシステムにあるデータを指すInputStreamを使用します。 ファイルが存在し、ハードドライブが機能している場合、このコードは期待どおりに機能します。



しかし、 File



ではなくネットワークソケットからデータを読み取り、 InputStream



別の実装を使用するとどうなりますか? 心配することは何もありません! もちろん、ネットワーク速度が無限に速い場合、心配することはまったくありません。 そして、これと他のノードとの間のネットワークチャネルに障害が発生しない場合。 これらの条件が満たされている場合、コードは完全に機能します。



しかし、ネットワークがスローダウンまたはレイダウンするとどうなりますか? この場合、操作in.read(…)



までの期間を増やすことを意味しin.read(…)



。 実際、彼女はまったく戻ってこないかもしれません! これは、データの読み取り元のストリームで何か他のことをしようとすると問題になります。 もちろん、いつでも別のストリームを作成して、そこからデータを読み取ることができます。 これは特定のポイントまで実行できますが、最終的には、さらにスケーリングするためにスレッドを追加するだけでは不十分になるという制限に達します。 マシン上にあるコアの数を超える真の競争はありません。 行き止まり! この場合、追加のフローのためにのみ入出力処理(ここでは読み取りを意味します)を増やすことができますが、ここでは遅かれ早かれ限界に達します。



この例では、主な作品は読書です。他の面ではほとんど何も起こりません。 I / Oに依存しています。 非同期ソリューションが、フローの独占を部分的に克服するのにどのように役立つかを検討してください。



例2.ファイルからデータを非同期で読み取る



 package com.example.io; import lombok.extern.log4j.Log4j2; import org.springframework.util.FileCopyUtils; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Consumer; @Log4j2 class Asynchronous implements Reader, CompletionHandler<Integer, ByteBuffer> { private int bytesRead; private long position; private AsynchronousFileChannel fileChannel; private Consumer<BytesPayload> consumer; private final ExecutorService executorService = Executors.newFixedThreadPool(10); public void read(File file, Consumer<BytesPayload> c) throws IOException { this.consumer = c; Path path = file.toPath(); // 1 this.fileChannel = AsynchronousFileChannel.open(path, Collections.singleton(StandardOpenOption.READ), this.executorService); //2 ByteBuffer buffer = ByteBuffer.allocate(FileCopyUtils.BUFFER_SIZE); this.fileChannel.read(buffer, position, buffer, this); //3 while (this.bytesRead > 0) { this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } } @Override public void completed(Integer result, ByteBuffer buffer) { //4 this.bytesRead = result; if (this.bytesRead < 0) return; buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); //5 consumer.accept(BytesPayload.from(data, data.length)); buffer.clear(); this.position = this.position + this.bytesRead; this.fileChannel.read(buffer, this.position, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { log.error(exc); } }
      
      





  1. 今回はjava.io.File



    を適合させ、そこからJava NIO java.nio.file.Path



    を作成しJava NIO java.nio.file.Path



  2. 特にChannel



    を作成するとき、必要なデータが表示されたときにCompletionHandler



    ハンドラーを呼び出すために使用されるjava.util.concurrent.ExecutorService



    サービスを指定します
  3. CompletionHandler<Integer, ByteBuffer> (this)



    リンクを渡すことで読み取りを開始します
  4. コールバックで、 ByteBuffer



    からbyte[]



    容量に読み込みますbyte[]



  5. Synchronous



    例と同様に、 byte[]



    データがコンシューマに渡されます。


すぐに予約します。このコードははるかに難しいことがわかりました! ここで非常に多くのことが行われているので、頭がすぐに回転しますが、指摘してください...このコードは、 Java NIO Channel



からデータを読み取り、コールバックを担当する別のスレッドでこのデータを処理します。 したがって、読み取りが開始されたストリームは独占されません。 .read(..)



呼び出した後、ほぼ瞬時に戻り、最終的にデータを自由に使用できるようになっ.read(..)



、コールバックが行われます-すでに別のスレッドで。 .read()



呼び出し間に遅延がある場合、スレッドで実行することで他の事項に進むことができます。 最初のバイトから最後のバイトまでの非同期読み取り操作の期間は、せいぜい同期読み取り操作の期間より長くありません。 通常、非同期操作は無視できるほど長くなります。 ただし、このような追加の困難に取り組むと、フローをより効果的に処理できます。 より多くの作業を行い、有限数のスレッドを持つプールでI / Oを多重化します。



私はクラウドコンピューティング会社で働いています。 水平スケーリングの問題を解決するために、アプリケーションの新しいインスタンスを取得してください! もちろん、ここで私は少し不誠実です。 非同期I / Oは状況を少し複雑にしますが、この例がリアクティブコードの有用性を示していることを願っています:パフォーマンスがI / Oに大きく依存している場合、より多くの要求を処理し、既存のハードウェアでより多くの作業を行うことができます。 パフォーマンスがプロセッサの使用に依存している場合(たとえば、フィボナッチ数の操作、ビットコインのマイニングまたは暗号化について話している場合)、リアクティブプログラミングでは何も得られません。



現在、私たちのほとんどは日常業務でChannel



またはInputStream



実装を使用していません! より高いレベルの抽象化のレベルで問題を考える必要があります。 それは、配列、またはむしろjava.util.Collection



階層のようなものについてです。 java.util.Collection



コレクションは、InputStream上で非常によく表示されます。両方のエンティティは、すべてのデータを一度に、ほぼ瞬時に操作できると想定しています。 ほとんどのInputStreams



からの読み取りを、後でではなく早く終了できることが期待されています。 大量のデータに移行する場合、コレクションタイプは少し不快になります。 潜在的に無限(無制限)なもの(たとえば、Webソケットやサーバーイベント)を処理している場合はどうなりますか? 録音間に遅延がある場合はどうすればよいですか?



この種のデータを記述するためのより良い方法が必要です。 最終的に発生するような非同期イベントについて話している。 Future<T>



またはCompletableFuture<T>



はこの目的に非常に適しているように思えるかもしれませんが、最終的に発生することを1つだけ説明しています。 実際、Javaはこの種のデータを説明するのに適したメタファーを提供していません。 Java 8のIterator



Stream



両方のタイプは無関係かもしれませんが、両方ともプル指向です。 あなた自身が次のエントリを要求します。タイプはコードにコールバックを送信するべきではありません。 このケースでプッシュベースの処理がサポートされていて、スレッドレベルでより多くのことを達成できる場合、APIはスレッド化とスケジューリング制御も提供すると想定されています。 Iterator



実装はスレッド化について何も述べておらず、すべてのJava 8スレッドは同じfork-joinプールを共有しています。



Iterator



Stream



実際にプッシュ処理をサポートしている場合、I / Oのコンテキストで実際に悪化する別の問題が発生します。何らかの逆浸透メカニズムが必要です。 データコンシューマは非同期に処理されるため、データがパイプラインにいつ、どのような量で入るかはわかりません。 次のコールバックで処理する必要があるデータの量はわかりません:1バイトまたは1テラバイト!



InputStream



からデータをInputStream



、処理する準備ができているだけの情報を読み取ることができ、それ以上は読み取りません。 前の例では、固定長の既知のbyte[]



バッファーにデータを読み込みました。 非同期コンテキストでは、処理するデータの量をプロバイダーに伝える方法が必要です。

はい、先生。 確かに何かがここにありません。



欠落している隠phorを検索する



この場合、非同期I / Oの本質を美しく反映し、データの逆転送のためのこのようなメカニズムをサポートし、分散システムの実行フローを制御できるメタファーを探しています。 リアクティブプログラミングでは、クライアントが処理できる負荷を通知する機能を「逆流」と呼びます。



現在、リアクティブプログラミングをサポートするVert.x、Akka Streams、RxJavaの優れたプロジェクトが多数あります。 SpringチームはReactorというプロジェクトも実行しています。 これらのさまざまな標準の間には、 Reactive Streamsイニシアチブ標準で事実上強調されているかなり広い一般的なフィールドがあります。 Reactive Streamsイニシアチブでは、4つのタイプが定義されています。



Publisher<T&gt



インターフェースPublisher<T&gt



; 長期的に到達する可能性のある値を生成します。 Publisher<T&gt



インターフェースPublisher<T&gt



; Subscriber<T>



のタイプT



値を生成します。



例3.リアクティブストリーム: Publisher<T>



インターフェイス




 package org.reactivestreams; public interface Publisher<T> { void subscribe(Subscriber<? super Tgt; s); }
      
      





Subscriber



タイプはPublisher<T>



サブスクライブし、 onNext(T)



メソッドを介してタイプT



新しい値の通知を受け取ります。 エラーが発生すると、そのonError(Throwable)



メソッドがonError(Throwable)



ます。 処理が正常に完了すると、 onComplete



メソッドが呼び出されます。



例4. Jetストリーム: Subscriber<T>



インターフェイス。




 package org.reactivestreams; public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
      
      





Subscriber



最初にPublisher



に接続すると、 Subscriber#onSubscribe



Subscription



を受信します。 サブスクリプSubscription



サブスクリプSubscription



は、おそらく仕様全体の中で最も重要な部分です。 リターンフローを提供するのは彼女です。 サブスクライバーサブスクライバーは、 Subscription#request



メソッドを使用して追加データを要求するか、 Subscription#cancel



メソッドを使用して処理を停止します。



例5.リアクティブストリーム: Subscription<T>



インターフェイス




 package org.reactivestreams; public interface Subscription { public void request(long n); public void cancel(); }
      
      





リアクティブストリームの仕様は、明らかではありますが、別の便利なタイプを提供しますProcessor<A,B>



は、 Subscriber<A>



Publisher<B>



両方を継承する単なるインターフェイスです。



例6. Jetストリーム: Processor<T>



インターフェース




 package org.reactivestreams; public interface Processor<T, R> extends Subscriber&ltT>, Publisher<R> { }
      
      





仕様は実装の要件として位置付けられていません;実際、その目的は相互運用性をサポートするために型を定義することです。 リアクティブフローに関連付けられたタイプの明らかな利点は、Java 9リリースで場所を見つけたということです。さらに、セマンティック的には、 java.util.concurrent.Flow



クラスからのインターフェースに対応する「1対1」です: java.util.concurrent.Flow.Publisher







Reactorに会う



リアクティブストリームのタイプだけでは不十分です。 フィルタリングや変換などの操作をサポートするには、より高次の実装が必要です。 そのため、Reactorプロジェクトは便利です。 Reactive Streams仕様に基づいて構築され、2つのPublisher<T>



スペシャライゼーションを提供します。



まず、 Flux<T>



はゼロ以上の値を生成するPublisher



です。 2番目のMono<T>



Publisher<T>



、ゼロまたは1つの値を生成します。 どちらも値を公開し、それに応じて処理できますが、それらの機能はReactive Streams仕様よりもはるかに広範囲です。 どちらも、値ストリームを処理できる演算子を提供します。 リアクタータイプは適切に構成されます-そのうちの1つの出力は他のタイプの入力として機能し、タイプが他のデータストリームと連携する必要がある場合、 Publisher<T>



インスタンスに依存します。



Mono<T>



Flux<T>



どちらもPublisher<T>



実装しています。 メソッドはPublisher<T>



インスタンスを受け入れ、 Flux<T>



またはMono<T>



返すことをお勧めします。 これは、クライアントが受信するデータの種類を区別するのに役立ちます。



Publisher<T>



が与えられ、このPublisher<T>



ユーザーインターフェイスを表示するように要求されたとします。 CompletableFuture<T>



取得できるので、1つのレコードの詳細を含むページを表示する必要がありますか? または、すべてのエントリがページごとに表示されるリストまたはグリッドで概要ページを表示しますか? 言うのは難しいです。



また、 Flux<T>



Mono<T>



非常に特殊です。 Flux<T>



受信した場合はレビューページを表示し、 Mono<T>



を受信した場合は1つの(または単一ではない)レコードの詳細を含むページを表示する必要があることを知っています。



Reactorは、Pivo​​talによって開始されたオープンソースプロジェクトです。 今、彼は非常に人気があります。 Facebookはジェットエンジンでこれを使用してリモートプロシージャを呼び出し 、RxJavaの作成者Ben Christensenの指導の下でRsocketでも使用します。 Salesforceは、 リアクティブgRPC実装でそれを使用します 。 ReactorはReactive Streamsタイプを実装しているため、これらのタイプをサポートする他のテクノロジー、たとえばNetflixのRxJava 2Lightbendの Akka Streams 、Eclipse FoundationのVert.xプロジェクトとやり取りできます。 RxJava 2のディレクターであるDavid Cairnockは、Pivo​​talと密接に連携してReactorを開発し、プロジェクトをさらに改善しました。 さらに、もちろん、Spring Framework 4.0以降、Spring Frameworkには何らかの形で存在します。



Spring WebFluxによるリアクティブプログラミング



そのすべての有用性について、Reactorは単なる基礎にすぎません。 アプリケーションはデータソースと通信する必要があります。 認証と承認をサポートする必要があります。 Springはこれをすべて提供します。 Reactorが欠落している比phorを提供する場合、Springはすべての人が共通の言語を話すのを助けます。



Spring Framework 5.0は2017年9月にリリースされました。ReactorおよびReactive Streams仕様に基づいています。 Spring WebFluxと呼ばれる新しいリアクティブランタイムおよびコンポーネントモデルがあります。



Spring WebFluxはサーブレットAPIから独立しており、動作する必要はありません。 必要に応じて、サーブレットエンジンの上で使用できるアダプタが付属していますが、これは必須ではありません。 また、Spring WebFluxと呼ばれるまったく新しいNettyベースのランタイムも提供します。 Java 8およびJava EE 7以降で動作するSpring Framework 5は、Spring Data Kay、Spring Security 5、Spring Boot 2、Spring Cloud Finchleyなど、Springエコシステムの多くの基盤として機能します。



All Articles