RxJavaの概要:シーケンスの作成

画像

Rxの基本原則を理解したので、シーケンスを作成および管理する方法を学習します。 シーケンス管理スタイルは、関数型プログラミングに触発された元のC# LINQから借用されました。 すべての操作をトピックで分割します。トピックは操作の複雑さの順に並べられています。 ほとんどのRxオペレーターは既存のシーケンスを管理しますが、最初にそれらを作成する方法を学びます。







内容
内容:




パート2-シーケンスの基本



シーケンスを作成する



以前はSubject



を使用し、値を手動で渡してシーケンスを作成しました。 これは、基本的なRx subscribe



メソッドなど、いくつかの重要なポイントを示すために行いました。 ほとんどの場合、 Subject



は新しいObservable



を作成する最良の方法ではありません。 このセクションでは、これを行うよりエレガントな方法を見ていきます。







シンプルなファクトリーメソッド



観察可能



Observable



作成するjust



、所定数の値が返されて完了します。







 Observable<String> values = Observable.just("one", "two", "three"); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: one Received: two Received: three Completed
      
      





Observable.empty



このObservable



onCompleted



イベントのみをonCompleted



、それ以外は何もonCompleted



しません。







 Observable<String> values = Observable.empty(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Completed
      
      





Observable.never



このObservable



は何も発行しません。







 Observable<String> values = Observable.never(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





上記のコードは何も出力しません。 しかし、これはプログラムがブロックされているという意味ではありません。 実際、それは即座に終了します。







Observable.error



このObservable



はonErrorイベントをスローして終了します。







 Observable<String> values = Observable.error(new Exception("Oops")); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Error: java.lang.Exception: Oops
      
      





Observable.defer



defer



は新しいObservable



作成しませんが、サブスクライバーが表示されたときにObservable



を作成する方法を決定できます。 現在の時刻を表示するObservable



を作成する方法を考えてください。 値が1つしかないため、ここで役立つことがあるようです。







 Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
      
      





おわりに







 1431443908375 1431443908375
      
      





1秒後にサインアップする2番目のサブスクライバーが同じ時間を受信したことに注意してください。 これは、時間値が一度だけ計算されたためです:実行がjust



メソッドに達したとき。 ただし、この場合、各サブスクリプションの現在時刻を計算します。 defer



Observable



を返す関数を受け入れ、新しいサブスクライバーごとに実行されます。







 Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
      
      





おわりに







 1431444107854 1431444108858
      
      





Observable.create



create



Observable



を作成するための非常に強力な方法です。







 static <T> Observable<T> create(Observable.OnSubscribe<T> f)
      
      





すべてが見た目よりもはるかに単純です。 内部は、タイプT



Subscriber



を受け入れる単なる関数ですT



その中で、サブスクライバーに発行されるイベントを手動で決定できます。







 Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: Hello Completed
      
      





誰かがObservable



(この場合はvalues



)をサブスクライブすると、 Subscriber



の対応するインスタンスがcreate



関数に渡されます。 コードが実行されると、値がサブスクライバーに渡されます。 シーケンスの終了を通知するには、自分でonCompleted



メソッドを呼び出す必要があることに注意してください。







このメソッドは、他のメソッドがどれも機能しない場合にObservable



を作成するための推奨される方法です。 これはSubject



を作成して手動で値を渡す方法と似ていますが、いくつかの重要な違いがあります。 まず、イベントソースはきちんとカプセル化され、他のコードから分離されます。 第二に、 Subject



明らかな危険があります。オブジェクトにアクセスできる人は誰でもシーケンスを変更できます。 後でこの問題に戻ります。







Subject



を使用することとのもう1つの重要な違いは、新しいサブスクライバーが到着したときにのみコードが「怠lazに」実行されることです。 上記の例では、コードはObservable



された時点では実行されません (まだサブスクライバーがないため)が、 subscribe



メソッドが呼び出された時点では実行されます。 これは、 ReplaySubject



ように、各サブスクライバーの値が再計算されることを意味します。 最終結果は、キャッシュを除いてReplaySubject



に似ています。 create



実行を別のスレッドに簡単に転送することもできますが、 ReplaySubject



、値を計算するために手動でスレッドを作成する必要があります。 また、 onSubscribe



メソッドを並行してonSubscribe



する方法も検討します。







以前のObservable



いずれもObservable



を使用して実装できることにお気づきかもしれません。 create



例はObservable.just("hello")



と同等です。







機能的方法



関数型プログラミングでは、通常は無限のシーケンスを作成します。







Observable.range



機能プログラマー向けのシンプルで馴染みのある方法。 指定された範囲から値を返します。







 Observable<Integer> values = Observable.range(10, 15);
      
      





この例では、10から24までの値を順番に返します。







Observable.interval



この関数は、指定された時間間隔で区切られた値の無限シーケンスを作成します。







 Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read();
      
      





おわりに







 Received: 0 Received: 1 Received: 2 Received: 3 ...
      
      





配信を停止するまで、シーケンスは終了しません。







例の最後で入力をブロックする必要がある理由に注意する必要があります。 これがないと、プログラムは何も印刷せずに終了します。 これは、すべての操作が非ブロッキングであるためですObservable



定期的に作成し、これらの値が到着したときに何らかのアクションを実行するサブスクライバーを登録します。 このいずれも、メインスレッドの終了をブロックしません。







Observable.timer



Observable.timer



は2つのオーバーロードがあります。 最初のオプションは、一定期間後に0L



を発行するObservable



を作成します。







 Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: 0 Completed
      
      





2番目のオプションは、事前に決められた期間を想定してから、指定された頻度のinterval



と同じ方法で値の生成を開始します。







 Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: 0 Received: 1 Received: 2 ...
      
      





上記の例は2秒待機してから、1秒ごとにカウントを開始します。







観測可能になる



Javaには、シーケンス、コレクション、および非同期イベントを操作するためのツールがありますが、これらはRxと直接互換性がない場合があります。 次に、これらをRxコードの入力データに変換する方法を見ていきます。







EventHandlerを使用する場合、 Observable.create



を使用して、イベントのシーケンスを作成できます。







 Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); })
      
      





