![](https://habrastorage.org/webt/vn/yi/x9/vnyix9bhqbcrtq11czmjjgusjhg.jpeg)
イベント処理はループです。
最後の部分では、Androidのバックグラウンド作業にスレッドプールエグゼキューターを使用することについて説明しました。 このアプローチの問題は、イベントディスパッチャーが結果の処理方法を知っていることです。 次に、RxJavaが提供するものを見てみましょう。
免責事項:これは、AndroidでRxJavaを使用する方法に関する記事ではありません。 インターネット上のそのようなテキスト、そして突破口。 これは、ライブラリの実装の詳細についてです。
一般的に、RxJavaはバックグラウンドでの作業専用のツールではなく、イベントフローを処理するためのツールです。
バックグラウンド作業は、この処理の1つの側面にすぎません。 このアプローチの背後にある一般的な考え方は、スケジューラを使用することです。 このクラスのコードを直接見てみましょう:
public abstract class Scheduler { @NonNull public Disposable scheduleDirect(@NonNull Runnable run) { ... } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... } @NonNull public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { ... } { @NonNull public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) { ... } }
かなり複雑ですね。 良いニュースは、自分で実装する必要がないことです! ライブラリには、Schedulers.io()、Schedulers.computation()などのプランナーが既に含まれています。 必要なのは、スケジューラインスタンスをRxチェーンのsubscribeOn()/ observeOn()メソッドに渡すことだけです。
apiClient.login(auth) // some code omitted .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
その後、RxJavaが残りを実行します。ステートメントに渡すラムダを取得し、目的のスケジューラで実行します。
たとえば、オブザーバーにユーザーインターフェースを変更させたい場合は、AndroidSchedulers.mainThread()をobserveOn()に渡すだけです。 そして問題なのは、過剰な接続性、プラットフォーム固有のコード、幸福感がなくなることです。 もちろん、AndroidSchedulersは元のRxJavaライブラリには含まれていませんが、別のライブラリとして接続されていますが、これはbuild.gradleの別の行にすぎません。
そして、スレッドとは何ですか? 秘Theは、rxChainのどこにでもsubscribeOn()/ observeOn()を配置できないことです(これは便利でしょうか?)代わりに、これらのオペレーターがスケジューラを取得する方法を考慮する必要があります。 まず、map、flatMap、filter、または他の何かを呼び出すたびに、新しいオブジェクトを取得することを理解しましょう。
例:
private fun attemptLoginRx() { showProgress(true) apiClient.login(auth) .flatMap { user -> apiClient.getRepositories(user.repos_url, auth) } .map { list -> list.map { it.full_name } } .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doFinally { showProgress(false) } .subscribe( { list -> showRepositories(this, list) }, { error -> Log.e("TAG", "Failed to show repos", error) } ) }
そのため、ほぼすべての行で新しいオブジェクトが作成されます。
// new SingleFlatMap() val flatMap = apiClient.login(auth) .flatMap { apiClient.getRepositories(it.repos_url, auth) } // new SingleMap val map = flatMap .map { list -> list.map { it.full_name } } // new SingleSubscribeOn val subscribeOn = map .subscribeOn(Schedulers.io()) // new SingleObserveOn val observeOn = subscribeOn .observeOn(AndroidSchedulers.mainThread()) // new SingleDoFinally val doFinally = observeOn .doFinally { showProgress(false) } // new ConsumerSingleObserver val subscribe = doFinally .subscribe( { list -> showRepositories(this@LoginActivity, list) }, { error -> Log.e("TAG", "Failed to show repos", error) } ) }
また、たとえば、SingleMapは、チェーンの最後にある.subscribe()の呼び出しで始まる呼び出しのチェーンを通じてスケジューラを受け取ります。
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(SingleObserver<? super T> subscriber) { ObjectHelper.requireNonNull(subscriber, "subscriber is null"); subscriber = RxJavaPlugins.onSubscribe(this, subscriber); ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null"); try { subscribeActual(subscriber); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); NullPointerException npe = new NullPointerException("subscribeActual failed"); npe.initCause(ex); throw npe; } }
subsribeActualは、次のように各単一演算子に実装されます。
source.subscribe()
ここで、sourceは現在の演算子の前の演算子であるため、作業を行い、最初に作成されたSingleに到達するチェーンが作成されます。 私たちの場合、これはSingle.fromCallableです:
override fun login(auth: Authorization): Single<GithubUser> = Single.fromCallable { val response = get("https://api.github.com/user", auth = auth) if (response.statusCode != 200) { throw RuntimeException("Incorrect login or password") } val jsonObject = response.jsonObject with(jsonObject) { return@with GithubUser(getString("login"), getInt("id"), getString("repos_url"), getString("name")) } }
このラムダ内で、ネットワーク呼び出しを行います。
しかし、スケジューラーはどこにありますか? ここで、SingleSubsribeOn内:
@Override protected void subscribeActual(final SingleObserver<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source); s.onSubscribe(parent); Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); }
この場合、スケジューラはsubsribeOn()メソッドに渡したものです。
このすべてのコードは、チェーンに渡したスケジューラーが演算子lambdasに渡したコードによってどのように使用されるかを示しています。
observeOn()メソッドにも注意してください。 クラスのインスタンス(この場合はSingleObserveOn)を作成し、そのsubscribeActialはすでに簡単に見えます。
@Override protected void subscribeActual(final SingleObserver<? super T> s) { source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler)); }
しかし、ここではObserveOnSingleObserverの方がはるかに興味深いです。
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.actual = actual; this.scheduler = scheduler; } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); }
obserOnがスケジューラースレッドで呼び出されると、オブザーバーが呼び出されます。これにより、スレッドを直接rxChainに切り替える可能性が開きます。 、他の何かをカウントしてから、subscribeのコードに移動します。
RxJavaはかなり複雑な「内部」であり、イベントを処理する(そして結果としてバックグラウンド作業を管理する)非常に柔軟で強力なツールです。 しかし、私の考えでは、このアプローチには欠点があります。
- RxJavaの学習には多くの時間がかかります
- 学習する必要があるオペレーターの数は多く、両者の違いは明らかではありません
- RxJavaの呼び出しのスタックトレースは、自分で作成したコードとはほとんど関係ありません。
次は? もちろん、Kotlin corutins!
シリーズの以前の記事:
著者から: メビウス会議は明日と明後日開催され、コトリンのコルーチンについてお話します。 一連の記事が興味深く、継続したい場合は、彼女の訪問を決定するのに遅すぎません!