Androidバックグラウンドチュートリアル。 パート4:RxJava



イベント処理はループです。



最後の部分では、Androidのバックグラウンド作業にスレッドプールエグゼキューターを使用することについて説明しました。 このアプローチの問題は、イベントディスパッチャーが結果の処理方法を知っていることです。 次に、RxJavaが提供するものを見てみましょう。



免責事項:これは、AndroidでRxJavaを使用する方法に関する記事ではありません。 インターネット上のそのようなテキスト、そして突破口。 これは、ライブラリの実装の詳細についてです。



一般的に、RxJavaはバックグラウンドでの作業専用のツールではなく、イベントフローを処理するためのツールです。





バックグラウンド作業は、この処理の1つの側面にすぎません。 このアプローチの背後にある一般的な考え方は、スケジューラを使用することです。 このクラスのコードを直接見てみましょう:



public abstract class Scheduler { @NonNull public Disposable scheduleDirect(@NonNull Runnable run) { ... } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... } @NonNull public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... } { @NonNull public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... } }
      
      





かなり複雑ですね。 良いニュースは、自分で実装する必要がないことです! ライブラリには、Schedulers.io()、Schedulers.computation()などのプランナーが既に含まれています。 必要なのは、スケジューラインスタンスをRxチェーンのsubscribeOn()/ observeOn()メソッドに渡すことだけです。



 apiClient.login(auth) // some code omitted .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
      
      





その後、RxJavaが残りを実行します。ステートメントに渡すラムダを取得し、目的のスケジューラで実行します。



たとえば、オブザーバーにユーザーインターフェースを変更させたい場合は、AndroidSchedulers.mainThread()をobserveOn()に渡すだけです。 そして問題なのは、過剰な接続性、プラットフォーム固有のコード、幸福感がなくなることです。 もちろん、AndroidSchedulersは元のRxJavaライブラリには含まれていませんが、別のライブラリとして接続されていますが、これはbuild.gradleの別の行にすぎません。



そして、スレッドとは何ですか? 秘Theは、rxChainのどこにでもsubscribeOn()/ observeOn()を配置できないことです(これは便利でしょうか?)代わりに、これらのオペレーターがスケジューラを取得する方法を考慮する必要があります。 まず、map、flatMap、filter、または他の何かを呼び出すたびに、新しいオブジェクトを取得することを理解しましょう。



例:



 private fun attemptLoginRx() { showProgress(true) apiClient.login(auth) .flatMap { user -> apiClient.getRepositories(user.repos_url, auth) } .map { list -> list.map { it.full_name } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doFinally { showProgress(false) } .subscribe( { list -> showRepositories(this, list) }, { error -> Log.e("TAG", "Failed to show repos", error) } ) }
      
      





そのため、ほぼすべての行で新しいオブジェクトが作成されます。



 // new SingleFlatMap() val flatMap = apiClient.login(auth) .flatMap { apiClient.getRepositories(it.repos_url, auth) } // new SingleMap val map = flatMap .map { list -> list.map { it.full_name } } // new SingleSubscribeOn val subscribeOn = map .subscribeOn(Schedulers.io()) // new SingleObserveOn val observeOn = subscribeOn .observeOn(AndroidSchedulers.mainThread()) // new SingleDoFinally val doFinally = observeOn .doFinally { showProgress(false) } // new ConsumerSingleObserver val subscribe = doFinally .subscribe( { list -> showRepositories(this@LoginActivity, list) }, { error -> Log.e("TAG", "Failed to show repos", error) } ) }
      
      





また、たとえば、SingleMapは、チェーンの最後にある.subscribe()の呼び出しで始まる呼び出しのチェーンを通じてスケジューラを受け取ります。



  @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(SingleObserver<? super T> subscriber) { ObjectHelper.requireNonNull(subscriber, "subscriber is null"); subscriber = RxJavaPlugins.onSubscribe(this, subscriber); ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null"); try { subscribeActual(subscriber); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); NullPointerException npe = new NullPointerException("subscribeActual failed"); npe.initCause(ex); throw npe; } }
      
      





subsribeActualは、次のように各単一演算子に実装されます。



 source.subscribe()
      
      





ここで、sourceは現在の演算子の前の演算子であるため、作業を行い、最初に作成されたSingleに到達するチェーンが作成されます。 私たちの場合、これはSingle.fromCallableです:



 override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable { val response = get("https://api.github.com/user", auth = auth) if (response.statusCode != 200) { throw RuntimeException("Incorrect login or password") } val jsonObject = response.jsonObject with(jsonObject) { return@with GithubUser(getString("login"), getInt("id"), getString("repos_url"), getString("name")) } }
      
      





このラムダ内で、ネットワーク呼び出しを行います。



しかし、スケジューラーはどこにありますか? ここで、SingleSubsribeOn内:



 @Override protected void subscribeActual(final SingleObserver<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source); s.onSubscribe(parent); Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); }
      
      





この場合、スケジューラはsubsribeOn()メソッドに渡したものです。



このすべてのコードは、チェーンに渡したスケジューラーが演算子lambdasに渡したコードによってどのように使用されるかを示しています。



observeOn()メソッドにも注意してください。 クラスのインスタンス(この場合はSingleObserveOn)を作成し、そのsubscribeActialはすでに簡単に見えます。



 @Override protected void subscribeActual(final SingleObserver<? super T> s) { source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler)); }
      
      





しかし、ここではObserveOnSingleObserverの方がはるかに興味深いです。



 ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.actual = actual; this.scheduler = scheduler; } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); }
      
      





obserOnがスケジューラースレッドで呼び出されると、オブザーバーが呼び出されます。これにより、スレッドを直接rxChainに切り替える可能性が開きます。 、他の何かをカウントしてから、subscribeのコードに移動します。



RxJavaはかなり複雑な「内部」であり、イベントを処理する(そして結果としてバックグラウンド作業を管理する)非常に柔軟で強力なツールです。 しかし、私の考えでは、このアプローチには欠点があります。



  1. RxJavaの学習には多くの時間がかかります
  2. 学習する必要があるオペレーターの数は多く、両者の違いは明らかではありません
  3. RxJavaの呼び出しのスタックトレースは、自分で作成したコードとはほとんど関係ありません。


次は? もちろん、Kotlin corutins!



シリーズの以前の記事:





著者から: メビウス会議は明日と明後日開催され、コトリンのコルーチンについてお話します。 一連の記事が興味深く、継続したい場合は、彼女の訪問を決定するのに遅すぎません!



All Articles