RxJavaを使用したAndroid非同期プロセスの調整。 Yandexの経験

みなさんこんにちは、アレクセイ・アガピトフです。今日は

RxJavaのようなライブラリを使用すると、多くのことを簡単に処理できます

Androidアプリケーションの非同期プロセス。







独自のコールドシーケンスとホットシーケンスを作成する方法を分析します。

RxJavaを使用する際のいくつかのニュアンスへの注意

このライブラリが提供する強力なツールはどれくらいか

演算子。







Yandex.Real Estateアプリケーションとその例を使用して、すべてについて説明します

地図付きのホーム画面。







スクリリオンショット






まず、画面を見て、画面上で何が起こるか、そして何をするかを見てみましょう

実装されます。









GIF、13 mb
GIF






まず第一に、カードとの相互作用があります:人はカードを動かすことができます

フィルタに一致する広告のあるドットが表示されます。

ポイントは、単一のアナウンスメント、新しい建物、住宅、クラスター、

たくさんの広告をまとめる。 単一の広告が

表示済みとしてマークされます(このフラグはデバイスにローカルに保存されます)。







フィルタ自体は別の画面で変更されますが、プロンプトが表示されたら使用する必要があります。

地図上の興味のあるポイント。







リクエストの別のコンポーネントは、探している地理的オブジェクトです

発表。







ジオブジェクトのスクリーシンショット







この要素は、このオブジェクトまたはマップ上で現在開いているエリアで検索をすばやくオン/オフするために必要です。







したがって、マップ上のポイントは、リストされたそれぞれに対して更新する必要があります

アクション(マップ、フィルター、またはジオオブジェクトの変更)。 簡潔にするために、

地図上にオブジェクトを描画することを検討してください。

ジオオブジェクトの特殊なケースです。







これに加えて、他のスレッドで発生する2つのプロセスがあります。WebAPIからポイントを受信し、これらのポイントのいずれがこのデバイスで既に表示されているかを確認します(このため、データベースを参照します)。







マップ、フィルター、およびジオオブジェクトは、サーバーからのポイントの返信よりも速く頻繁に変更されることを考慮すると、最新の結果のみを使用し、以前の結果を破棄する必要があります。







したがって、かなりの量を含む画面を実装する必要があります

互いに依存する非同期プロセス。







RxJavaと従来のAndroidアプローチの比較



従来のAndroidのアプローチでは、考慮されるそれぞれを監視するために、

コールバックを使用するプロセス。 変更イベントが発生したとき

マップ上の構成要素(たとえば、マップが移動される)、残りを読む

コンポーネントを1つのリクエストに結合して実行します。







このアプローチを実装すると、いくつかの困難が生じます。







  1. コールバックは互いに不十分に結合します。

    1. コードを読みにくい-コールバックの相互接続を理解するのが難しく、判断する

      誰が誰に依存しているか、コールバックはコードによって分散されており、その中でナビゲートすることはより困難です。
    2. コードの柔軟性が失われます-オプションが少なくなります

      再利用する場合、既存のソリューションに変更を加えることはより困難です。
  2. に関連付けられた追加の状態を明示的に保存する必要があります

    非同期操作とそのコールバック。 そのような状態変数が多いほど、

    間違いを犯す可能性が高くなります(たとえば、複数のスレッドで作業する場合)。


次の理由により、RxJavaライブラリを選択しました。







  1. あらゆる性質の非同期プロセスに対する普遍的な抽象化の存在

    (イベントモデル、マルチスレッド処理)Observableと呼ばれる-

    観測されたシーケンス。
  2. 演算子を使用してシーケンスを変更する機能と

    多数の便利な演算子。
  3. シーケンスを相互に結合する機能。
  4. を使用して状態変数の数を減らす

    シーケンスと演算子。
  5. ライブラリ実装の安定性と品質。


このライブラリをアプリケーションでさまざまな目的に使用します-で始まる

バックグラウンドでの読み込みとデータ処理、そして多くの処理で終わる

ユーザーインターフェイスで発生するイベント。







実装



ライブラリがアプリケーションでどのように使用されるかの例を見てみましょう。







地図の変化を見る



最初に、マップの状態を見てみましょう。 これを行うには、次のシーケンスを使用します。これは、マップの座標境界が変更されたことを報告します。







