RxJavaの実際の使用例

RxJavaは、JavaのReactiveX実装-データストリームを非同期に処理するためのライブラリです。 このパターンは、ステロイドで記述されているため、ステロイドで観察できます。 Habréを含むインターネットでは、多くの「RxJavaの紹介」があります。 実際のタスクの例をいくつか示します。 彼らはそれほど複雑ではありませんが、おそらく誰かが自分自身といくつかの類似点を見て、それについて考えるでしょう。



実際には、タスク:



1.単純なクライアントTCP接続。 TCP / IPの上にプロトコルがあります。メッセージを作成し、リモートホストに接続する必要があります。まだ接続していない場合は、メッセージを送信して応答を読み取ります。 さらに、エラー処理、タイムアウトチェック、失敗した場合の再試行。 厳密なパフォーマンス要件はありません。トラフィックは大きくありません。



2.エンジンとセンサーがあります。 スキャンを行う必要があります-エンジンを所定のパスに沿って歩きます:エンジンをポイントに送信し、エンジンが到達するのを待ち、センサーの測定値を取得し、グラフにポイントを表示し(GUIストリーム内)、次のポイントに移動します...



3.スキャン後に取得したデータを処理し(条件付きで長い計算プロセス)、グラフの画像とユーザーが入力したデータ(GUIストリーム)とともにpdfレポート(条件付きで長いI / Oプロセス)に入れる必要があります。



1.単純なTCPクライアント接続



何らかのメッセージングプロトコルがあるとします。 メッセージには、ヘッダー、チェックサムなどが含まれる場合があります。 各メッセージには、サーバーからの応答が必要です。 最も単純な形式では、ソリューションは次のようになります。



public String send(String command) { try { if (!isConnected()) { connect(); } byte[] bytes = command.getBytes(); bytes = addHeader(bytes); sendBytes(bytes); return readAnswer(); } catch (IOException e) { //  } }
      
      





実装の詳細は説明しませんが、簡単に説明します。connect()はjava.net.Socketを作成してサーバーに接続し、sendBytes()はソケット出力ストリームに書き込み、readAnswer()はソケット入力ストリームから読み取ります。 addHeader()に加えて、チェックサム、エンコードなどを追加するメソッドもあります。



このコードの問題は次のとおりです。書き込み/読み取りのブロックと不便なエラー処理-例外の処理方法が明確ではありません。先頭に進むか、ここで何かを実行します(送信を繰り返しますか?)。 RxJavaが解決するのは、これら2つの問題です。 書き直します:



 public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .map(result -> readAnswer()); }
      
      





アプリケーション:



 connection.send("echo 123") .subscribe( answer -> { /* */ }, throwable -> { /* */ } );
      
      





一般に、モナド、一連の演算子、およびいくつかのニュアンスの形でのみ同じことが起こりました。



まず、sendBytes()メソッドがブール値を返すようになりました。 RxJavaはデータストリームで動作し、誰かがデータの代わりにvoidを返すと、ストリームはどういうわけかもはや存在しません。 したがって、返された結果をメソッドに追加する(少なくともtrueを返す)か、mapの代わりにdoOnNextを使用する必要があります。この演算子は受け取ったものと同じものを返します。



第二に、send()メソッドは文字列そのものではなくObservableを返すようになりました。 したがって、別個の応答ハンドラー(または例のようにラムダ)が必要です。 同じことを除いて。 ここでは、彼らが言うように、非同期的に考え始める必要があります。 結果自体の代わりに、いつか結果を提供するオブジェクトを取得し、この結果が受け取るものを提供する必要があります。 これは、このコードがまだブロックしているだけなので、この非同期の考え方は意味がありません。 確かに、Stringのラッパーを作成し、このラッパーを閉じることで結果をモナドから引き出すことができますが、これらは既に関数型プログラミングの原則に違反する汚いハッキングです。



このコードを改善してください。 エラー処理から始めましょう。 RxJavaは、ステートメントで発生する例外をキャッチし、それらをサブスクライバーに渡します。 subscribe()メソッドの2番目の引数はAction1機能インターフェースです-例外の処理を担当します。 一部のメソッドがIOExceptionまたはその他のチェック済み例外をスローするために使用されていた場合、現在は不可能です。 このような例外は、手でキャッチする必要があります。 たとえば、RuntimeExceptionでラップして、さらにRxJavaソリューションを提供します。 ただし、Action1は、通常のtry-catchアプローチと大差ありません。 RxJavaには、doOnError()、onErrorReturn()、onErrorResumeNext()、およびonExceptionResumeNext()というエラー処理演算子があります。 また、通常の再試行()もあります。これはまさにここで必要なことです。 何らかの接続エラーが発生した場合、n回送信を再試行できます。



 public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT); }
      
      





