元の記事は2017年11月29日に作成されました。翻訳は無料です。
GO-JEKでは、アプリケーションで多数の非同期操作を実行する必要がありますが、ユーザーインターフェイスの速度と滑らかさを犠牲にすることはできません。
複雑なマルチスレッドAndroidアプリケーションの作成は、非常に時間のかかるプロセスになる可能性があります。相互に関連する多数のことを処理する必要があるため、ときどきそれはあなたを圧倒します。 これと他の多くの理由により、開発中のAndroidアプリケーションでRxJavaを使用するようになりました。
この記事では、RxJavaの実際のマルチスレッド機能を使用して、アプリケーション開発プロセスを可能な限りシンプル、簡単、そして楽しいものにする方法について説明します。 以下のすべてのコード例では、RxJava 2が使用されますが、説明されている概念は他のリアクティブエクステンションに適用できます。
なぜリアクティブプログラミングなのか?
リアクティブプログラミングに関する各記事は、そのような義務的なブロックから始まり、この伝統を破りません。 リアクティブアプローチを使用してAndroidアプリケーションを構築することにはいくつかの利点があります。 本当に必要なものに注目しましょう。
コールバックはもうありません
長い間Android向けに開発している場合は、ネストされたコールバックを使用すると、事態が非常に複雑になり、制御不能になるのに気付いたはずです。
これは、複数の非同期操作を連続して実行し、前の操作の結果に応じてさらにアクションを実行する場合に発生します。 すぐに、コードがオーバーロードになり複雑になり、サポートできなくなります。
シンプルなエラー制御
命令的な世界では、多くの複雑な非同期操作が実行される状況では、エラーが多数の場所で発生する可能性があります。 そして、これらのエラーを処理する必要があるすべての場所で、結果として、多くの繰り返しテンプレートコードが表示され、メソッドが面倒になります。
マルチスレッドの非常に簡単な使用
私たちは皆、Javaマルチスレッドが時々複雑になることを知っています(そして密かに認めます)。 たとえば、バックグラウンドスレッドでコードを実行し、結果をメインスレッドに返します。 単純に聞こえますが、実際には回避する必要がある多くの落とし穴があります。
RxJavaを使用すると 、選択した任意のスレッドでいくつかの複雑な操作を非常に簡単に実行でき、正しい同期を管理し、問題なくスレッドを切り替えることができます。
RxJavaの利点は無限です。 私たちは何時間もそれについて話すことができ、あなたを悩ませることはできませんが、代わりに、RxJavaでマルチスレッドの実際の仕事をより深く掘り下げて学びましょう。
RxJavaはデフォルトではマルチスレッドではありません
はい、あなたはそれを正しく読みました。 とにかく、RxJavaはデフォルトではマルチスレッドではありません。 公式WebサイトでRxJavaに与えられた定義は次のようになります。
「仮想Javaマシンの監視可能なシーケンスを使用して、非同期およびイベントベースのプログラムをコンパイルするためのライブラリ。」
「非同期」という言葉を見て、多くの人はRxJavaがデフォルトでマルチスレッドであると誤って信じています。 はい、RxJavaはマルチスレッドをサポートし、非同期操作で簡単に操作できる多くの強力な機能を提供しますが、これはRxJavaのデフォルトの動作がマルチスレッドであることを意味しません。
すでにRxJavaを少し使用したことがある場合は、その基本構成を知っています。
- Observable source (ソースObservable) 、さらに
- いくつかの演算子
- ターゲットサブスクライバー(サブスクライバー)
Observable.just(1, 2, 3, 4, 5) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { println("Emitting item on: " + currentThread().getName()); } }) .map(new Function<Integer, Integer>() { @Override public Integer apply(@NonNull Integer integer) throws Exception { println("Processing item on: " + currentThread().getName()); return integer * 2; } }) .subscribeWith(new DisposableObserver<Integer>() { @Override public void onNext(@NonNull Integer integer) { println("Consuming item on: " + currentThread().getName()); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });
このサンプルコードを実行すると、すべてのアクションがアプリケーションのメインスレッドで実行されることが明確にわかります(コンソールのログのスレッド名に従ってください)。 この例は、RxJavaのデフォルトの動作がブロッキングであることを示しています。 すべては、コードが呼び出されるのと同じスレッドで実行されます。
ボーナス:
doOnNext()
は何をするのでしょうか? これは副作用ステートメントに過ぎません。
observable
オブジェクトのチェーンに入り、ダーティ(不純)操作を実行するのに役立ちます。 たとえば、デバッグ用の呼び出しチェーンに追加のコードを埋め込みます。 詳細はこちら 。
簡単な例
RxJavaを使用してマルチスレッドの操作を開始するには、 Schedulers 、 observeOn / subscribeOnなどの基本クラスとメソッドに精通する必要があります 。
最も単純な例の1つを見てみましょう。 ネットワーク要求で
Book
オブジェクトのリストを取得し、メインアプリケーションスレッドに表示するとします。 まずはかなり一般的で理解しやすい例です。
getBooks().subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<Book>() { @Override public void onNext(@NonNull Book book) { // Book } @Override public void onError(@NonNull Throwable e) { // } @Override public void onComplete() { // Book . ! } });
ここでは、ネットワーク呼び出しを行い、書籍のリストを収集する
getBooks()
メソッドを確認します。 ネットワーク呼び出しには時間がかかる(数ミリ秒または数秒)ので、
subscribeOn()
を使用し、
Schedulers.io()
スケジューラーを指定して、I / Oストリームで操作を実行します。
また、メインスレッドで結果を処理し、アプリケーションのユーザーインターフェイスで書籍のリストを表示するために、
observeOn()
演算子と
AndroidSchedulers.mainThread()
スケジューラーを使用します。
心配する必要はありません。すぐに高度なものに移ります。 この例は、深く掘り下げる前に基本的な概念を思い出すことのみを目的としていました。
スケジューラで友達を作る
RxJavaは、強力なスケジューラーセットを提供します。 ストリームに直接アクセスまたは管理することはできません。 スレッドを使用する必要がある場合は、組み込みスケジューラを使用する必要があります。
スケジューラは、あらゆる種類のタスクのスレッドまたはスレッドプール(スレッドのコレクション)と考えることができます。
簡単に言えば、別のスレッドでタスクを完了する必要がある場合、利用可能なスレッドのプールからスレッドを取得し、そのタスクを完了する忠実なスケジューラーを使用する必要があります。
RxJavaには、いくつかの種類のスケジューラがあります。 最も難しいのは、タスクに適したスケジューラを選択することです。 適切なスケジューラを選択しない限り、タスクが最適に実行されることはありません。 各プランナーを見てみましょう。
Schedulers.io()
このスケジューラは、無制限のスレッドプールに基づいており 、ファイルシステムへのアクセス、ネットワーク呼び出し、データベースへのアクセスなど、 CPUを使用せずにI / Oを集中的に使用するために使用されます。 このスケジューラのスレッドの数は無制限であり、必要に応じて増やすことができます。
Schedulers.computation()
このスケジューラは、大量のデータや画像などを処理するなど、 CPUを集中的に使用する作業を実行するために使用されます 。 スケジューラは、利用可能なプロセッサの数のサイズを持つスレッドの限定されたプールに基づいています。
このスケジューラはCPUの集中的な作業にのみ適しているため、スレッドの数は制限されています。 これは、スレッドがプロセッサー時間と競合せず、アイドル状態にならないようにするためです。
Schedulers.newThread()
このスケジューラは、呼び出しごとに完全に新しいスレッドを作成します。 この場合、スレッドプールを使用してもメリットはありません。 ストリームの作成と破棄は非常に高価です。 スレッドの過剰な作成を乱用しないように注意する必要があります。これにより、システムの速度低下とメモリオーバーフローが発生する可能性があります。 監視可能なソースから受け取った各アイテムを処理するための新しいスレッドが作成されます 。
理想的には、主にプログラムの実行時間の長い部分を別のストリームに表示するために、このスケジューラをほとんど使用しないでください。
Schedulers.single()
このスケジューラは、タスクの順次実行に使用される単一のスレッドに基づいています。 アプリケーションのさまざまな場所に一連のバックグラウンドタスクがある場合に非常に役立ちますが、これらのタスクの複数を同時に実行することはできません。
Schedulers.from(Executorエグゼキューター)
このスケジューラは、独自の
Executor
基づいています。 スレッド割り当ての独自のロジックに基づいて、スケジューラで特定のタスクを実行する必要がある状況が発生する場合があります。
アプリケーションが行う同時ネットワーク呼び出しの数を制限するとします。 制限されたサイズのスレッドプール(
Scheduler.from(Executors.newFixedThreadPool(n))
)に基づいて動作する独自のスケジューラーを作成し、ネットワークコールに関連するすべての場所で使用できます。
AndroidSchedulers.mainThread()
これは、RxJavaライブラリでは利用できない特別なスケジューラです。 このスケジューラにアクセスするには、 RxAndroid拡張ライブラリを使用する必要があります。 このスケジューラは、Androidアプリケーションでユーザーインターフェイススレッドでアクションを実行するのに役立ちます 。
デフォルトでは、このスケジューラはメインスレッドに関連付けられた
Looper
ジョブをキューに入れますが、オーバーライドする可能性があります:
AndroidSchedulers.from(Looper looper)
。
注:
Schedulers.io()
など、無制限のスレッドプールに基づくスケジューラーを使用する場合は注意してください。 スレッドの数が無限に増加するリスクは常にあります。
subscribeOn()およびobserveOn()を理解する
スケジューラーのタイプについて理解できたので 、 subscribeOn()およびobserveOn()を詳細に見てみましょう 。
RxJavaでマルチスレッドを使用して専門的に作業するには、これら2つの演算子が別々にどのように機能するかを深く理解する必要があります。
subscribeOn()
簡単に言えば、 このステートメントは、 ソースobservableがどのストリームでelementsを送信するかを示しています。 「ソース」という言葉の重要性を理解する必要があります 。 observableのチェーンがある場合、ソース(ソースobservable)は常にルート要素、またはイベントが生成されるチェーンの最上部になります。
すでに見たように、
subscribeOn()
使用しない場合、すべてのイベントはコードが呼び出されたスレッド(この場合は
main
スレッド)で発生します。
subscribeOn()
および
Schedulers.computation()
スケジューラーを使用して、イベントを計算ストリームにリダイレクトしましょう。 次のコード例を実行すると、プールで使用可能な計算スレッドの1つ
RxComputThreadPool-1
でイベントが発生することがわかります。
コードを削減するために、
onError()
および
onComplete()
を再定義する必要がないため、すべての
DisposableSubscriber
メソッドを完全に再定義することはしません。
doOnNext()
とラムダを使用します。
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.computation()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
呼び出しチェーンのどこで
subscribeOn()
を使用するかは重要ではありません。 observable source (source observable)でのみ動作し、observable sourceがイベントを送信するストリームを制御します。
次の例では、他のオブザーバブルオブジェクトがオブザーバブルソースの後に作成され(
map()
および
filter()
メソッドを使用)、
subscribeOn()
演算子がコールチェーンの最後に配置されます。 しかし、このコードを実行するとすぐに、すべてのイベントが
subscribeOn()
指定されたストリームで発生することに気付くでしょう。
observeOn()
を呼び出しチェーンに追加すると、これはより明確になります。 そして、
subscribeOn()
を
observeOn()
下に
observeOn()
、作業のロジックは変わりません。
subscribeOn()
は、観測可能なソースでのみ機能します。
Observable.just(1, 2, 3, 4, 5, 6) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .map(integer -> integer * 3) .filter(integer -> integer % 2 == 0) .subscribeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
また、同じコールチェーンで
subscribeOn()
複数回使用できないことを理解することも重要です。 もちろん、もう一度書くことはできますが、変更は必要ありません。 以下の例では、3つの異なるスケジューラーを順番に呼び出していますが、起動時にどのスケジューラーが機能するかを推測できますか?
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
Schedulers.io()
と回答した場合、あなたは正しいです! 何度も呼び出しを行っても、 監視可能なソースの後に呼び出される最初の
subscribeOn()
のみが機能します。
ボンネットの下
検討された例のより詳細な研究にもう少し時間をかける価値があります。 なぜ
Schedulers.io()
スケジューラーのみが機能するのですか? 通常、
Schedulers.newThread()
はチェーンの最後にあるため、誰もが機能すると考えています。
RxJavaでは、
Observable
のすべてのインスタンスのコールバック後にサブスクリプションが作成されることを理解する必要があります。 以下のコードはこれを理解するのに役立ちます。 これは以前にレビューされた例ですが、より詳細に描かれています。
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5); Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0); Observable<Integer> o3 = o2.map(integer -> integer * 10); o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
すべてがどのように機能するかを理解するために、例の最後の行からすべてを分析し始めます。 その中で、ターゲットサブスクライバーは、オブザーバブルオブジェクト
o3
で
subscribe()
メソッドを呼び出します。このメソッドは、親オブザーバブルオブジェクト
o2
暗黙的に
subscribe()
を呼び出します。
o3
オブジェクトによって提供されるオブザーバーの実装は、送信された数値に10を掛けます。
プロセスが繰り返され、
o2
暗黙的に
o1
オブジェクトで
subscribe()
を呼び出し、偶数の処理のみを許可するオブザーバー実装を渡します。 これでルート要素(
o1
)に到達しました。これには、
subscribe()
後続の呼び出しの親がありません。 この段階で、 観測可能な要素のチェーンが完了し、その後、観測可能なソースが要素の送信(放射)を開始します。
RxJavaのサブスクリプションの概念を理解する必要があります。 ここまでで、 監視可能なオブジェクトのチェーンがどのように形成され、イベントが監視可能なソースからどのように伝播するかを理解する必要があります。
observeOn()
これまで見てきたように、
subscribeOn()
は、特定のストリームに要素を送信するように監視可能なソースに指示し、このストリームはSubscriberまで要素をプロモートする責任があります。 したがって、デフォルトでは、サブスクライバーは同じストリームで処理済みアイテムを受け取ります。
しかし、これはあなたが期待する動作ではないかもしれません。 ネットワークからデータを取得して、ユーザーインターフェイスに表示するとします。
2つのことを行う必要があります。
- ノンブロッキングI / Oストリームでネットワーク呼び出しを行います
- メインアプリケーションスレッドで結果を取得する
入力/出力ストリームでネットワーク呼び出しを行い、結果をサブスクライバーに渡す
Observable
があります。
subscribeOn(Schedulers.io())
のみを使用する場合、ターゲットサブスクライバーは同じ入力/出力ストリームで結果を処理します。 メインスレッドではAndroidのユーザーインターフェイスしか操作できないため、幸運ではありませんでした。
今すぐフローを切り替える必要があり、このために
observeOn()
を使用します。
observeOn()
がコールチェーンで発生すると、observable sourceによって送信された要素は
observeOn()
指定されたストリームに直ちに転送されます。
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
この発明された例では、ネットワークからの整数の受信と、観測可能なソースからのさらなる伝送を観察します。 実際の例では、これは他の非同期操作、たとえば、大きなファイルの読み取り、データベースからのデータのフェッチなどです。 この例を実行して結果を確認し、コンソールのログをたどるだけです。
次に、データ処理中にスレッドを切り替えるために
observeOn()
が数回呼び出される、より複雑な例を考えてみましょう。
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.computation()) .map(integer -> { println("Mapping item " + integer + " on: " + currentThread().getName()); return integer * integer; }) .observeOn(Schedulers.newThread()) .filter(integer -> { println("Filtering item " + integer + " on: " + currentThread().getName()); return integer % 2 == 0; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
上記の例では、
subscribeOn()
を
Schedulers.io()
とともに使用したため、監視可能なソースは要素を出力入力ストリーム内のハンドラーのチェーンに渡します。 次に、
map()
演算子を使用して各要素を変換しますが、計算ストリームでこれを行う必要があります。 これを行うには、
map()
を呼び出してストリームを切り替え、要素をターゲットの計算ストリームに転送する前に、
observeOn()
と
Schedulers.computation()
を使用します。
次のステップでは、いくつかの要素を除外し、何らかの理由で、各要素の新しいスレッドでこの操作を実行します。
observeOn()
再度使用しますが、
filter()
演算子を呼び出して各要素を新しいスレッドに渡す前に、すでに
Schedulers.newThread()
とペアになっています。
その結果、サブスクライバーがユーザーインターフェイスストリームで処理の結果を受け取るようにします。 これを行うには、
observeOn()
と
AndroidSchedulers.mainThread()
スケジューラーを使用します。
しかし、
observeOn()
数回
observeOn()
て使用するとどうなりますか? 次の例では、サブスクライバーはどのスレッドで結果を受け取りますか?
getIntegersFromRemoteSource() .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.single()) .observeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));
この例を実行すると、サブスクライバーが
RxComputationThreadPool-1
計算ストリームの要素を受け取ることがわかります。 これは、
observeOn()
最後の呼び出しが
observeOn()
したことを意味します。 なぜだろうか?
ボンネットの下
おそらくあなたはすでに推測しました。 知っているように、サブスクリプションは
Obsevable
すべてのラウンド
Obsevable
後に呼び出されますが、イベント(エミッション)の送信では、すべてが逆に発生します。つまり、コードが記述されている通常の方法です。 呼び出しは、観測可能なソースから、さらに呼び出しチェーンを下ってサブスクライバーに到達します。
observeOn()
演算子は常に直接の順序で機能するため、フローは順番に切り替えられ、最後に計算ストリームに切り替えられます(
observeOn(Schedulers.computation())
)。 したがって、新しいストリームのデータを処理するためにストリームを切り替える必要がある場合は、
observeOn()
に
observeOn()
呼び出してから、要素を処理します。 同期、競合状態の例外、これらすべて、およびマルチスレッドRxJavaのその他の多くの困難がユーザーに代わって処理します。
まとめ
これで、RxJavaを適切に使用して、ユーザーインターフェイスの高速でスムーズな操作を提供するマルチスレッドアプリケーションを作成する方法について、かなり良いアイデアが得られました。
すぐに理解が得られない場合、それは大丈夫です。 記事をもう一度読み、コード例を試してください。 理解するには多くのニュアンスがあります。時間をかけてください。