RxJava 2 for Androidの探索

ここに画像の説明を入力してください







私の名前はArkadyです。私はBadooのAndroid開発者です。 最近のブログには、Go、PHP、JS、QAに関する多くの投稿があり、モバイル開発に関するトピックでそれらを薄めることにしました。 RxJava 1からRxJava 2に1つのAndroidプロジェクトを移植し、インターネットでこのトピックに記載されているすべてのものを読みました。 特に、GOTOコペンハーゲン2016カンファレンスのJake Wortonのレポートです。これは翻訳にふさわしい候補であるように思われました。多くのAndroid開発者がRxJava 2への切り替えを考えていると思います。







Jakeはリアクティブプログラミングについてかなり多くの紹介を行ったため、この記事を理解するためにRxJava 1の知識は必要ありません。 レポートは、RxJava2がリリースの準備ができたときに準備されました(バージョン2.1.0はすでにリリースされています)。









反応する理由



なぜ全員がリアクティブプログラミングについて突然話し始めたのですか? アプリケーションを完全に同期化できない場合、単一の非同期リソースがあると、従来の命令型プログラミングスタイルが完全に壊れてしまいます。 「すべてが機能しなくなる」という意味ではなく、「複雑さを増す」という意味での「ブレイク」。その結果、命令型プログラミングのすべての利点が失われ始めます。







私がこれを深刻な問題と考える理由を説明するために、例を挙げます。







いくつかの修飾子を使用してUserオブジェクトを取得できる単純なクラスから始めましょう。







interface UserManager { User getUser(); void setName(String name); void setAge(int age); } UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe"); System.out.println(um.getUser());
      
      





同期のシングルスレッドの世界に住んでいた場合、このコードは、インスタンスの作成、ユーザー出力、一部のプロパティの変更、ユーザー出力など、まさに期待どおりの動作をします。







問題は、非同期性に頼り始めるときに発生します。 サーバー側のプロパティの変更を反映する必要があるとしましょう。 これを行うには、最後の2つのメソッドが非同期である必要があります。 その場合、どのようにコードを変更しますか?







解決策の1つは、何もしないことです。非同期サーバー更新の呼び出しが成功すると想定して、ローカルで変更を加えることができます。 それらは即座に反映されます。 ご存知のように、これは良い考えではありません。 ネットワークは予測不能であり、サーバーはエラーを返す可能性があるため、何らかの理由でローカル状態をロールバックする必要があります。







簡単な解決策は、非同期呼び出しが正常に完了したときに実行されるRunnable



を使用することです。 これは事後対応的な動作です。変更要求が成功したことが確実な場合にのみ、表示されたデータを更新します。







 interface UserManager { User getUser(); void setName(String name, Runnable callback); void setAge(int age, Runnable callback); } UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe", new Runnable() { @Override public void run() { System.out.println(um.getUser()); } });
      
      





ただし、発生する可能性のある問題(ネットワークの問題など)は処理しません。 エラーが発生した場合に何かできるように、特別なListener



作成する価値があるのでしょうか?







 UserManager um = new UserManager(); System.out.println(um.getUser()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error... } });
      
      





問題についてユーザーに通知できます。 自動的に再試行できます。 同様のソリューションが機能し、この方向では、非同期コードと単一スレッド(Androidの場合、これはUIスレッド)で実行されるコードを組み合わせる必要があります。