subscribe()に渡された例外ハンドラは、すべての再試行が失敗した場合にのみ呼び出されます。 信頼性のために、ソケットを閉じてリセットを再試行する前に、disconnect()を呼び出します。 そうでない場合、内部のcheckConnection()でisConnected()を呼び出すと、誤検知が発生する可能性があり、繰り返されるすべての試行で再びエラーが発生します。 たとえば、サーバーがタイムアウトによって接続を強制終了した場合、クライアント側のSocket.isConnected()メソッドは引き続きtrueを返します。クライアント側では、ソケットが接続され、すべて正常です。



サーバーが病気になり、クライアントがソケットへの書き込みをブロックされた場合にタイムアウトを追加することもできます。



 public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT); }
      
      





指定した時間内にObservableからアイテムが受信されない場合、タイムアウト演算子は例外をスローします。 そして、我々はすでに例外を処理する方法を知っています。



2番目の問題は、ブロッキング操作が残っているため、GUIスレッドからsend()を呼び出すと、インターフェースがハングする可能性があることです。 これらすべてのアクションが別のスレッドで実行されるように、RxJavaに指示するだけです。



これにはobserveOn()およびsubscribeOn()演算子があります。 多くの人々は、これらの演算子の違いを理解するのに問題があります-このトピックに関する多くの記事とstackoverflowに関する質問があります。 このトピックをもう一度取り上げて、今使用する必要があるものを一緒に考えましょう。 公式文書で彼らが言うことは次のとおりです。



SubscribeOn -Observableが動作するスケジューラを指定します。

ObserveOn-オブザーバーがこのObservableを監視するスケジューラーを指定します。


Observableは、データを提供する人です。 オブザーバーは、データを受信して​​データを処理する人です。 別のスレッドで実行するにはすべてが必要です。 むしろ、最初に別のスレッドでデータを配信するにはObservableが必要です。 また、データは別のスレッドで配信されるため、すべてのオブザーバーは別のスレッドで処理します。 これは、定義によりsubscribeOn()です。最初に作成したObservableのスケジューラーを定義します。



 public Observable<String> send(String command) { return Observable.just(command) .doOnNext(cmd -> checkConnection()) .map(cmd -> cmd.getBytes()) .map(bytes -> addHeader(bytes)) .map(bytes -> sendBytes(bytes)) .timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS) .map(result -> readAnswer()) .doOnError(throwable -> disconnect()) .retry(MAX_RETRY_COUNT) .subscribeOn(Schedulers.io()); }
      
      





これで、ステートメントはioスケジューラーが提供するスレッドで実行されます。 応答を待たずに連続してsend()を数回呼び出すと、同期の問題が発生する場合があります。 良い方法では、演算子に渡される関数は(副作用なしで)きれいでなければなりませんが、ソケットの場合、これは問題があります。 通常、純粋な関数はI / Oにあまり適していません。 呼び出しをソケットに同期させるか、ConnectionPool'aのようなものを実装する必要があります。ここでは、タスクから続行する必要があります。



サブスクライバー(オブザーバーでもある)による応答の処理は別のスレッドで実行されることに注意してください。これは常に良いとは限りません。 たとえば、グラフィカルインターフェイスで回答を表示する場合、ほとんどの場合、メインスレッドではこれを行わないという例外が発生します。 これを行うには、グラフィカルインターフェイスを担当するフレームワークのイベントキューにハンドラーを配置する必要があります。 異なるフレームワークでは、これは異なる方法で行われます。 JavaFXには、このためのPlatform.runLater(実行可能)メソッドがあります。 応答ハンドラーで直接呼び出すか、独自のスケジューラーを作成できます。



 public final class FxScheduler extends Scheduler { private final static FxScheduler m_instance = new FxScheduler(); private FxScheduler() {} public static FxScheduler getInstance() { return m_instance; } @Override public Worker createWorker() { return new Worker() { private final CompositeSubscription m_subscription = new CompositeSubscription(); @Override public Subscription schedule(Action0 action0) { Platform.runLater(action0::call); return m_subscription; } @Override public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { Platform.runLater(action0::call); } }, timeUnit.toMillis(delay)); return m_subscription; } @Override public void unsubscribe() { m_subscription.unsubscribe(); } @Override public boolean isUnsubscribed() { return m_subscription.isUnsubscribed(); } }; } }
      
      





