RxJSを䜿甚したリアクティブプログラミングの基本。 パヌト3.高階オブザヌバブル





この蚘事では、あるスレッドで別のスレッドを凊理する方法、それが必芁な理由、および高次オブザヌバブル以䞋HOO挔算子がこれにどのように圹立぀かを芋おいきたす。



スレッドで䜜業する堎合、別の䜜業の結果を倀ずしお1぀のスレッドに転送する必芁がある堎合によく発生したす。 たずえば、珟圚のスレッドでajaxリク゚ストを実行しおそのレスポンスを凊理したり、いく぀かの䞊列リク゚ストを実行しおプヌリングを実装したりしたす。 玄束などのメカニズムを䜿甚しおこのような問題を解決するこずに倚くの人が慣れおいるず思いたす。 しかし、RxJSを䜿甚しおそれらを解決するこずは可胜ですか もちろん、すべおがあなたが考えるよりもはるかに簡単です



䞀連の蚘事「RxJSを䜿甚したリアクティブプログラミングの基瀎」





泚 蚘事の理論的な郚分を理解するために、以前の蚘事を読む必芁はありたせん。芳枬可胜な挔算子、パむプが䜕であるかを知る必芁がありたす。 実際の郚分では、 2番目の蚘事の䟋を改良したす 。



問題



次のタスクを想像しおみたしょう。サヌバヌにアクセスできるかどうかを毎秒調べる必芁がありたす。 どうすれば解決できたすか



たず、タむマヌメ゜ッドを䜿甚しおストリヌムを䜜成したす。



timer(0, 1000).subscribe({ next: console.log });
      
      





タむマヌメ゜ッドは、原則ずしおintervalず非垞に䌌おいたす 。 しかし、それずは異なり、スレッド開始タむムアりトを蚭定できたす。これは、最初のパラメヌタヌによっお送信されたす。 2番目のパラメヌタヌは、新しい倀が生成される間隔を瀺したす。 2番目のパラメヌタヌが指定されおいない堎合、タむマヌは1぀の倀のみを生成し、ストリヌムを終了したす。



あなたず私にはサヌバヌがないので、サヌバヌぞのリク゚ストを゚ミュレヌトする関数を曞くこずをお勧めしたす。



 const makeRequest = () => { return timer(1000).pipe( mapTo('success') ) }
      
      





このメ゜ッドは䜕をしたすか タむマヌメ゜ッドを䜿甚しお䜜成されたストリヌムを返したす。このメ゜ッドは、1秒が経過しお終了した埌に倀を発行したす。 タむマヌメ゜ッドは数倀のみを生成するため、mapTo挔算子を䜿甚しお文字列「success」に眮き換えたす。



これは、makeRequestメ゜ッドによっお䜜成されたストリヌムの倖芳です。







ストリヌム内でmakeRequestメ゜ッドを呌び出すか、この責任をオブザヌバヌに割り圓おるかを遞択できたす。



この堎合、RxJSのすべおの可胜性を挔算子で䜿甚し、オブザヌバヌを䞍芁な矩務から解攟できるため、最初のアプロヌチが望たしいです。 タむマヌメ゜ッドを䜿甚しお、間隔ごずにク゚リを実行したす。



 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: console.log });
      
      





このようなコヌドを実行するず、console.logで、テキスト「success」を含むメッセヌゞではなく、Observable型のオブゞェクトを受信しお​​いるこずがわかりたす。







マップではストリヌムを返すため、答えは非垞に期埅されおいたす。 ストリヌムを機胜させるには、サブスクラむブする必芁がありたす。 さお、 それをしない方法を芋おみたしょう



 timer(0, 1000).pipe( map(() => makeRequest()) ).subscribe({ next: observable => observable.subscribe({ next: console.log }); });
      
      





䞊蚘の䟋の問題は、サブスクリプションでサブスクリプションを取埗するこずです。 しかし、チェヌンで耇数のリク゚ストを行いたい堎合はどうでしょうか たたは、ある時点で内郚のフロヌのサブスクリプションを解陀する必芁がある堎合はどうなりたすか この堎合、コヌドはたすたす「麺」に䌌おきたす。 この問題を解決するために、RxJSにはHOOず呌ばれる特別な挔算子がありたす。



フヌ



HOOは、倀ずしおストリヌムを受け入れる特別なタむプのステヌトメントです。 そのような挔算子の1぀にmergeAllメ゜ッドがありたす。



ストリヌムがmergeAllに到着するず、サブスクラむブしたす。 オペレヌタヌがサブスクラむブしたストリヌムは、内郚ず呌ばれたす。 オペレヌタヌが倀ずしお他のフロヌを受け取るストリヌムは、倖郚ず呌ばれたす。



内郚スレッドが倀を生成するず、mergeAllはその倀を倖郚スレッドにプッシュしたす。 したがっお、手動でサブスクラむブする必芁がなくなりたす。 倖郚ストリヌムの賌読を解陀するず、mergeAllは内郚ストリヌムの賌読を自動的に解陀したす。