非同期呼び出しを行う必要があるほど、問題が多くなります。 たとえば、ユーザーがフォームに入力すると、いくつかのプロパティが変更されます。 または、ある呼び出しが正常に完了すると別の非同期呼び出しがトリガーされる場合に、一連の非同期呼び出しがあり、成功または失敗も伴う場合があります。







 UserManager um = new UserManager(); System.out.println(um.getUser()); um. setName(“Jane Doe”, new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error… } }); um.setAge(40, new UserManager.Listener() { @Override public void success() { System.out.println(um.getUser()); } @Override public void failure(IOException e) { // TODO show the error… } });
      
      





これはすべてAndroidのコンテキストで行われることを忘れないでください。 したがって、他の多くの要因を考慮する必要があります。 たとえば、 success



コールバックで情報をUIに直接転送しようとすることができますが、問題はAndroidのActivity



が一時的であることです。 それらはいつでも破壊できます。 ユーザーが着信コールを受信したとしましょう-アプリケーションはシステムによって最小化されます。 または、ユーザーが[ Home



または[ Back



クリックした可能性があります。 UIの破棄後に非同期呼び出しが返される場合、問題が発生します。







 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { tv.setText(um.getUser().toString()); } @Override public void failure(IOException e) { // TODO show the error... } }); } }
      
      





問題を解決するための必須のアプローチがあります。 UIメソッドを呼び出す前にステータスを確認できます。







 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName(“Jane Doe”, new UserManager.Listener() { @Override public void success() { if (!isDestroyed()) { tv.setText(um.getUser().toString()); } } @Override public void failure(IOException e) { // TODO show the error… } }); } }
      
      





この例では、非同期呼び出しが完了するまでActivity



への参照を保持するため、短期的なメモリリークを明確に引き起こす匿名型を作成します。







問題は、これらのコールバックがどのスレッドで呼び出されるかわからないという事実にもあります。 バックグラウンドスレッドで呼び出される可能性があるため、イベントを実行のメインスレッド( main/UI thread



)に送信する必要があります。







 public final class UserActivity extends Activity { private final UserManager um = new UserManager(); @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.user); TextView tv = (TextView) findViewById(R.id.user_name); tv.setText(um.getUser().toString()); um.setName("Jane Doe", new UserManager.Listener() { @Override public void success() { runOnUiThread(new Runnable() { @Override public void run() { if (!isDestroyed()) { tv.setText(um.getUser().toString()); } } }); } @Override public void failure(IOException e) { // TODO show the error... } }); } }
      
      





コードによって解決されたメインタスクに関連しないものがたくさんあるActivity



が散らかっています。 そして、これはすべて非同期で作業を開始し、非同期の結果を処理することです。 非同期要求呼び出しを実装しました。 ユーザー入力をブロックせず、ボタンクリックを処理せず、複数のフィールドで動作しません。







コードが単純なタスクを1つだけ解決し、それを実際のアプリケーションに変え始めたとしても、すぐに問題が発生し、 Activity



状態とチェックの束を管理する必要に直面します。







リアクティブ思考



実際のアプリケーションでは、すべてが非同期に動作します。 リクエストを送信し、長い時間を経て回答を受け取るネットワークがあります。 メインの実行スレッドをブロックすることはできないため、ネットワークの操作はバックグラウンドスレッドで実行する必要があります。 ファイルシステム、データベース、リポジトリへの書き込み、さらにはshared preferences



への書き込み時にメインスレッドをブロックすることはできないため、これらの操作をバックグラウンドストリームで実行する必要があります。







ユーザーも非同期データソースのようなものです。 UIを介して情報を提供し、ボタンを押してフィールドにデータを入力することで、それに応答します。







ここに画像の説明を入力してください







ユーザーは、異なる時間にアプリケーションに戻ることができます。 また、アプリケーションはデータを受信する準備ができている必要があり、実行のメインスレッドがブロックされている状態がないようにリアクティブにする必要があります。 そのため、データの一部が非同期的に到着する状況はありませんが、アプリケーションはこれを予期せず、その結果、受信したデータを考慮せず、クラッシュさえしません。 これが困難です。 Activity



/ Fragment



でこれらの状態をすべて維持する必要があります。 多数の非同期ソースが、おそらく異なる速度でデータを生成および消費するという事実と調和させる必要があります。 また、非同期プラットフォームであるAndroid自体の作業については考慮していません。 プッシュ通知、ブロードキャスト、構成の変更があります。 ユーザーはいつでもデバイスをポートレートからランドスケープに、またはその逆に切り替えることができます。コードの準備が整っていない場合、アプリケーションはクラッシュしたり、正しく動作しません。







アプリケーションのアーキテクチャ全体の同期を保証することはできませんが、単一の非同期リソースがあると、従来の命令型プログラミングスタイルが崩れます。







ネットワーク要求を使用しないアプリケーションを見つけることは難しく、それらは本質的に非同期です。 ディスクがあり、データベースは非同期ソースです。 UIは、非同期ソースとしてのみ考慮されるべきです。 そのため、デフォルトでは、Androidのすべてが非同期に機能します。 従来の命令型プログラミングと状態管理の手法に固執すると、自分自身に害を及ぼします。







ここに画像の説明を入力してください







すべての非同期アーキテクチャ要素を調整しようとする代わりに、それらを直接接続することにより、この責任から解放されます。 UIをデータベースに直接署名して、データの変更に応答できるようにすることができます。 データベース呼び出しとネットワーク呼び出しを変更して、クリックを取得して送信するのではなく、ボタンのクリックに応答するようにすることができます。







私たちが受け取ったネットワーク応答がデータを更新するならば、それは素晴らしいでしょう。 結局、データが更新されると、UIは自動的に更新されます。 したがって、私たちはこれについて責任を負いません。 Androidが非同期で何かを行う場合(たとえば、画面の切り替えやブロードキャスト)、インターフェイスに自動的に反映されるか、バックグラウンドタスクを自動的に開始するのは素晴らしいことです。







ここに画像の説明を入力してください







一般に、このアプローチにより、状態をサポートするために必要な大量のコードを記述できなくなります。状態を管理する代わりに、コンポーネントを互いに接続するだけです。







Rxjava



RxJavaに渡します。 このリアクティブライブラリは、Java開発の最初のフル機能の[リアクティブ]ツールであったため、Android開発で最も人気がありました。 RxJava 2は、Androidの開発に重要な古いバージョンのJavaのサポートを保持しています。







RxJavaは以下を提供します。









ソース



データソースは、リッスンを開始または終了するときに何らかの作業を行います。 応答の待機を開始するまで送信されないネットワーク要求を送信します。 また、完了前にデータソースのサブスクリプションを解除すると、理論的にはネットワーク要求をキャンセルできます。







ソースは、同期および非同期の両方で動作できます。 たとえば、バックグラウンドスレッドで実行されているブロッキングネットワークリクエスト、またはAndroidを呼び出してonActivityResultを待機するような純粋に非同期なものです。 ソースは、単一のアイテムまたは複数のアイテムを生成できます。 ネットワーク要求は単一の応答を返します。 ただし、UIが機能している間は、1つのボタンにサブスクライブしていても、ボタンクリックのフローは無限に続く可能性があります。







他のソースが空になる場合があります。 これは、要素を含まず、作業が成功または失敗するデータソースの概念です。 明確にするために、データベースまたはファイルにデータを書き込んでいると想像してください。 彼らはあなたにアイテムを返しません。 記録は成功するかどうかのいずれかです。 RxJavaでは、ソースは、いわゆる端末イベントonComplete()/ onError()



を使用して、この「実行または失敗」アプローチをモデル化します。 これは、応答を返すか、例外をスローするメソッドに似ています。







完了していない可能性があります。 たとえば、UIが機能している限り機能するデータソースとして、ボタンの押下をシミュレートします。 そして、UIが消えると、おそらくボタンクリックのこのソースからサブスクライブを解除しますが、その作業は完了しません。







これはすべて、 Observer



パターンに対応しています。 データを生成できるものがあります。 このデータの表示方法については合意があります。 そして、私たちはそれらを見たいです。 リスナーを追加し、何かが発生したときに通知を受け取ります。







流動性vs. 観測可能



RxJava 2では、ソースは2つの主要なタイプ-FlowableとObservableで表されます。 それらは非常に似ています。 どちらもゼロからn個の要素を生成します。 両方とも成功または失敗する場合があります。 では、なぜ同じデータ構造を表すために2つの異なるタイプが必要なのでしょうか?







それはすべて背圧のようなものになります。 詳細に入ることなく、バックプレッシャーがデータソースの速度を低下させる可能性があるとしか言えません。 既存のシステムのリソースは限られています。 そして、バックプレッシャーの助けを借りて、データを送信するすべての人に、データが遅くなるように伝えることができます。







RxJava 1はバックプレッシャーをサポートしていましたが、APIの開発中にかなり遅れて追加されました。 RxJava 1では、システムの各タイプにバックプレッシャーメカニズムがあります。 バックプレッシャーの概念はすべてのタイプでサポートされていますが、すべてのソースがそれを実装しているわけではないため、このメカニズムを使用するとアプリケーションがクラッシュする可能性があります。 背圧アプリケーションは、事前に設計および検討する必要があります。 これが、RxJava 2に2種類のソースがある理由です。 そのため、バックプレッシャーをサポートするかどうかをソースタイプで指定できます。







画面タッチイベントというデータソースがあるとします。 遅くすることはできません。 ユーザーに「キャラクターの半分を描き、処理中に停止して待機し、残りを終了します」と伝えることはできません。 ボタンをオフにする、別のUIを表示するなど、別の方法でデータ入力を遅くすることはできますが、ソース自体を遅くすることはできません。







別の例を見てみましょう。一度に複数の行を抽出する必要がある行の大きなセットを含むデータベースがあります。 データベースは、 カーソルなどのツールのおかげで、この問題を非常に効果的に解決できます。 ただし、タッチイベントのフローの場合、ユーザーの指を遅くすることはできないため、これを実装することはできません。







RxJava 1では、上記の両方のタイプがObservableとして実装されているため、実行時にバックプレッシャーを適用しようとすると、 MissingBackpressureException



発生する場合があります。 これが、RxJava 2でさまざまなタイプでソースが表示される理由になりました。1つはバックプレッシャーをサポートし、もう1つはサポートしません。 Observable



Flowable



両方のタイプは、データをコールバックに転送するという点で同様に動作します。 これには2つの対応するインターフェイスがあります。







Observer









 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); }
      
      





そしてSubscriber









 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); }
      
      





最初のメソッドはonNext



と呼ばれ、要素はここに配信されます。 このメソッドは、 Observable



またはFlowable



が要素を生成するたびに呼び出され、任意に処理できるようにします。 これは無限に起こり得ます。 ボタンのクリックを聞くと、クリックするonNext



メソッドが呼び出されます。 無限のソースには、2つの端末イベントがあります。









onComplete



onError



は端末イベントです。つまり、いずれかを受信した後、ソースからイベントを受信することはなくなります。

Observer



インターフェースとSubscriber



インターフェースの違いは、最後のメソッドonSubscribe



です。 これは、RxJava 1と比較した新しい方法です。ObservableまたはFlowableをサブスクライブする場合、リソースを作成します。リソースの操作が終了したら、リソースをクリーンアップする必要があります。 onSubscribe



は、ObservableまたはonSubscribe



リッスンを開始するとすぐに呼び出され、 Disposable



の2つのタイプのオブジェクトを提供します。







 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); } interface Disposable { void dispose(); }
      
      





またはSubscription









 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void cancel(); void request(long r); }
      
      





Observable



に関してObservable



Disposable



タイプを使用すると、disposeメソッドを呼び出すことができます。つまり、「このリソースでの作業が終了しました。データは不要です」という意味です。 ネットワーク要求がある場合は、キャンセルできます。 ボタン押下の無限のストリームを聞いた場合、これはこれらのイベントを受信したくないことを意味します。その場合、 View



からOnClickListener



を削除できView









これはすべて、 Subscription



インターフェイスにも当てはまります。 呼び出し方法は異なりますが、まったく同じ方法で使用されますdispose()



似たcancel()



メソッドがあります。 これは、2番目のrequest(long r)



メソッドが存在することでのみ異なります。これにより、APIにbackpressure



が現れます。 このメソッドを使用して、 Flowable



さらに要素が必要であることを伝えます。







背圧サポート付き 背圧サポートなし
0 – n要素、 complete | error



complete | error



流動性 観測可能


したがって、これら2つのタイプの唯一の違いは、一方がバックプレッシャーをサポートし、もう一方がサポートしないことです。







ジェットストリーム



Disposable



タイプとSubscription



タイプの名前が異なる理由と、そのメソッドdispose()



cancel()



について触れたいと思います。 request()



メソッドを追加することで、一方だけを拡張できなかったのはなぜですか? ジェットストリームの仕様がすべてです。 これは、多数の企業が率先して取り組み、Javaリアクティブライブラリ用の標準インターフェイスセットを開発することを決定した結果です。 仕様には4つのインターフェイスが含まれます。







 interface Publisher<T> { void subscribe(Subscriber<? super T> s); } interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void request(long n); void cancel(); } interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
      
      





上記のコードでは、 Subscriber



Subscription



種類が表示されます。 これらは仕様の一部であり、したがって、これらの名前はRxJava 2で使用されていました。これらは標準の一部であるため、それに対してできることは何もありません。 しかし、この状況には良い面があります。 ストリームに2つの異なるライブラリを使用する必要があるとしましょう。 作成者が上記の標準を実装している場合、それらを安全に切り替えることができます。







リアクティブストリーム(バックプレッシャーサポート付き) 背圧サポートなし
0 ... n要素、 complete | error



complete | error



流動性 観測可能


Flowableタイプは、背圧のサポートを意味するリアクティブフローの仕様を実装します。







UserManager



戻ります。 以前はこのクラスからユーザーを抽出し、適切だと思ったときに表示していました。 これで、Observableを使用できます。







 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); }
      
      





Observable<User>



は、Userオブジェクトのソースです。 すべての変更で要素を生成し、画面にデータを表示することでこれに対応できます。 これで、システムで発生する他のイベントに基づいて、これに最適な時間を決定する必要がなくなります。







特化したソース



RxJava 2には、 Observable



サブセットである3つの特殊なソースがあります。 最初のものはSingle



と呼ばれます。 単一の要素を含むか、エラーを生成するため、これは単一の要素の潜在的に非同期のソースほど要素のシーケンスではありません。 バックプレッシャーはサポートしていません。 通常の方法として想像できます。 メソッドを呼び出して戻り値を取得します。 メソッドが例外をスローします。 Single



実装するのはこのスキームです。 購読すると、アイテムまたはエラーが表示されます。 しかし同時に、 Single



はリアクティブです。







Completable



. void-. - , . , , , .







Maybe



. RxJava 1. , , — Optional. backpressure.







RxJava 2 , Single/ Completable/ Maybe



, backpressure



( Reactive Streams Specification).







( backpressure) backpressure
0…n , complete | error



Flowable Observable
item | complete | error



Maybe
item | error



Single
complete | error



Completable


 interface UserManager { Observable<User> getUser(); void setName(String name); void setAge(int age); }
      
      





setName



setAge



, , , . Completable



.







 interface UserManager { Observable<User> getUser(); Completable setName(String name); Completable setAge(int age); }
      
      







, , , . , .







 Flowable.just("Hello"); Flowable.just("Hello", "World"); Observable.just("Hello"); Observable.just("Hello", "World"); Maybe.just("Hello"); Single.just("Hello");
      
      





Iterable



.







 String[] array = { “Hello”, “World” }; List<String> list = Arrays.asList(array); Flowable.fromArray(array); Flowable.fromIterable(list); Observable.fromArray(array); Observable.fromIterable(list);
      
      





, , , , ( , ).







fromCallable



.







 Observable.fromCallable(new Callable<String>() { @Override public String call() { return getName(); } });
      
      





, . fromCallable Java- Callable



, , . , HTTP- -.







 OkHttpClient client = // … Request request = // … Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception{ return client.newCall(request).execute(); } });
      
      





Observable ( ) , , onError



. , onNext



.







fromCallable



:







 Flowable.fromCallable(() -> "Hello"); Observable.fromCallable(() -> "Hello"); Maybe.fromCallable(() -> "Hello"); Single.fromCallable(() -> "Hello"); Completable.fromCallable(() -> "Ignored!");
      
      





. , .







Maybe



Completable



. , , – , .







 Maybe.fromAction(()-> System.out.println(“Hello”)); Maybe.fromRunnable(()-> System.out.println(“Hello”)); Completable.fromAction(()-> System.out.println(“Hello”)); Completable.fromRunnable(()-> System.out.println(“Hello”));
      
      





, , Observable create



.







 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Hello"); e.onComplete(); } });
      
      





RxJava 1, – , RxJava 1. RxJava 2, create – , . , subscribe, . ObservableEmitter



, . ObservableEmitter



. , .







.







 Observable.create(e -> { e.onNext("Hello"); e.onComplete(); });
      
      





.







 Observable.create(e -> { e.onNext("Hello"); e.onNext("World"); e.onComplete(); });
      
      





onNext



.







— . , HTTP-, onNext



HTTP-.







 OkHttpClient client = // … Request request = // … Observable.create(e -> { Call call = client.newCall(request); call.enqueue(new Callback() { @Override public void onResponse(Response r) throws IOException { e.onNext(r.body().string()); e.onComplete(); } @Override public void onFailure(IOException e) { e.onError(e); } }); });
      
      





, Observable



reate



, , . - HTTP-, . HTTP- .







 Observable.create(e -> { Call call = client.newCall(request); e.setCancelation(() -> call.cancel()); call.enqueue(new Callback() { @Override public void onResponse(Response r) throws IOException { e.onNext(r.body().string()); e.onComplete(); }A @Override public void onFailure(IOException e) { e.onError(e); } }); });
      
      





Android . , Observable



, , Listener



, .







 View view = // … Observable.create(e -> { e.setCancellation(() -> view.setOnClickListener(null)); view.setOnClickListener(v -> e.onNext(v)); });
      
      





create



:







 Flowable.create(e -> { … }); Observable.create(e -> { … }); Maybe.create(e -> { … }); Single.create(e -> { … }); Completable.create(e -> { … });
      
      







onSubscribe



Observer



/ Subscriber



.







Observer



:







 interface Observer<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Disposable d); } interface Disposable { void dispose(); }
      
      





