Rxの根底にある主なアイデアの1つは、シーケンスが新しい値または終了をいつ生成するかが正確にわからないということです。 ただし、これらの値の取得を開始または終了する時間を制御する機能はあります。 さらに、サブスクライバーが外部リソースを使用する場合、特定のシーケンスの最後に外部リソースを解放することをお勧めします。
内容:
- パート1-はじめに
- パート2-シーケンス
- シーケンスを作成する
- シーケンスフィルタリング
- リサーチ
- 集計
- 変換
- パート3-シーケンス管理
- パート4-並行性
定期購読
同じ機能を実行するオーバーロードされたObservable :: subscribeメソッドがいくつかあります。
Subscription subscribe() Subscription subscribe(Action1<? super T> onNext) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError) Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete) Subscription subscribe(Observer<? super T> observer) Subscription subscribe(Subscriber<? super T> subscriber)
subscribe()はイベントを吸収しますが、それ自体では即時のアクションを実行しません。 タイプがActionのパラメーターを少なくとも1つ持っているオーバーロードバージョンは、 Subscriberオブジェクトを作成します。 onErrorおよびonCompletedイベントに関数を渡さない場合、それらは単に無視されます。
Subject<Integer, Integer> s = ReplaySubject.create(); s.subscribe( v -> System.out.println(v), e -> System.err.println(e)); s.onNext(0); s.onError(new Exception("Oops"));
おわりに
0 java.lang.Exception: Oops
エラーを処理するための関数を渡さないと、プロバイダー側でs.onErrorが発生した場所でOnErrorNotImplementedExceptionがスローされます。 この場合、プロバイダー[1]とコンシューマ[2]は同じコードブロックにあり、従来のtry-catchを使用できます。 ただし、実際には、プロバイダーとコンシューマーは異なる場所にいる場合があります。 この場合、コンシューマーがエラーを処理する機能を提供しない場合、いつ、どのような理由でシーケンスが終了したかを知ることはありません。
登録解除
シーケンスが終了する前にデータの受信を停止できます。 subscribeメソッドのすべてのオーバーロードは、2つのメソッドを持つSubscribtionインターフェイスオブジェクトを返します。
boolean isUnsubscribed() void unsubscribe()
購読解除コールは、イベントがオブザーバに送信されるのを停止します。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription = values.subscribe( v -> System.out.println(v), e -> System.err.println(e), () -> System.out.println("Done") ); values.onNext(0); values.onNext(1); subscription.unsubscribe(); values.onNext(2);
おわりに
0 1
1人のサブスクライバーのサブスクリプションを解除することにより、同じovbservableの他のサブスクライバーに影響を与えません。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription1 = values.subscribe( v -> System.out.println("First: " + v) ); Subscription subscription2 = values.subscribe( v -> System.out.println("Second: " + v) ); values.onNext(0); values.onNext(1); subscription1.unsubscribe(); System.out.println("Unsubscribed first"); values.onNext(2);
おわりに
First: 0 Second: 0 First: 1 Second: 1 Unsubscribed first Second: 2
onErrorおよびonCompleted
onErrorとonCompletedは、シーケンスの完了を意味します。 Rx契約に続く真正なオブザーバブルは、これらのイベントのいずれかが発生した後に値の発行を停止します。 これは、独自のObservableを作成するときに常に覚えておく必要があるものです。
Subject<Integer, Integer> values = ReplaySubject.create(); Subscription subscription1 = values.subscribe( v -> System.out.println("First: " + v), e -> System.out.println("First: " + e), () -> System.out.println("Completed") ); values.onNext(0); values.onNext(1); values.onCompleted(); values.onNext(2);
おわりに
First: 0 First: 1 Completed
リソースのリリース
サブスクリプションは、関連付けられているリソースをメモリに保持します。 これらのリソースは、 Subscriptionオブジェクトが範囲外になると自動的に解放されません。 サブスクライブメソッドを呼び出した後に戻り値を無視した場合、 サブスクライブを解除する唯一の機会を失うリスクがあります。 サブスクリプションは引き続き存在しますが、サブスクリプションへのアクセスは失われ、メモリリークと不要なアクションが発生する可能性があります。
例外があります。 たとえば、 Subscriberオブジェクトを暗黙的に構築するオーバーロードされたサブスクライブメソッドを呼び出すと、シーケンスが終了したときにサブスクライバーを自動的に解放するメカニズムが作成されます。 ただし、この場合でも、無限のシーケンスに留意する必要があります。 ある時点でそれらからのデータの受信を停止するには、引き続きSubscriptionオブジェクトが必要です。
Subscriptionインターフェースにはいくつかの実装があります。
- ブールサブスクリプション
- CompositeSubscription
- MultipleAssignmentSubscription
- RefCountSubscription
- SafeSubscriber
- Scheduler.Worker
- SerializedSubscriber
- SerialSubscription
- 加入者
- テスト購読者
今後の記事でそれらに会います。 SubscriberはSubscriptionも実装していることは注目に値します。つまり、 Subscriberオブジェクトへのリンクを使用して、 購読を解除することもできます。
サブスクリプションのライフサイクルを理解することで、それらに関連付けられたリソースを制御できます。 これにより、プログラムの予測性、保守性、拡張性が向上し、バグが発生しにくくなることが期待されます。
次のパートでは、強力なライブラリツールを使用してシーケンスを作成および処理する方法を学習します。
[1]オブザーバブルを制御(作成)する人-注。
[2] observableが提供する値を使用する人-注。