ところで、Androidの場合、RxJavaにはAndroidSchedulers.mainThread()があります-RxJavaのアドオンです。 コマンドを送信する例は次のようになります。



 send("echo 123") .observeOn(FxScheduler.getInstance()) .subscribe( answer -> { /* */ }, throwable -> { /* */ } );
      
      





ここでは、すでにobserveOn()を使用しています。RxJavaに「次のオブザーバはそのようなスケジューラを介して実行する必要がある」と伝える必要があります。



RxJavaは、便利なオペレーターパイプライン管理を提供します。 .map(bytes-> sendBytes(bytes))の隣に、チェックサム計算を追加し、エンコードを介してバイトを実行できます。 最初に発信コマンドのロギングを追加し、最後に受信した応答を追加できます。 一般的に、あなたはその考えを理解しました。



2.エンジンとセンサーでスキャンする



ポイントのセットがあります-これらは、エンジンの回転角度(度単位)またはエンジンによって駆動されるデバイスの位置です。 一般的に、ある種のアクチュエータがあります。 また、値を取得できる外部センサーがあります。 センサーから値を取得するために各ポイントでグラフの曲線を作成するために、一連のポイントを介してエンジンを駆動する必要があります。 手順をn回繰り返します(グラフのn個の曲線)。 同時に、エンジンはすぐには動作せず、位置に到達するまで待つ必要があります。



そのため、ポイントのセットがあり、それぞれに何かを行う必要があり(できれば別のスレッドで)、結果はGUIスレッドで処理されます(たとえば、LineChartにポイントを追加します)。 RxJavaの典型的なタスクのように聞こえます。



 public Observable<Point> startScan(List<Double> trajectory, int iterationCount) { return Observable.from(trajectory) .subscribeOn(Schedulers.io()) .doOnNext(this::moveMotor) .doOnNext(this::blockUntilTargetReached) .map(this::createResultPoint) .repeat(iterationCount); }
      
      





Schedulers.io()を使用します。エンジンとセンサーの制御はすべて同じI / O操作です。 moveMotor()は、コマンドをエンジンに送信します(たとえば、以前に作成したConnectionを介して)。



blockUntilTargetReached()は、エンジンに位置を要求し、ターゲットと比較して、エンジンがまだ到着していない場合、ストリームを数ミリ秒間スリープ状態にします。 createResultPoint()は、センサーに値を要求し、数値のペア(ターゲット位置とセンサーからの値)を含むクラスPointのオブジェクトを返します。 repeat()は、retry()とほぼ同じように機能します。毎回、最初からストリーム全体を繰り返し、エラーの後にのみ再試行します。



オリジナルのObservableは一度に1つのポイントを生成します。 彼は、前のポイントがすべてのオペレーターを加入者に渡すときにのみ、次のポイントを与えます。 これは、レイジーコンピューティングとストリーム処理による機能的アプローチと一致しています。 StreamAPIとLINQは同じように機能します。 このため、スキャンはforEach(this :: moveMotor)ではなくforEach(this :: blockUntilTargetReached)などではなく、順番にポイントを通過します。



アプリケーション:



 final List<Double> trajectory = ...; final int n = ...; startScan(trajectory, n) .observeOn(FxScheduler.getInstance()) .subscribe( point -> processPoint(point), throwable -> processError(throwable), () -> processData() );
      
      





問題は、サブスクライバーがポイントを受け取った繰り返しを区別しないことです。 つまり、n個の曲線の代わりに、1つの曲線がn倍長くなります。 新しいスキャンが開始されたことを手動で追跡する必要があります。 たとえば、ポイントの数をカウントし、カウンター値がパス内のポイントの数を超えた場合に新しい曲線を開始します。 または、到着したポイントを軌跡の最初のポイントと比較します。



3番目の引数は、subscribe()に登場しました-これは、要素がObservableで終了したときに呼び出されるonComplete()ハンドラです。