Subscriber



:







 interface Subscriber<T> { void onNext(T t); void onComplete(); void onError(Throwable t); void onSubscribe(Subscription s); } interface Subscription { void cancel(); void request(long r); }
      
      





, Observer/ Subscriber



, subscribe. - onSubscribe



– - Disposable



/ Subscription



.







 Observable<String> o = Observable.just(“Hello”); o.subscribe(new Observer<String>() { @Override public void onNext(Sring s) { … } @Override public void onComplete() { … } @Override public void onError(Throwable t) { … } @Override public void onSubscribe(Disposable d) { ??? } });
      
      





DisposableObserver



, , Observable



.







 Observable<String> o = Observable.just("Hello"); o.subscribe(new DisposableObserver<String>() { @Override public void onNext(String s) { … } @Override public void onComplete() { … } @Override public void onError(Throwable t) { … } });
      
      





? .

DisposableObserver



Observer



. Disposable



, dispose, .







 Observable<String> o = Observable.just(“Hello”); DisposableObserver observer = new DisposableObserver<String>() { @Override public void onNext(Sring s) { … } @Override public void onComplete() { … } @Override public void onError(Throwable t) { … } } o.subscribe(observer); observer.dispose();
      
      





RxJava 2 subscribeWith



, subscribe



RxJava 1. Disposable



