ReactiveX 2.0とサンプル、またはロッキングリアクティブプログラミング2.0。 パート1:観測可能vs流動可能、背圧

画像

こんにちは、私の名前はアレックスです。 Kotlinでバックエンドを作成し、Androidアプリケーションも開発しています。 私は長い間苦しんでいました。CallBackHell、Androidの必須のスタイル、スレッド同期、およびその他の古典的なJavaの問題に苦しみました。 それは大きな痛みでした。 そして、私は何らかの形でこの痛みを取り除くための解決策を探し始めました。 そして、幸運なチャンスが訪れます-RxJavaの誇大広告に出会います。 試みたが、私はこの日に止まらない。 この記事の執筆時点では、RxJava 2.0がリリースされており、イノベーションを理解したいという強い要望がありました。 公式ソースのGithub Wikiには、RxJava 2.0のヘッド:2.0の違いが掲載されています。 しかし、残念ながら、私は英語が流notではなく、そのような重要なドックを読むのに時間がかかりました。 いくつかのメモが蓄積され、共有したいコンセプトが登場しました。 しかし、「アートスペースのアートディレクター」にならず、平凡な翻訳ではなく、利益をもたらすために、この記事はRxKotlinユーザーケースの実際の例で味付けされたチュートリアルとwikiの翻訳の混合物になります。







WebアプリケーションとAndroidアプリケーションを開発するアプローチは異なり、Rxを使用するコンテキストも異なるため、Android開発のコンテキストで説明します。 興味のある方は、猫の下でお願いします。







簡単な紹介



始める前に、豪華な一連の記事を読むことをお勧めします









また、 この記事も気に入りました。 これも読むことをお勧めします。







RxJava 2.0は、Reactive Streams仕様の上にゼロから完全に書き直されました。 仕様自体はRxJava 1.xから発展し、リアクティブシステムとライブラリの共通のベースラインを提供します。







Reactive-Streamsのアーキテクチャは異なるため、よく知られているRxJavaのいくつかのタイプを変更できます。 いくつかの記事では、変更点を要約し、1.xコードを2.xコードに書き換える方法を説明します。







ヌルポインター傍受



はい、RxJava 2.xはNULL値を受け入れなくなり、次のコードはすぐにNullPointerException



受け取るか、 Emitter



(生成ストリーム)がonErrorイベントをスローします。







 class NullPointerActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) var obsJust: TextView? = null var singleJust: TextView? = null var callable: TextView? = null var nullMaping: TextView? = null verticalLayout { obsJust = textView() singleJust = textView() callable = textView() nullMaping = textView() } try { Observable.just(null).subscribe() } catch (e: Exception) { obsJust?.text = e.localizedMessage e.printStackTrace() } try { Single.just(null).subscribe() } catch (e: Exception) { singleJust?.text = e.localizedMessage e.printStackTrace() } Observable.fromCallable{null}.subscribe({Log.d("NullPointerActivity", it)}, { callable?.text = it.localizedMessage it.printStackTrace() }) Observable.just(1).map{null}.subscribe({Log.d("NullPointerActivity", it)}, { nullMaping?.text = it.localizedMessage it.printStackTrace() }) } }
      
      





つまり、 Observable<Void>



はイベントを生成できなくなり、ストリームは、ホットまたはコールドに関係なく、 onComplete



またはonError



のみ終了できます。 まあ、場合によっては、 Single.just(null)



およびObservable.just(null)



発生するように、ストリームで値をラップする前に単に例外をスローすることができObservable.just(null)









カスタムObservable



設計/実装する場合、 ObservableEmitter<Any>



を置き換えるObservableEmitter<Any>



に特定の型を定義する必要はありません。 たとえば、信号機に似たソースが必要な場合は、 Enumを定義して、そのシングルトンをonNext