特定のイベントに応じて、そのタイプ(この場合はActionEvent



)自体がObservable



タイプになるのに十分な情報を運ぶことができます。 ただし、非常に多くの場合、たとえばイベント発生時の特定のフィールドの値など、何か他のものが必要になる場合があります。 UIスレッドがブロックされ、フィールド値が関連している間に、ハンドラー内でそのようなフィールドの値を取得することが最善です。 そして、最終加入者に到達するまで値が変更されないという保証はありませんが、正しく実装されたRxコードでは、変更は消費者側で制御されます[1]。







Observable.from



create



を使用して、任意の入力をObservable



変換できcreate



。 ただし、一般的なデータ型の場合、このプロセスを容易にするために設計された既製のメソッドがあります。







Future



はJavaの一部であり、マルチスレッドフレームワークで作業しているときに遭遇したに違いありません。 1つの値しか返さないため、Rxよりも強力なマルチスレッドツールではありません。 通常、それらをObservable



に変換します。







 FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: 21 Completed
      
      





Observable



FutureTask



準備完了の結果をFutureTask



完了します。 タスクがキャンセルされた場合、observableはjava.util.concurrent.CancellationException



エラーをスローします。







限られた時間だけFuture



の結果に興味がある場合、引数としてタイムアウトを設定することができます。







 Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
      
      





この間にFuture



が完了しない場合、observableは結果を無視しTimeoutException



TimeoutException









Observable.from



を使用すると、コレクションをシーケンスに変換できます。 Observable



が作成され、コレクションの各要素が個別にonCompleted



れ、最後にonCompleted



が発行されます。







 Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
      
      





おわりに







 Received: 1 Received: 2 Received: 3 Completed
      
      





Observable



Iterable



Stream



と同じではありません。 Observable



プッシュ指向onNext



を呼び出すと、最後のsubscribe



メソッドまで実行するハンドラーのスタックが呼び出されるという意味で。 残りのモデルはプル指向です-一方、モデル内の値は要求され、結果が返されるまで実行はブロックされます。







[1] 消費者Observable



によって与えられた価値を吸収する消費者







現在、プロジェクトには独自のパブリックリポジトリがあり、誰でもRxの詳細なロシア語チュートリアルの作成に参加できます この部分の翻訳すでにそこにあり、残りはすぐに表示され、あなたの助けを借りて、さらに速くなります。








All Articles