.







 Observable<String> o = Observable.just(“Hello”); Disposable d = new o.subscribeWith(new DisposableObserver<String>() { @Override public void onNext(String s) { … } @Override public void onComplete() { … } @Override public void onError(Throwable t) { … } }); d.dispose();
      
      





RxJava Disposable



: , Disposable



, CompositeDisposable



.







 Observable<String> o = Observable.just(“Hello”); CompositeDisposable disposables = new CompositeDisposable(); disposables.add(o.subscribeWith(new DisposableObserver<String>() { @Override public void onNext(Sring s) { … } @Override public void onComplete() { … } @Override public void onError(Throwable t) { … } })); disposables.dispose();
      
      





Android , CompositeDisposable



Activity



, onDestroy



( -).







subscribeWith



backpressure.







 Observable<String> o = Observable.just(“Hello”); Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { … }); Maybe<String> m = Maybe.just(“Hello”); Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { … }); Single<String> s = String.just(“Hello”); Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { … }); Completable c = Completable.completed(); Disposable d5 = c.subscribeWith(new Disposable Completable Observer<String>() { … });
      
      





Flowable



subscribeWith



, , Flowable



onSubscribe Subscription



, Disposable



.







 Flowable<String> f = Flowable.just("Hello"); Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { … });
      
      