スローできます。







 enum class Irrelevant { INSTANCE; } val source = Observable.create<Any> {emitter -> Log.d(TAG, "Side-effect 1") emitter.onNext(Irrelevant.INSTANCE) Log.d(TAG, "Side-effect 2") emitter.onNext(Irrelevant.INSTANCE) Log.d(TAG, "Side-effect 3") emitter.onNext(Irrelevant.INSTANCE) } source.subscribe({Log.d(TAG, it.toString())}, Throwable::printStackTrace)
      
      





その結果、予期されていたものがログに記録されます。







D / NullPointerActivity:副作用1

D / NullPointerActivity:INSTANCE

D / NullPointerActivity:副作用2

D / NullPointerActivity:INSTANCE

D / NullPointerActivity:副作用3

D / NullPointerActivity:INSTANCE


観察可能vs. 流動性



残念ながら、Rx 2.0より前では、Backpressureはサポート付きの別のクラスを割り当てるのではなく、Observableに直接実装されていました。 Backpressureの主な問題は、多くのホットなObservableが十分に信頼できず、特定の状況下で予期しないMissingBackpressureExceptionを引き起こす可能性があることです。 そして、一定の蓄積された経験がなければ、そのような例外を予測することは非常に困難です。







Rx 2.0はこの状況を修正します。 Observableは背圧のないクラスであり、新しいFlowableには箱から出した背圧が与えられました。 次に、どこでどのケースでFlowableを使用するか、どこでObservableを使用するかのバリエーションを検討します。







それでは、いつ使用するのでしょうか?



この質問はおそらく最も論理的なものです。 リポジトリクラス、ビジネスロジッククラスを実装し、 Observable



またはFlowable



タイプを受け入れて返すかを決定するとき、 MissingBackpressureExceptionまたはOutOfMemoryErrorなどの例外を取得する際の問題を回避するのに役立ついくつかの要因を考慮する必要があります。なぜなら 間違ったタイプを不正確に使用すると、パフォーマンスが低下します。







Observableを使用する場合



これらの場合、条件付きで反復が1000個以下の要素がある場合、これは最悪のケースであり、スケーリングは想定されませんが、バックプレッシャーは必要ありません。 一般に、特定のケースでOutOfMemoryExceptionの可能性がないと感じた場合 、Observableを使用できる場合と使用する必要がある場合はまさにそうです。 基本的に、これらはUIを使用する場合、クリック、タッチ、ポインター移動などの最も多様なイベントです。 実際、これらは、周波数が1000 Hzを超えないイベントです。 同意して、タッチスクリーンを1秒あたり1000回以上クリックすることはできそうにありません。 ただし、デバウンスステートメントを忘れないでください。







 class MainActivity : AppCompatActivity() { override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main) RxView.clicks(backpressure) .debounce(200, TimeUnit.MILLISECONDS) .subscribe ({ openNewScreen(BackpressureExampleActivity::class.java) }, Throwable::printStackTrace) RxView.clicks(nullpointer) .debounce(200, TimeUnit.MILLISECONDS) .subscribe ({ openNewScreen(NullPointerActivity::class.java) }, Throwable::printStackTrace) } }
      
      





RxBinding、 Jake WhartonのライブラリがViewをバインドするときにObservableを返すことは誰もが知っていると思いますが、これはRx2.xの概念を完全に正当化します







Flowableを使用する場合



大量または予測不可能な量のデータ、たとえば10,000を超える要素をすでに処理しているとき、または連続的なデータ生成の状況で。 Flowableを使用するもう1つの理由は、解析、さまざまなストレージメディア(内部/外部ストレージ)からのデータの読み取りです。







SQLiteOpenHelperを使用してデータベースから読み取ることも、Flowableを使用する言い訳です。 したがって、 https://github.com/square/sqlbriteおよびhttps://github.com/pushtorefresh/storioライブラリを使用する場合、ObservableをFlowableにキャストする必要はありません。 Flowableを使用するもう1つの理由は、バックエンドクエリを実行することです。