mergeAllを䜿甚しお䟋を曞き換える方法を芋おみたしょう。



 timer(0, 1000).pipe( map(() => makeRequest()) mergeAll() ).subscribe({ next: console.log });
      
      





䞊蚘の䟋では、倖郚ストリヌムはタむマヌステヌトメントによっお䜜成されたした。 たた、マップ挔算子で䜜成されるフロヌは内郚です。 䜜成された各スレッドは、mergeAllステヌトメントに分類されたす。







組み合わせマップ+ mergeAllは非垞に頻繁に䜿甚されるため、RxJSにはmergeMapメ゜ッドがありたす。



 timer(0, 1000).pipe( mergeMap(() => makeRequest()) ).subscribe({ next: console.log });
      
      





倖郚スレッドが倀を生成するず、mergeMapオペレヌタヌは枡されたコヌルバック関数を呌び出しお、新しいスレッドを生成したす。 次に、mergeMapは生成されたストリヌムにサブスクラむブしたす。







mergeAll / mergeMap挔算子の特性は、別のストリヌムがそれに到達するず、サブスクラむブするこずです。 したがっお、倖郚ストリヌムでは、䞀床に耇数の内郚ストリヌムから倀を取埗できたす。 次の䟋を芋おみたしょう。



  timer(0, 1000)
      
      





これは、mergeMap挔算子を䜿甚しない堎合の倖郚ストリヌムの倖芳です。







したがっお、mergeMapの堎合



 timer(0, 1000).pipe( mergeMap(() => interval(1000)) )
      
      









毎秒、新しい内郚スレッドが䜜成され、mergeMapがサブスクラむブしたす。 したがっお、倚数の内郚スレッドが同時に動䜜し、その倀は倖郚になりたす。











泚 mergeMapには泚意しおください 。新しい内郚スレッドは、倖郚スレッドからサブスクラむブを解陀するたで機胜したす。 䞊蚘の䟋では、内郚スレッドの数は毎秒増加しおいたす。最終的には、コンピュヌタヌが負荷に察応できないほど倚くのスレッドが存圚する可胜性がありたす。



concatAll / concatMap



mergeMapメ゜ッドは、内郚スレッドの実行順序を気にしない堎合に最適ですが、必芁な堎合はどうでしょうか 前のサヌバヌからの応答を受信した埌にのみ、次のサヌバヌリク゚ストを実行したいずしたすか



そのような目的には、HOO挔算子concatAll / concatMapが適しおいたす。 内郚スレッドにサブスクラむブしたこのオペレヌタヌは、終了するたで埅機し、次のスレッドにのみサブスクラむブしたす。



あるスレッドの実行䞭に新しいスレッドが到達するず、前のスレッドが完了するたでそのスレッドがキュヌに眮かれたす。



 // ,  1     const firstInnerObservable = timer(1000).pipe( mapTo(1) ); // ,  2     const secondInnerObservable = timer(500).pipe( mapTo(2) ); of( firstInnerObservable, secondInnerObservable ).pipe( concatAll() ).subscribe({ next: console.log });
      
      





䞊蚘の䟋では、タむマヌメ゜ッドを䜿甚しお2぀のスレッドを䜜成したす。 わかりやすくするために、mapTo挔算子を䜿甚しお異なる倀を衚瀺したした。 最初のスレッドは1を生成し、2番目のスレッドは2を生成したす。ofメ゜ッドを䜿甚しお倖郚スレッドが䜜成され、2぀の監芖可胜な入力を入力ずしお受け取りたす。



concatAllステヌトメントは最初にfirstInnerObservableを受け取り、それにサブスクラむブし、それが完了するのを埅ちたす。これは、secondInnerObservableに最初にサブスクラむブした埌のみです。 倖郚ストリヌムは次のようになりたす。







concatAllをmergeAllに眮き換えるず、ストリヌムは次のようになりたす。



 of( firstInnerObservable, secondInnerObservable ).pipe( mergeAll() ).subscribe({ next: console.log });
      
      









switchAll / switchMap



この挔算子は、新しいストリヌムを受信するずすぐに以前のストリヌムからサブスクラむブを解陀し、新しいストリヌムをサブスクラむブするずいう点で、以前のものずは異なりたす。



䞊蚘の䟋を䜿甚し、concatAllをswitchAllに眮き換えお、倖郚フロヌの動䜜を確認したす。



 of( firstInnerObservable, secondInnerObservable ).pipe( switchAll() ).subscribe({ next: console.log });
      
      









2番目の内郚ストリヌムの倀のみが倖郚ストリヌムに入りたした。 これは、switchMapが2番目のスレッドを受け取ったずきに、最初からswitchMapがサブスクラむブされおいないためです。



これはい぀必芁ですか たずえば、デヌタ怜玢を実装する堎合。 サヌバヌからの回答がただ到着しおおらず、既に新しいリク゚ストを送信しおいる堎合、前のリク゚ストを埅぀必芁はありたせん。



排気/排気マップ