Disposable



, Flowable



.







, , . , - . , . Disposable



Observable, .







オペレーター



:









.

, , . , toUppercase()



.







 String greeting = “Hello”; String yelling = greeting.toUppercase();
      
      





Observable



.







 Observable<String> greeting = Observable.just("Hello"); Observable<String> yelling = greeting.map(s -> s.toUppercase());
      
      





map



. - , .







User



: , , . , , .







 Observable<User> user = um.getUser(); Observable<User> mainThreadUser = user.observeOn(AndroidSchedulers.mainThread());
      
      





: « Observable



». .







observeOn , Observable



.







 OkHttpClient client = // … Request request = // … Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }); Observable<Response> backgroundResponse = response.subscribeOn(Schedulers.io());
      
      





. , , . , ( ), . , Schedulers.io()



— . , . subscribeOn



— , .







, Observable



. , . – . . . . . .







 OkHttpClient client = // … Request request = // … Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(response -> response.body().string()); // NetworkOnMainThread!
      
      





map observeOn



, Android. HTTP- – , .







 OkHttpClient client = // … Request request = // … Observable<Response> response = Observable.fromCallable(() -> { return client.newCall(request).execute(); }) .subscribeOn(Schedulers.io()) .map(response -> response.body().string()); // Ok! .observeOn(AndroidSchedulers.mainThread())
      
      