よく見ると、これらすべての場合に共通する詳細に気づくことができます-UIストリームをブロックします。 したがって、MainThreadをブロックする操作を実行する場合、これはFlowableを使用する機会です。







そして最後に。 最終的にノンブロッキングリアクティブAPI(Retrofit)を取得できる多くのブロッキングおよび/またはプルベースのデータソースも、Flowableを使用するための基盤です。







バックプレッシャーの詳細



バックプレッシャーは、一部の非同期操作が値を十分に速く処理できず、製造業者の速度を落とす必要がある産卵ストリームで発生する現象です。







生成ストリームが熱いときにバックプレッシャーを使用する必要がある典型的なケース:







 class BackpressureExampleActivity : AppCompatActivity() { private val n = 10000000 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_backpressure) val source = PublishProcessor.create<Int>() source .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete) for(i in 0..n) { source.onNext(i) if(i == n) { source.onComplete() } } } private fun addToIntListAdapter(number: Int?) { Log.d("number", number.toString()) // do something } private fun onComplete() { textView?.text = "completed" } }
      
      





PublishProcessorは、私たちがよく知っているObservableのわずかに異なる形式です。 ご覧のように、より命令的な形式になっています。







この例では、メインスレッドはN個の要素を返します。 addToIntListAdapter(int number)



メソッドが次の着信要素をRecyclerView



接続されているアダプターに追加することを想像してaddToIntListAdapter(int number)



。 これにはすべて時間がかかり、現在のリクエストスタックのオーバーヘッドは実行時間よりも長くなる可能性があります。 ただし、 for



ループを持つプロデューススレッドはこれを認識できず、 onNext



呼び出しを続けます。







非同期ステートメント内には、処理されるまでそのような要素を保管するためのバッファーがあります。 RxJava 1.xでは、これらのバッファーは無制限でした。つまり、例のn個の要素がすべて含まれている可能性が高いということです。 この問題は、n = 1000000から始まります。従来の表現では、これはOutOfMemoryError



つながるか、原則として、GCの過度のリソース集中作業のためにパフォーマンスがフリーズし、その結果、頻繁に呼び出されます。







エラー処理がRxの不可欠な部分になり、( onErrorReturn



onErrorResumeNext



doOnError



を介して)作業するための演算子を受け取ったように、Backpressureは開発者がonBackpressureBuffer



onBackpressureDrop



を通じて直接考えて処理するデータストリームの別のプロパティです。 onBackpressureLast









n = 1,000,000の場合、次のようになります。







W / System.err:io.reactivex.exceptions.MissingBackpressureException:リクエストがないため値を出力できませんでした

W / System.err:io.reactivex.processors.PublishProcessor $ PublishSubscription.onNext(PublishProcessor.java:322)

W / System.err:io.reactivex.processors.PublishProcessor.onNext(PublishProcessor.java:198)

W / System.err:mobile.geekbit.rx20habrahabrproject.backpressure.BackpressureExampleActivity.onCreate(BackpressureExampleActivity.kt:30)

W / System.err:android.app.Activity.performCreate(Activity.java:6679)

W / System.err:android.app.Instrumentation.callActivityOnCreate(Instrumentation.java:1118)

W / System.err:android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2618)

W / System.err:android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:2726)

W / System.err:android.app.ActivityThread.-wrap12(ActivityThread.java)

W / System.err:android.app.ActivityThread $ H.handleMessage(ActivityThread.java:1477)

W / System.err:android.os.Handler.dispatchMessage(Handler.java:102)

W / System.err:android.os.Looper.loop(Looper.java:154)

W / System.err:android.app.ActivityThread.main(ActivityThread.java:6119)

W / System.err:java.lang.reflect.Method.invoke(ネイティブメソッド)

W / System.err:com.android.internal.os.ZygoteInitで$ MethodAndArgsCaller.run(ZygoteInit.java:886)

W / System.err:at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)

また、UIスレッドでThread.sleep(10000)



を呼び出すと、当然、アクティビティがひどいフリーズで開始され、GCが7〜8回発生します。