public static Observable<BoundingBox> observeMapBoundingBox(final MapController mapController) { return Observable.create(new Observable.OnSubscribe<BoundingBox>() { @Override public void call(final Subscriber<? super BoundingBox> subscriber) { final OnMapListener listener = new OnMapListener() { @Override public void onMapActionEvent(MapEvent mapEvent) { switch (mapEvent.getMsg()) { case MapEvent.MSG_SCALE_END: case MapEvent.MSG_SCROLL_END: case MapEvent.MSG_ZOOM_END: if (!subscriber.isUnsubscribed()) { subscriber.onNext(getViewportBoundingBox(mapController)); } break; } } }; mapController.addMapListener(listener); //    -    subscriber.add(Subscriptions.create(() -> { mapController.removeMapListener(listener); })); } }); }
      
      





この実装では、2つの主要な部分:







  1. カードイベントリスナーの作成と追加
  2. シーケンスがサブスクライブされていないときのこのリスナーの削除。


OnSubscribe内、つまりシーケンスがアクティブになった(誰かがサブスクライブした)ときにリスナーを作成して登録することに注意してください。







ここでは、コールドシーケンスの典型的な例-サブスクライブ中に新しい要素をリリースするものを扱っています。 このようなシーケンスの実装の優れた例は、 RxBindingライブラリです。これにより、標準APIに存在するウィジェット内のイベントとサポートライブラリを監視できます。







フィルターの変化を観察します



次に、ポイントリクエストの2番目のコンポーネントであるフィルターについて検討します。 現在のフィルターを保存し、それらを更新するメソッドを提供するクラスがあるとします。 そして、このフィールドの値の変化を観察したいと思います。 マップの場合と同じ方法で、このフィールドの変更のオブザーバーにフィールドを追加し、フィールドが変更されたときにオブザーバーに通知することができます。 ただし、フィールドには多くのオブザーバーが存在する可能性があります。つまり、配列を保存するか、シーケンスを作成するときに演算子sharepublish + autoConnectを使用する必要があります

一度に複数のシーケンスオブザーバーにイベントを送信するため。 ただし、これは消費者に対して透過的に行いたいので、ここでは、上記のすべてを転送するSubjectとしてのRxJavaライブラリのこのようなクラスの助けを借ります。

義務。







サブジェクトは、すべてのデータとその完了またはエラーの通知を受信する多くのサブスクライバーを同時に持つことができるシーケンスです。 同時に、Subjectの操作は、サブスクライバーが持つ同じメソッドonNext



onCompleted



onError



ます。 つまり、サブジェクト自体はサブスクライバーです。つまり、必要に応じて、彼は別のシーケンスにサブスクライブし、それをすべてのサブスクライバーに中継できます。







これが私たちに与えるものの例を見てみましょう:







 public class FilterHolder { private final PublishSubject<Filter> subject = PublishSubject.create(); private Filter current; public Observable<Filter> observeChanges(boolean emitCurrentValue) { return emitCurrentValue ? subject.startWith(current) : subject; } public void set(Filter filter) { this.current = filter; subject.onNext(filter); } }
      
      





ご覧のとおり、新しい値を設定すると、すべてのサブスクライバーに送信されます。 この場合、 PublishSubjectを使用して、新しく受信したデータをすべてのサブスクライバーに送信します。 原則として、 ReplaySubjectを使用して、最後に受信したデータを保存し、このデータを受信した後にサインアップしたサブスクライバーに対してそれを繰り返すことができます。 ただし、この場合、 observeChanges



メソッドの実装を変更する必要があります。現在の値を送信する代わりに、スキップします。







同様に、既存のクラスを拡張して追加できます

リアクティブ機能。







サブジェクトはホットシーケンスの例です。つまり、アクティブなままで、誰もサブスクライブしていない場合でも要素を送受信します。 主なことは、SubjectがonNext



新しいシーケンス要素をonNext



onCompleted



またはonError



が呼び出されるまで、それらをサブスクライバーに配信できることを思い出すonError



です。







これは、データ/イベントソースが無限であり、 onCompleted



およびonError



呼び出しがonError



ない状況で重要です。そのため、このデータをサブスクライバに送信するSubjectでこれらのメソッドを呼び出すと、予期しない結果が生じる可能性があります。







ポイントのAPIリクエストの3番目のコンポーネントの監視-ジオオブジェクトも同様です

件名を使用してフィルタリングおよび実装されます。







最後に、これら3つの要素をまとめて、ネットワーク要求で送信する必要があります。







API呼び出し



APIにアクセスするには、よく知られているRetrofitライブラリを使用し、ネットワーク呼び出しの結果はすべてObservableとして表示されます。







その結果、ネットワーク層のメソッドは次のようになります。







 public Observable<ClustersData> getClusters(MapBoundingBox box, Filter filter, GeoObject geo) { //   API }
      
      





すべてをまとめる



したがって、リストされているすべての非同期プロセスを結合します。







 Observable.combineLatest( observeMapBoundingBox(mapController).debounce(300L, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()), filterHolder.observeChanges(true), observeGeoObject(true), SearchRequest::clusters//      ) .switchMap(request -> networkHelper .getClusters(request.boundingBox, request.filter, request.geoObject) .observeOn(AndroidSchedulers.mainThread()) .doOnError(handleErrorAction())//  .onErrorResumeNext(Observable.empty())//  ) .observeOn(Schedulers.computation()) //  ,      .map(this::processViewedClusters) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<ClustersData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { //     ,    } @Override public void onNext(ClustersData clustersData) { //    } });
      
      





composeLatestオペレーターを使用して、3つの値のそれぞれの変化をモニターし、そのうちの1つが変化すると、以下を記述するオブジェクトを作成します

ネットワーク要求。







この演算子は次のように機能します。渡された各シーケンスが1つの要素を提供するまで待機し、これらすべての要素を他のオブジェクトに変換する関数を呼び出します。 次に、シーケンスのいずれかに新しい要素が現れるたびに、オペレーターはこの関数を再度呼び出し、この新しい値と残りの最後の値を渡します。







CombineLatestステートメント







したがって、これはzipと非常によく似ていますが、唯一の違いは、シーケンスが新しい値を提供しなかった場合、最後の既知の要素値を使用することです。 これは、異なる周波数の要素を解放するシーケンスを組み合わせるときに便利です。たとえば、フィルターよりも頻繁に発生するカードの状態の変化などです。







ネットワーク要求オブジェクトが構築された後、要求に直接移動します。 これを行うには、APIとのやり取りを行うオブジェクトに目を向け、収集したパラメーターを渡します。 ここには2つの興味深い点があります。







しかし、最初に、元のシーケンスの各要素に対して新しいシーケンスを返し、それらをすべて1つの結果シーケンスに結合するflatMap演算子について考えます。







FlatMapオペレーター







switchMapオペレーターも同じように機能しますが、唯一の違いは、前のエレメントから受け取ったシーケンスからサブスクライブを解除し、新しいエレメントに切り替えて、その結果を待つことです。







SwitchMapステートメント







これは、ネットワークリクエストの実行が遅いために必要です。たとえば、ある人がマップを移動した場合、以前のリクエストは関係がなくなり、新しいポイントをリクエストする必要があります。







2番目のポイントは、 doOnError



およびonErrorResumeNext



(空のシーケンスを返す)を使用してネットワークエラーを抑制することです。

これは、シーケンスマップ/フィルター/ジオオブジェクト->ネットワークリクエスト->マップ上のポイントが (ネットワーク)エラーで終了した場合に壊れないようにするためです-この場合、マップへの新しい変更は結果をもたらさず、ネットワークエラー発生する可能性があり、それらを処理する必要があります。







マップ上のポイントを受け取った後の次のステップは、ユーザーが既に表示したポイントを識別することです。 これを行うには、データベースに対して要求が行われ、その後、対応するフラグがすべての表示ポイントに付加されます。 これは長時間の操作であるため、ネットワークスケジューラを解放し、計算をobserveOn(Schedulers.computation())



切り替えます。 データベースを照会するには、 Cupboardとその上のRxラッパーを使用しますが、この場合はObservable



を返すメソッドを使用できますが、通常の同期メソッドで管理しました。







おそらく、カードの位置の変化を監視するシーケンスにデバウンスステートメントが表示されていることにお気づきでしょう。これにより、不要な要素がすべて指定された時間間隔内に来た場合、不要な要素を破棄できます。 これは、ユーザーがマップを表示しているときにサーバーにリクエストを頻繁に送信しないようにするために必要です。 デフォルトでは、この演算子は計算スケジューラを使用しますが、イベントがメインスレッドで発生していることがわかっているため、メインスレッドのスケジューラでイベントを再定義できます。 これにより、特定の場所での不必要なスレッドの切り替えを回避でき、不必要なタスクから計算スケジューラを節約できます(デフォルトでは、スレッドの数はコアの数によって制限されるため)。







そして今まとめます。







少量のコード。



すべてのロジックが1つのシーケンスに収まり、データの動きとその処理のロジックが理解できるようになります。







主観的に、そのようなコードは、コールバックを使用した場合よりも簡単に見えます。 ただし、ここでは、少なくともRxJavaの基本に関する知識が必要であることを予約する必要があります。







より簡単な実装



非同期操作の中間状態を同期および保存するすべての作業は、ライブラリの肩に転送されました。 その結果、最小限の中間状態を維持します。 これにより、複数のスレッドおよび非同期プロセスで作業する際のエラーの可能性が減少します。







さらに、多くの非同期プロセスの処理を実装する代わりに、データストリームを操作し、ビジネスタスクを直接実装します。 さらに、新しい処理ステップをシーケンスに追加し、既存のステップを変更するのは簡単です。







また、シーケンスを使用したコードはテストが簡単です。

シーケンスは、テストに必要なものに置き換えることができます(コールバックの場合、より困難になります)。







たとえば、インターフェイスに関連付けられているすべてのシーケンスを次のように置き換えることができます。

just演算子を使用して値を事前設定します。








All Articles