RxJava , Observable



. , first()



, . RxJava 1 Observable



, . : , get(0)



, , , , – . RxJava 2 : first()



, , Single



.







ここに画像の説明を入力してください







Observable



, , Single



, .







ここに画像の説明を入力してください







firstElement()



, Maybe



. Observable



Maybe



.







ここに画像の説明を入力してください







, Completable



. , , ignoreElements



.







ここに画像の説明を入力してください







Flowable



: , .







.







ここに画像の説明を入力してください







«» . , , , «» Single



. «» . , Single



Observable



.









, User : « , UI ». User', , .







 um.getUser() .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<User>() { @Override public void onNext(User user) { tv.setText(user.toString()); } @Override public void onComplete() { /* ignored */ } @Override public void onError(Throwable t) { /* crash or show */ } }));
      
      





, - Disposable. Android, Activity



. onDestroy



Disposables



.







 // onCreate disposables.add(um.getUser() .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableObserver<User>() { @Override public void onNext(User user) { tv.setText(user.toString()); } @Override public void onComplete() { /* ignored */ } @Override public void onError(Throwable t) { /* crash or show */ } })); // onDestroy disposables.dispose();
      
      





, , , . – . .







 disposables.add(um.setName("Jane Doe") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onComplete() { // success! re-enable editing } @Override public void onError(Throwable t) { // retry or show } }));
      
      





: . Disposable



. Disposable



.







RxJava 2 , Android: . , Observable



, . map, Observable



, , .







. RxJava 2 . , . , , – API.







おわりに



RxJava 2 : — , Android, , UI — , , .







RxJava 1, , . RxJava 2.







 class RxJavaInterop { static <T> Flowable<T> toV2Flowable(rx.Observable<T> o) { … } static <T> Observable<T> toV2Observable(rx.Observable<T> o) { … } static <T> Maybe<T> toV2Maybe(rx.Single<T> s) { … } static <T> Maybe<T> toV2Maybe(rx.Completable c) { … } static <T> Single<T> toV2Single(rx.Single<T> s) { … } static Completable toV2Completable(rx.Completable c) { … } static <T> rx.Observable<T> toV1Observable(Publisher<T> p) { … } static <T> rx.Observable<T> toV1Observable(Observable<T> o, …) { … } static <T> rx.Single<T> toV1Single(Single<T> o) { … } static <T> rx.Single<T> toV1Single(Maybe<T> m) { … } static rx.Completable toV1Completable(Completable c) { … } static rx.Completable toV1Completable(Maybe<T> m) { … } }
      
      





— . , . , - .








All Articles