前述のPublishProcessor



加えて、 PublishProcessor



サポートしない他のオペレーターがあります。 たとえば、 Observable.interval



は、処理Observable.interval



な値よりも高速に定期的に値を生成します。







RxJava 2.xでは、ほとんどの非同期ステートメントの内部バッファーが制限されており、このバッファーをオーバーフローさせようとすると、MissingBackpressureExceptionでシーケンス全体が完了します。 ドキュメントには、Backpressureとそれをサポートするオペレーターに関するセクションがありますhttp://reactivex.io/documentation/operators/backpressure.html







ただし、背圧は、より予期しない場所でも発生します。通常のコールドフローでは、MissingBackpressureExceptionが発生しないため、発生しないはずです。 上記の例を使用して、リファクタリングする場合:







 class BackpressureExampleActivity : AppCompatActivity() { private val n = 10000000 override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_backpressure) Flowable.range(1, n) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete) } private fun addToIntListAdapter(number: Int?) { Log.d("number", number.toString()) // do something } private fun onComplete() { textView?.text = "completed" } }
      
      





すべてはMissingBackpressureException



なしで開始され、パフォーマンスは安定したfpsでスムーズに、適切なメモリ使用量で動作します。その量ははるかに少なく割り当てられます。 その理由は、多くの製造オペレーターがオンデマンドで値を「生成」し、オペレーターを監視できるためです。 Flowable.range(0, n)



は、バッファがオーバーフローなしで保持できる数の値を生成すると言うことができます。 より具体的な例を見てみましょう:







  Flowable.range(1, n) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object : DisposableSubscriber<Int>() { public override fun onStart() { request(1) } override fun onNext(v: Int?) { addToIntListAdapter(v) request(1) } override fun onError(ex: Throwable) { ex.printStackTrace() } override fun onComplete() { onComplete() } })
      
      





ここで、 onStart



実装は、最初の値を作成する範囲を指定し、その値はonNext



受け入れられonNext



。 操作が完了すると、範囲から別の値が要求されます。 典型的な範囲の実装では、そのような呼び出しは再帰的にonNextを呼び出しますが、これはStackOverflowError



につながりますが、これはもちろん望ましくありません。







StackOverflowError



を防ぐために、オペレーターはトランポリンを使用します(翻訳では、用語はトランポリンとして定義できます)。 これは条件付きロジックと呼ばれ、繰り返しの呼び出しを防ぎます。 Clojure、Scalaなどの関数型言語、特に再帰呼び出しでよく見られます。 範囲に関しては、トランポリンはonNext()を呼び出している間にrequest(1)の呼び出しがあったことを記憶します。 そして、onNext()が返るとすぐに、次の値で次のonNext()を呼び出します。 したがって、場所を入れ替えても、上記の例は同じように機能します。







 override fun onNext(v: Int?) { addToIntListAdapter(v) request(1) }
      
      





ただし、これはonStartでは機能しません。 Flowableフレームワークは、各サブスクライバに対して複数回呼び出されないことを保証しますが、要求(1)を呼び出すと、すぐに新しいアイテムが生成されます。 onNextに必要な要求(1)の呼び出し後に初期化ロジックがある場合、例外が発生する可能性があります。







 class IntMapper { private val KOFF = 2 fun map(int: Int?): Int = int ?: 0 * KOFF } Flowable.range(1, n) .subscribe(object : DisposableSubscriber<Int>() { lateinit var mapper: IntMapper public override fun onStart() { request(1) mapper = IntMapper() } override fun onNext(v: Int?) { addToIntListAdapter(mapper.map(v)) request(1) } override fun onError(ex: Throwable) { ex.printStackTrace() } override fun onComplete() { onComplete() } })
      
      





この場合、NullPointerExceptionをスローできます。 そして、KotlinのNull Safety機能を考えると、この例外を達成しようとする必要がありました。 そして、1つの機能を考慮してください-私は演算子を削除しました







 .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread())
      
      





