Rxの基本原則を理解したので、シーケンスを作成および管理する方法を学習します。 シーケンス管理スタイルは、関数型プログラミングに触発された元のC# LINQから借用されました。 すべての操作をトピックで分割します。トピックは操作の複雑さの順に並べられています。 ほとんどのRxオペレーターは既存のシーケンスを管理しますが、最初にそれらを作成する方法を学びます。
内容:
- パート1-はじめに
- パート2-シーケンス
- シーケンスを作成する
- シーケンスフィルタリング
- リサーチ
- 集計
- 変換
- パート3-シーケンス管理
- パート4-並行性
パート2-シーケンスの基本
シーケンスを作成する
以前は 、 Subject
を使用し、値を手動で渡してシーケンスを作成しました。 これは、基本的なRx subscribe
メソッドなど、いくつかの重要なポイントを示すために行いました。 ほとんどの場合、 Subject
は新しいObservable
を作成する最良の方法ではありません。 このセクションでは、これを行うよりエレガントな方法を見ていきます。
シンプルなファクトリーメソッド
観察可能
Observable
作成するjust
、所定数の値が返されて完了します。
Observable<String> values = Observable.just("one", "two", "three"); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: one Received: two Received: three Completed
Observable.empty
このObservable
はonCompleted
イベントのみをonCompleted
、それ以外は何もonCompleted
しません。
Observable<String> values = Observable.empty(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Completed
Observable.never
このObservable
は何も発行しません。
Observable<String> values = Observable.never(); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
上記のコードは何も出力しません。 しかし、これはプログラムがブロックされているという意味ではありません。 実際、それは即座に終了します。
Observable.error
このObservable
はonErrorイベントをスローして終了します。
Observable<String> values = Observable.error(new Exception("Oops")); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Error: java.lang.Exception: Oops
Observable.defer
defer
は新しいObservable
作成しませんが、サブスクライバーが表示されたときにObservable
を作成する方法を決定できます。 現在の時刻を表示するObservable
を作成する方法を考えてください。 値が1つしかないため、ここで役立つことがあるようです。
Observable<Long> now = Observable.just(System.currentTimeMillis()); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
1431443908375 1431443908375
1秒後にサインアップする2番目のサブスクライバーが同じ時間を受信したことに注意してください。 これは、時間値が一度だけ計算されたためです:実行がjust
メソッドに達したとき。 ただし、この場合、各サブスクリプションの現在時刻を計算します。 defer
はObservable
を返す関数を受け入れ、新しいサブスクライバーごとに実行されます。
Observable<Long> now = Observable.defer(() -> Observable.just(System.currentTimeMillis())); now.subscribe(System.out::println); Thread.sleep(1000); now.subscribe(System.out::println);
1431444107854 1431444108858
Observable.create
create
はObservable
を作成するための非常に強力な方法です。
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
すべてが見た目よりもはるかに単純です。 内部は、タイプT
Subscriber
を受け入れる単なる関数ですT
その中で、サブスクライバーに発行されるイベントを手動で決定できます。
Observable<String> values = Observable.create(o -> { o.onNext("Hello"); o.onCompleted(); }); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: Hello Completed
誰かがObservable
(この場合はvalues
)をサブスクライブすると、 Subscriber
の対応するインスタンスがcreate
関数に渡されます。 コードが実行されると、値がサブスクライバーに渡されます。 シーケンスの終了を通知するには、自分でonCompleted
メソッドを呼び出す必要があることに注意してください。
このメソッドは、他のメソッドがどれも機能しない場合にObservable
を作成するための推奨される方法です。 これはSubject
を作成して手動で値を渡す方法と似ていますが、いくつかの重要な違いがあります。 まず、イベントソースはきちんとカプセル化され、他のコードから分離されます。 第二に、 Subject
明らかな危険があります。オブジェクトにアクセスできる人は誰でもシーケンスを変更できます。 後でこの問題に戻ります。
Subject
を使用することとのもう1つの重要な違いは、新しいサブスクライバーが到着したときにのみコードが「怠lazに」実行されることです。 上記の例では、コードはObservable
された時点では実行されません (まだサブスクライバーがないため)が、 subscribe
メソッドが呼び出された時点では実行されます。 これは、 ReplaySubject
ように、各サブスクライバーの値が再計算されることを意味します。 最終結果は、キャッシュを除いてReplaySubject
に似ています。 create
実行を別のスレッドに簡単に転送することもできますが、 ReplaySubject
、値を計算するために手動でスレッドを作成する必要があります。 また、 onSubscribe
メソッドを並行してonSubscribe
する方法も検討します。
以前のObservable
いずれもObservable
を使用して実装できることにお気づきかもしれません。 create
例はObservable.just("hello")
と同等です。
機能的方法
関数型プログラミングでは、通常は無限のシーケンスを作成します。
Observable.range
機能プログラマー向けのシンプルで馴染みのある方法。 指定された範囲から値を返します。
Observable<Integer> values = Observable.range(10, 15);
この例では、10から24までの値を順番に返します。
Observable.interval
この関数は、指定された時間間隔で区切られた値の無限シーケンスを作成します。
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") ); System.in.read();
Received: 0 Received: 1 Received: 2 Received: 3 ...
配信を停止するまで、シーケンスは終了しません。
例の最後で入力をブロックする必要がある理由に注意する必要があります。 これがないと、プログラムは何も印刷せずに終了します。 これは、すべての操作が非ブロッキングであるためですObservable
定期的に作成し、これらの値が到着したときに何らかのアクションを実行するサブスクライバーを登録します。 このいずれも、メインスレッドの終了をブロックしません。
Observable.timer
Observable.timer
は2つのオーバーロードがあります。 最初のオプションは、一定期間後に0L
を発行するObservable
を作成します。
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 0 Completed
2番目のオプションは、事前に決められた期間を想定してから、指定された頻度のinterval
と同じ方法で値の生成を開始します。
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 0 Received: 1 Received: 2 ...
上記の例は2秒待機してから、1秒ごとにカウントを開始します。
観測可能になる
Javaには、シーケンス、コレクション、および非同期イベントを操作するためのツールがありますが、これらはRxと直接互換性がない場合があります。 次に、これらをRxコードの入力データに変換する方法を見ていきます。
EventHandlerを使用する場合、 Observable.create
を使用して、イベントのシーケンスを作成できます。
Observable<ActionEvent> events = Observable.create(o -> { button2.setOnAction(new EventHandler<ActionEvent>() { @Override public void handle(ActionEvent e) { o.onNext(e) } }); })
特定のイベントに応じて、そのタイプ(この場合はActionEvent
)自体がObservable
タイプになるのに十分な情報を運ぶことができます。 ただし、非常に多くの場合、たとえばイベント発生時の特定のフィールドの値など、何か他のものが必要になる場合があります。 UIスレッドがブロックされ、フィールド値が関連している間に、ハンドラー内でそのようなフィールドの値を取得することが最善です。 そして、最終加入者に到達するまで値が変更されないという保証はありませんが、正しく実装されたRxコードでは、変更は消費者側で制御されます[1]。
Observable.from
create
を使用して、任意の入力をObservable
変換できcreate
。 ただし、一般的なデータ型の場合、このプロセスを容易にするために設計された既製のメソッドがあります。
Future
はJavaの一部であり、マルチスレッドフレームワークで作業しているときに遭遇したに違いありません。 1つの値しか返さないため、Rxよりも強力なマルチスレッドツールではありません。 通常、それらをObservable
に変換します。
FutureTask<Integer> f = new FutureTask<Integer>(() -> { Thread.sleep(2000); return 21; }); new Thread(f).start(); Observable<Integer> values = Observable.from(f); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 21 Completed
Observable
FutureTask
準備完了の結果をFutureTask
完了します。 タスクがキャンセルされた場合、observableはjava.util.concurrent.CancellationException
エラーをスローします。
限られた時間だけFuture
の結果に興味がある場合、引数としてタイムアウトを設定することができます。
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
この間にFuture
が完了しない場合、observableは結果を無視しTimeoutException
をTimeoutException
。
Observable.from
を使用すると、コレクションをシーケンスに変換できます。 Observable
が作成され、コレクションの各要素が個別にonCompleted
れ、最後にonCompleted
が発行されます。
Integer[] is = {1,2,3}; Observable<Integer> values = Observable.from(is); Subscription subscription = values.subscribe( v -> System.out.println("Received: " + v), e -> System.out.println("Error: " + e), () -> System.out.println("Completed") );
Received: 1 Received: 2 Received: 3 Completed
Observable
はIterable
やStream
と同じではありません。 Observable
プッシュ指向onNext
を呼び出すと、最後のsubscribe
メソッドまで実行するハンドラーのスタックが呼び出されるという意味で。 残りのモデルはプル指向です-一方、モデル内の値は要求され、結果が返されるまで実行はブロックされます。
[1] 消費者 、 Observable
によって与えられた価値を吸収する消費者
現在、プロジェクトには独自のパブリックリポジトリがあり、誰でもRxの詳細なロシア語チュートリアルの作成に参加できます 。 この部分の翻訳はすでにそこにあり、残りはすぐに表示され、あなたの助けを借りて、さらに速くなります。