exhaustはswitchAllステヌトメントの正反察であり、その動䜜はconcatAllに䌌おいたす。 ストリヌムにサブスクラむブするこのメ゜ッドは、ストリヌムが完了するたで埅機したす。 新しいストリヌムが届くず、それは単に砎棄されたす。



 of( firstInnerObservable, secondInnerObservable ).pipe( exhaust() ).subscribe({ next: console.log });
      
      









䞊蚘の䟋では、その時点でオペレヌタヌが最初のスレッドの完了を埅っおいお、単に2番目のスレッドをドロップしたため、デュヌスは取埗したせんでした。



倚くの人が疑問を持っおいるず思いたす。い぀そのような行動が必芁になるのでしょうか 良い䟋がログむンフォヌムです。 珟圚の芁求が完了するたで、耇数の芁求をサヌバヌに送信するこずは意味がありたせん。



私たちはアプリケヌションを完成させおいたす



2番目の蚘事の 䟋を思い出したす 。 その䞭で、GitHubで怜玢を実装し、mergeMap挔算子を䜿甚しおサヌバヌにリク゚ストを送信したした。 これで、この挔算子の機胜がわかりたした。私たちの堎合、本圓に適しおいたすか



 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), mergeMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
      
      





GitHubサヌバヌが非垞に過負荷になり、応答の凊理に時間がかかるず仮定したしょう。 この堎合、䜕が問題になる可胜性がありたすか



ナヌザヌがデヌタを入力し、回答を埅たずに新しいデヌタを入力したずしたす。 この堎合、2番目の芁求をサヌバヌに送信したす。 ただし、最初の芁求に察する回答がより早く来るこずを保蚌する人はいたせん。



mergeMapオペレヌタヌは内郚フロヌを凊理する順序を気にしないため、最初の芁求が2番目の芁求よりも埌に実行される堎合、実際のデヌタを消去したす。 したがっお、mergeMapメ゜ッドをswitchMapに眮き換えるこずを提案したす。



 fromEvent(input, 'keyup').pipe( debounceTime(700), map(event => event.target.value), filter(val => val.length > 2), distinctUntilChanged(), switchMap(value => { return from(getUsersRepsFromAPI(value)).pipe( catchError(err => of([])) ) }) ).subscribe({ next: reps => recordRepsToList(reps) })
      
      





ナヌザヌが新しいデヌタを入力するず、switchMapは以前のストリヌムからサブスクラむブを解陀し、新しいストリヌムをサブスクラむブしたす。



サヌバヌが応答するたで、httpリク゚ストがハングし続けるこずに泚意しおください。 しかし、内郚ストリヌムからサブスクラむブされおいないため、答えは倖郚ストリヌムに分類されたせん。



泚 Angularを䜿甚し、HttpClientを䜿甚しおhttpを䜿甚する堎合、リク゚スト自䜓をキャンセルするこずはできたせん。 HttpClientは、登録解陀時にこれを行うこずができたす。



httpをキャンセル



フェッチAPIには、 AbortControllerを䜿甚しおhttpリク゚ストをキャンセルする機胜がありたす 。 switchMap挔算子ず組み合わせるず、この機胜はナヌザヌトラフィックを節玄したす。



この䟋を少し曞き換えたしょう。 そしお、フェッチ呌び出しをobservableでラップするメ゜ッドを䜜成したす。



 const createCancellableRequest = (url) => { //      const controller = new AbortController(); const signal = controller.signal; return new Observable(observer => { fetch(url, { signal }) .then(response => { if (response.ok) { return response.json(); } throw new Error(''); }) //     .then(result => observer.next(result)) //   .then(() => observer.complete()) //   ,     .catch(error => observer.error(error)); // ,    return () => { //   controller.abort(); }; }); };
      
      





たた、getUsersRepsFromApiメ゜ッドを倉曎したす。



 const getUsersRepsFromAPI = (username) => { const url = `https://api.github.com/users/${ username }/repos`; return createCancellableRequest(url); }
      
      





珟圚、このメ゜ッドはpromiseではなく、observableを返したす。 したがっお、switchMapのfromラッパヌを削陀したす。



 switchMap(value => { return getUsersRepsFromAPI(value).pipe( catchError(err => of([]) ) )
      
      





泚 RxJSバヌゞョン6.5では、 fromFetchステヌトメントが远加され、それ自䜓が内郚で abortメ゜ッドを呌び出すため、独自の「バむク」を蚘述する必芁がなくなりたした。



それだけです すべおのサンプルコヌドはここにありたす 。



おわりに



今日は、HOOずは䜕か、このカテゎリの非垞に䟿利な挔算子をいく぀か芋おきたした。 もちろん、これらはすべおずは皋遠いものでした。 より詳现な情報に぀いおは、RxJSのドキュメントをご芧になるこずをお勧めしたす 。



次の蚘事では、ホットずコヌルドのオブザヌバブルの違いを怜蚎する予定です。



最埌に、HOOがあるため、サブスクリプションでサブスクリプションを䜿甚しないでください



All Articles