これらの演算子の性質は非同期であるため、これらの演算子ではNullPointerExceptionが発生しないためです。

そして、この同期の場合、onStart呼び出しが行われるとすぐに、すぐにNullPointerExceptionを受け取ります。 さらに、要求(1)呼び出しが別のスレッドでonNextの非同期呼び出しを伴う場合、このエラーはキャッチするのがより難しく、これが競合状態が発生する理由です。







したがって、onStartですべての初期化操作を実行する必要があります。それよりも前に、さらにrequest()を要求する必要があります。







OnBackpressureBuffer、onBackpressureDrop、onBackpressureLastステートメント



ほとんどの開発者は、アプリケーションがMissingBackpressureExceptionでクラッシュしたときに背圧に直面し、ログは通常observeOnステートメントを指します。 実際の理由は、原則として、PublishProcessor、Observable、タイマー()、間隔()またはカスタムリフト()演算子、およびcreate()で発生する操作の使用です。







このような状況を解決するにはいくつかの方法がありますが、ここでそれらを検討します。







  1. バッファサイズを増やす


潜在的に危険なソースが原因で、このようなオーバーフローが発生することがあります。 たとえば、突然ユーザーが画面をタップするのが速すぎて、observeOnバッファーが非常にすばやくアクティブにオーバーフローしています。







バッファー自体について説明します。 対応する質問が発生する可能性がありますが、それは何であり、Rxメカニズムにどのように関与していますか? 実際、すべてが非常に簡単です。 まず、デフォルトのバッファディメンションはMath.max(1, Integer.getInteger("rx2.buffer-size", 128))



として計算され、私の観察によると、このメソッドは常に128を返します(キャプテンは居眠りしません)。 その結果、128 ... mm ...の寸法のバッファーが得られます。 少し? バイト? オウム? この質問に対する答えはこちらから入手できます。







  @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
      
      





bufferSize



パラメーターbufferSize



モジュロでチェックされ、 FlowableObserveOn



インスタンスに送信されます。







  final int prefetch; //    bufferSize public FlowableObserveOn( Flowable<T> source, Scheduler scheduler, boolean delayError, int prefetch) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.prefetch = prefetch; }
      
      





さらにサブスクライブすると、すでにおなじみのものが表示されます。







 @Override public void onSubscribe(Subscription s) { // -  s.request(prefetch); // -  }
      
      





そして、実際にバッファをすでに使用していることがわかりましたが、それについては疑っていませんでした。

要求(1)覚えていますか? だから、これはSubscription



インターフェースが実際にどのように見えるかです







 public interface Subscription { public void request(long n); public void cancel(); }
      
      





request(long n)



、上流の要求の要素の数にすぎません。

したがって、バッファは同じrequest(n)



あると結論付けることができます。 しかし、Flowableにそのバッファーサイズを伝える方法は? 以下について。







RxJavaの最近のバージョンのほとんどの背圧に敏感なステートメントでは、開発者が内部バッファーのサイズを指定できるようになりました。 関連するパラメーターは、bufferSize、prefetch、またはcapacityHintと呼ばれます。 導入部の混雑した例を考えると、PublishProcessorのバッファーサイズを増やすだけで、すべての値に十分なスペースを確保できます。







 val source = PublishProcessor.create<Int>() source .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread(), false, 1024) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete) for(i in 0..n) { source.onNext(i) if(i == n) { source.onComplete() } } Thread.sleep(10000)
      
      





ただし、プロデューサが予測バッファのサイズを超えてスポーンするとオーバーフローが発生する可能性があるため、これは原則として松葉杖です。 この場合、次のセクションのいずれかの演算子を使用できます。







  1. 標準演算子で値をグループ化/スキップ/バッファリング