subscribe()は、Subscriptionインターフェイスを持つオブジェクトを返します。 unsubscribe()メソッドを呼び出すと、Observableはデータを受け入れるサブスクライバーを持たなくなり、データの発行を停止します。 レイジーコンピューティングの原則-誰もがデータを必要としない場合、データを転送する必要はありません。 それでもやはり、関数型プログラミングのパラダイムに従ってオペレーターに副作用はないはずなので、Observableはサブスクライバーなしでオペレーターを実行するだけでは意味がありません。 unsubscribe()を使用して、スキャンキャンセルを実装できます。 エンジンが移動を停止するコマンドを送信する必要がない限り、unsubscribe()はこれに責任を負いません。



3.データ処理とレポート



スキャン後、多くの有用なデータが得られました。次に、それらを処理し、必要な値を計算して、pdfレポートを生成する必要があります。



レポートには、インターフェイスの一部のフィールドの値(たとえば、ユーザーの名前)と受信したグラフの図も含まれている必要があります。 JavaFXの場合、スナップショット()メソッドを使用して描画を取得できます。これは、すべてのグラフィックオブジェクトが持っています。 これらはJavaFXオブジェクトのアクションであるため、GUIスレッドで実行する必要があります。 このために、すでにFxSchedulerがあります。



 class ReportMetaInfo { private String fileName; private String name; private WritableImage image; } final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider) .subscribeOn(FxScheduler.getInstance()) .map(provider -> { final ReportMetaInfo info = new ReportMetaInfo(); info.fileName = provider.getFileName(); info.name = provider.getName(); info.image = provider.getChartSnapshot(); return info; });
      
      





m_reportInfoProviderは、ReportInfoProviderインターフェイスの実装であり、モデルとビューの間のレイヤーです。 本質的に、これはTextViewからgetterへの呼び出しですが、モデルはすべて同じであり、インターフェイスのみがあります。



計算には、Schedulers.computation()があります。



 final Observable<ScanResult> reportComputationalData = Observable.just(scanData) .subscribeOn(Schedulers.computation()) .map(data -> new ResultProcessor(data).calculateAll());
      
      





次に、フォームからのデータと計算からのデータを組み合わせて、すべてを重いPDFファイルに入れたいと思います。 これには、zip()およびSchedulers.io()演算子があります。



 class ReportData { ReportMetaInfo metaInfo; ScanResult result; ReportData(ReportMetaInfo metaInfo, ScanResult result) { this.metaInfo = metaInfo; this.result = result; } } Observable.zip( reportGuiData, reportComputationalData, (reportInfo, scanResult) -> new ReportData(reportInfo, scanResult) ) .observeOn(Schedulers.io()) .map(reportData -> ReportGenerator.createPdf( reportData.metaInfo.fileName, reportData.metaInfo.name, reportData.metaInfo.image, reportData.result )).subscribe( isOk -> { /* ,  -,   */ }, throwable -> { /* -    */ }, () -> { /*   ,     */ } );
      
      





zip()は最大9つの異なるObservableを受け取り、それらの要素をタプルに結合します。 接続用の関数とタプルの結果の型を自分で提供する必要があります。 その結果、インターフェース(グラフの画像を含む)からのデータの受信とスキャン結果の処理は並行して行われます。 そのようなアクションの並列化が必要かどうかは、特定のタスクとデータ量に依存します-少し単純化した例を挙げました。



複数のデータストリームがある場合、バックプレッシャーが発生する可能性があることに注意してください。 これらは、ObservableとObserverの異なるストリームパフォーマンスと異なるパフォーマンスに関連するさまざまな問題です。 一般的に、これらは誰かがアイドル状態で、誰かがすでにバッファをオーバーフローしている状況です。 したがって、注意する必要があります。



おわりに



ほとんどの場合、これらのタスクには他の(そしてより効果的な)ソリューションがあります-誰かが私にそれらを指摘した場合、私はこれを考慮に入れて作業を考慮します。 これらのタスクを例として使用して、RxJavaの機能のいくつかを示しました:エラー処理、subscribeOn()とobserveOn()の違い、GUIスレッドでのカスタムスケジューラと結果の取得、遅延コンピューティングの原理、外部デバイスを制御するためのアプリケーション、監視可能な操作の中断、並列操作いくつかの観測可能なものとその組み合わせ。 したがって、これらのタスクがRxJavaで完全に成功していなくても、原則自体は他の人にとって有用です。



All Articles