グループ化された形式でソースデータをより効率的に処理できる場合、標準のバッチ処理演算子(サイズまたは時間)のいずれかを使用して、MissingBackpressureExceptionの可能性を減らすことができます。







 val source = PublishProcessor.create<Int>() source .buffer(1024*1024) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread(), false, 1024) .flatMap { PublishProcessor.fromIterable(it) } .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete) for(i in 0..n) { source.onNext(i) if(i == n) { source.onComplete() } } Thread.sleep(10000)
      
      





一部の値を安全に無視できる場合は、サンプリング(サンプル演算子)とスロットル演算子(throttleFirst、throttleLast、throttleWithTimeout)を使用できます。







 val source = PublishProcessor.create<Int>() source .sample(1, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread(), false, 1024) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete) for(i in 0..n) { source.onNext(i) if(i == n) { source.onComplete() } }
      
      





ただし、これらの演算子はダウンストリームコストの受信率を下げるだけであるため、MissingBackpressureExceptionが発生する可能性があります。







  1. OnBackpressureBuffer()ステートメント


この演算子は、基本形式では、ソースソースとダウンストリーム演算子の間に無制限のバッファーを再挿入します。 これは、メモリが使い果たされるまで、継続的に生成されるソースから来るほぼすべての量を処理できることを意味します。







 Flowable.range(1, n) .onBackpressureBuffer() .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
      
      





この例では、 observeOn



関数のバッファサイズは非常に小さくなっていますが(デフォルトでは128であることを思い出します)、 onBackpressureBuffer



n個の値( val n = 1000000



)をすべて吸収してにonBackpressureBuffer



ため、 MissingBackpressureException



例外はありません

小さなバッチでのバッチ値の実行。

onBackpressureBuffer



オペレーターが生成ソースを無制限に、つまりバックプレッシャーを適用せずに消費することは注目に値します。 これにより、range()などのメーカーでさえも完全に実行されます。







onBackpressureBuffer



演算子を使用したオーバーロードされたフォームもいくつかあります。 最も必要なものをあげます。







onBackpressureBuffer(int容量)







これは、バッファが指定された容量に達するとBufferOverflowException



を通知する限定バージョンです。







 Flowable.range(1, n) .onBackpressureBuffer(16) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
      
      





ますます多くの演算子がバッファサイズの設定を許可するようになったため、このステートメントの妥当性は低下しました。 それ以外の場合、これにより、内部バッファを拡張できます。







onBackpressureBuffer(int容量、アクションonOverflow)







このオーバーロードされたメソッドは、オーバーフローの場合にコールバックを呼び出します。 アクション自体は表さない

珍しいことは何もない







 public interface Action { void run() throws Exception; }
      
      





ただし、現在の呼び出しスタック以外のオーバーフローに関する情報はないため、その有用性はかなり限られていることに注意してください。







onBackpressureBuffer(int capacity、Action0 onOverflow、BackpressureOverflowStrategyストラテジー)







しかし、このオプションは本当に興味深いものです。 容量に達した場合の対処方法を決定できるため、その有用性はわずかに高くなります。 BackpressureOverflowStrategyは、次の場合に典型的なアクションを表す実装を持つ3つの静的フィールドを提供する列挙型です。







ERROR — , BufferOverflowException.







DROP_OLDES — , , .







DROP_LATEST — , , , , , .







 Flowable.range(1, n) .onBackpressureBuffer(1024, { Toast.makeText(baseContext, BufferOverflowException::class.simpleName, Toast.LENGTH_SHORT).show() }, BackpressureOverflowStrategy.DROP_LATEST) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
      
      





, , . , BufferOverflowException.







BackpressureOverflowStrategy. onBackpressureDrop() onBackpressureLatest(). .







 Flowable.range(1, n) .onBackpressureDrop() .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
      
      





 Flowable.range(1, n) .onBackpressureLatest() .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(this::addToIntListAdapter, Throwable::printStackTrace, this::onComplete)
      
      





, , backpressure . , , , .. ... , onBackpressure



, .







backpressured datasources, . .







https://github.com/scrobot/Rx2.0habrahabrproject
















All Articles