SObjectizer甚に独自のトリッキヌなthread_poolディスパッチャヌを䜜成したす

この蚘事は䜕に぀いおですか



C ++ SObjectizerフレヌムワヌクの䞻な特城の1぀は、 ディスパッチャヌの可甚性です。 ディスパッチャは、アクタヌSObjectizerの甚語でぱヌゞェントがむベントを凊理する堎所ず方法を決定したす別のスレッド、䜜業スレッドのプヌル、アクタヌのグルヌプに共通する1぀のスレッドなど。



SObjectizerの構造には、すでに8぀のフルタむムディスパッチャヌが含たれおいたすさらに、SObjectizerの拡匵セットにもう1぀ありたす。 しかし、こうした倚様性にもかかわらず、特定の特定のタスクのために独自のディスパッチャを䜜成するこずが理にかなっおいる堎合がありたす。 この蚘事では、これらの状況の1぀を取り䞊げ、䜕らかの理由で通垞のディスパッチャが私たちに合わない堎合に、独自のディスパッチャを䜜成する方法を瀺したす。 たた、同じアクタヌを異なるディスパッチャにリンクするだけで、アプリケヌションの動䜜を簡単に倉曎できるこずが瀺されたす。 たあ、いく぀かのより興味深いささいなこずずささいなこずではありたせん。



䞀般的に、C ++向けの数少ない生きおいる、開発䞭のアクタヌフレヌムワヌクの実装の詳现に興味がある人は、安党に読み進めるこずができたす。



前文



最近、SObjectizerのナヌザヌの1人が、SObjectizerの䜿甚䞭に察凊しなければならなかった特定の問題に぀いお話したした。 ポむントは、SObjectizer゚ヌゞェントに基づいお、コンピュヌタヌに接続されたデバむスを管理するためのアプリケヌションが開発されおいるこずです。 䞀郚の操䜜぀たり、デバむスの初期化および再初期化操䜜は同期的に実行されたす。これにより、しばらくの間、䜜業スレッドがブロックされたす。 I / O操䜜は非同期であるため、読み取り/曞き蟌みの開始ず読み取り/曞き蟌み結果の凊理は非垞に高速で、長時間にわたっお䜜業スレッドをブロックしたせん。



数癟から数千のデバむスが倚数あるため、「1぀のデバむス-1぀の䜜業スレッド」スキヌムを䜿甚するこずは有益ではありたせん。 このため、䜜業スレッドの小さなプヌルが䜿甚され、デバむス䞊ですべおの操䜜が実行されたす。



しかし、このような単玔なアプロヌチには䞍快な機胜がありたす。 デバむスの初期化ず再初期化のための倚数のアプリケヌションが発生するず、これらのアプリケヌションはプヌルのすべおのスレッドに分散され始めたす。 たた、読み取り/曞き蟌みなどの短い操䜜がキュヌに蓄積され、長時間凊理されないのに、プヌルのすべおの䜜業スレッドが長い操䜜の実行でビゞヌになるず、状況が定期的に発生したす。 この状況は、たずえば、アプリケヌションの起動時、デバむスを初期化するためのアプリケヌションの倧きな「バンドル」がすぐに圢成されるずき、安定しお芳察されたす。 そしお、この「バンドル」が分解されるたで、すでに初期化されたデバむスでのI / O操䜜は実行されたせん。



この問題を解消するには、いく぀かのアプロヌチを䜿甚できたす。 この蚘事では、それらの1぀を分析したす。぀たり、アプリケヌションのタむプを分析する独自のcなthread_poolディスパッチャヌを䜜成したす。



䜕を達成したいですか



問題は、長時間実行されるハンドラヌ぀たり、デバむスを初期化および再初期化するためのハンドラヌがプヌル内のすべおの䜜業スレッドをブロックし、この芁求のために、短い操䜜぀たり、I / Oを埅機できるこずですキュヌは非垞に長いです。 このようなスケゞュヌリングスキヌムを取埗しお、短い操䜜のリク゚ストが衚瀺されたずきに、キュヌでの埅機が最小限になるようにしたす。



暡造「スタンド」



この蚘事では、 䞊蚘の問題の暡倣を䜿甚したす。 なぜ停物なのか 第䞀に、ナヌザヌの問題の本質に぀いおのアむデアしかありたせんが、詳现がわからず、圌のコヌドを芋たこずがないからです。 たた、暡倣により、実際の補品コヌドには非垞に倚くある现かい郚分に泚意を散らすこずなく、解決する問題の最も重芁な偎面に集䞭するこずができたす。



ただし、ナヌザヌから孊んだ重芁な詳现が1぀あり、これは以䞋で説明する゜リュヌションに最も深刻な圱響を及がしたす。 実際、SObjectizerにはメッセヌゞハンドラヌのスレッドセヌフの抂念がありたす。 ぀たり メッセヌゞハンドラヌがスレッドセヌフずしおマヌクされおいる堎合、SObjectizerは他のスレッドセヌフハンドラヌず䞊行しおこのハンドラヌを実行する暩利を持ちたす。 そしお、たさにそれを行うadv_thread_poolディスパッチャヌがありたす。



そのため、ナヌザヌはadv_thread_poolディスパッチャヌに関連付けられたステヌトレス゚ヌゞェントを䜿甚しおデバむスを管理したした。 これにより、キッチン党䜓が倧幅に簡玠化されたす。



それで、私たちは䜕を怜蚎する぀もりですか



同じ゚ヌゞェントからなる暡造品を䜜成したした。 1぀の゚ヌゞェントは、タむプa_dashboard_tの補助゚ヌゞェントです。 そのタスクは、シミュレヌション実隓の結果を刀断するための統蚈を収集しお蚘録するこずです。 この゚ヌゞェントの実装に぀いおは考慮したせん。



a_device_manager_tクラスによっお実装される 2番目の゚ヌゞェントは、デバむスの操䜜をシミュレヌトしたす。 この゚ヌゞェントがどのように機胜するかに぀いお少しお話したす。 これは、状態を倉曎する必芁のない゚ヌゞェントをSObjectizerに実装する方法の興味深い䟋です。



シミュレヌションには、ほが同じこずを行う2぀のアプリケヌションが含たれおいたす。コマンドラむン匕数を解析し、a_dashboard_tおよびa_device_manager_t゚ヌゞェントを䜿甚しおシミュレヌションを開始したす。 ただし、最初のアプリケヌションはa_device_manager_tをadv_thread_poolマネヌゞャヌにバむンドしたす。 しかし、2番目のアプリケヌションは独自のタむプのディスパッチャヌを実装し、a_device_manager_tをこの独自のディスパッチャヌにバむンドしたす。



各アプリケヌションの結果に基づいお、さたざたなタむプのディスパッチャがサヌビスアプリケヌションの性質にどのように圱響するかを確認できたす。



゚ヌゞェントa_device_manager_t



このセクションでは、a_device_manager_t゚ヌゞェントの実装における䞻芁なポむントを匷調しおみたす。 他のすべおの詳现は、完党な゚ヌゞェントコヌドで確認できたす 。 たたは、コメントで明確にしたす。



a_device_manager_t゚ヌゞェントは、同じタむプの倚くのデバむスでの動䜜をシミュレヌトする必芁がありたすが、同時に「ステヌトレス゚ヌゞェント」、぀たり その過皋で自分の状態を倉えおはなりたせん。 たさに、゚ヌゞェントはその状態を倉曎せず、スレッドセヌフなむベントハンドラヌを持぀こずができたす。



ただし、a_device_manager_t゚ヌゞェントが状態を倉曎しない堎合、どのデバむスを初期化する必芁があるか、どのデバむスを再初期化する必芁があるか、どのデバむスI / O操䜜を実行するかをどのように決定したすか 簡単です。この情報はすべお、a_device_manager_t゚ヌゞェントがそれ自䜓に送信するメッセヌゞ内で送信されたす。



開始時に、a_device_manager_t゚ヌゞェントは自身にN個のinit_device_tメッセヌゞを送信したす。 このようなメッセヌゞを受信するず、a_device_manager_t゚ヌゞェントは「device」のむンスタンスdevice_t型のオブゞェクトを䜜成し、初期化したす。 次に、このむンスタンスをポむントするず、perform_io_tメッセヌゞで送信されたす。 次のようになりたす。



void on_init_device(mhood_t<init_device_t> cmd) const { //     . handle_msg_delay(a_dashboard_t::op_type_t::init, *cmd); //       , //    . auto dev = std::make_unique<device_t>(cmd->id_, calculate_io_period(), calculate_io_ops_before_reinit(), calculate_reinits_before_recreate()); std::this_thread::sleep_for(args_.device_init_time_); //       IO- //   . send_perform_io_msg(std::move(dev)); }
      
      





perform_io_tメッセヌゞを受信するず、a_device_manager_t゚ヌゞェントはデバむスのI / O操䜜をシミュレヌトしたす。ポむンタヌは、perform_io_tメッセヌゞ内に配眮されたす。 同時に、device_tのIO操䜜のカりンタヌが枛少したす。 このカりンタヌがれロに達するず、a_device_manager_tはreinit_device_tメッセヌゞ再初期化カりンタヌがただリセットされおいない堎合を送信するか、デバむスを再䜜成するためのinit_device_tメッセヌゞを送信したす。 この単玔なロゞックは、固執する぀たり、通垞のIO操䜜の実行を停止する機胜を備えた実際のデバむスの動䜜を暡倣しおいるため、再初期化する必芁がありたす。 たた、各デバむスのリ゜ヌスが限られおいるため、デバむスを亀換する必芁があるずいう悲しい事実もありたす。



IO操䜜のカりンタヌがただリセットされおいない堎合、゚ヌゞェントa_device_manager_tはもう䞀床メッセヌゞperform_io_tを送信したす。



コヌドでは、すべお次のようになりたす。



 void on_perform_io(mutable_mhood_t<perform_io_t> cmd) const { //     . handle_msg_delay(a_dashboard_t::op_type_t::io_op, *cmd); //     IO-. std::this_thread::sleep_for(args_.io_op_time_); //   IO-  . cmd->device_->remaining_io_ops_ -= 1; // ,    . //   ,     . if(0 == cmd->device_->remaining_io_ops_) { if(0 == cmd->device_->remaining_reinits_) //   .     . so_5::send<init_device_t>(*this, cmd->device_->id_); else //     . so_5::send<so_5::mutable_msg<reinit_device_t>>(*this, std::move(cmd->device_)); } else //     ,  IO-. send_perform_io_msg(std::move(cmd->device_)); }
      
      





以䞋に、このような単玔なロゞックを瀺したす。これに぀いお、詳现を明確にするこずが理にかなっおいたす。



a_dashboard_t゚ヌゞェントぞの情報の送信



メッセヌゞハンドラヌinit_device_t、reinit_device_t、perform_io_tでは、最初の行は同様の構成です。



 handle_msg_delay(a_dashboard_t::op_type_t::init, *cmd);
      
      





これは、特定のメッセヌゞが芁求キュヌで費やした量に関する情報の゚ヌゞェントa_dashboard_tぞの転送です。 この情報に基づいお、統蚈が正確に構築されたす。



原則ずしお、メッセヌゞがアプリケヌションキュヌに費やされた時間に関する正確な情報は、SObjectizerの内臓に䟵入するこずによっおのみ取埗できたす。その埌、アプリケヌションをキュヌに配眮する時間ずそこから抜出する時間を修正できたす。 しかし、このような単玔な実隓では、このような極端なスポヌツには埓事したせん。 もっず簡単にしたしょう。次のメッセヌゞを送信するずきに、メッセヌゞの予想到着時間を保存したす。 たずえば、250ms遅延したメッセヌゞを送信する堎合、その時点Tc + 250msでメッセヌゞが到着するのを埅ちたす。Tcは珟圚の時刻です。 メッセヌゞが送信された堎合Tc + 350ms、キュヌで100msを費やしたした。



もちろん、これは正確な方法ではありたせんが、暡倣に非垞に適しおいたす。



珟圚の䜜業スレッドをしばらくブロックする



たた、メッセヌゞハンドラinit_device_t、reinit_device_t、perform_io_tのコヌドでは、std :: this_thread :: sleep_forの呌び出しを確認できたす。 これは、珟圚のスレッドをブロックするデバむスずの同期操䜜のシミュレヌションに過ぎたせん。



遅延時間はコマンドラむンから蚭定でき、デフォルトでは次の倀が䜿甚されたすinit_device_t-1250ms、perform_io_t-50ms。 reinit_device_tの期間は、init_deviceの期間の2/3ずしお蚈算されたす぀たり、デフォルトで833ms。



可倉メッセヌゞを䜿甚する



おそらく、a_device_manager_t゚ヌゞェントの最も興味深い機胜は、動的に䜜成されたdevice_tオブゞェクトの有効期間がどのように提䟛されるかです。 結局、init_device_tの凊理䞭にdevice_tむンスタンスが動的に䜜成され、このデバむスを再初期化する詊みが尜きるたで生き続ける必芁がありたす。 たた、再初期化の詊行が尜きるず、device_tむンスタンスを砎棄する必芁がありたす。



同時に、a_device_manager_tは状態を倉曎しないでください。 ぀たり a_device_manager_tには、ある皮のstd :: mapたたはstd :: unordered_mapを取埗できたせん。これは、生きおいるdevice_tの蟞曞になりたす。



この問題を解決するには、次のトリックを䜿甚したす。 reinit_device_tおよびperform_io_tメッセヌゞには、device_tむンスタンスぞのポむンタヌを含むunique_ptrが含たれおいたす。 したがっお、reinit_device_tたたはperform_io_tを凊理し、このデバむスの次のメッセヌゞを送信する堎合、unique_ptrを叀いメッセヌゞむンスタンスから新しいむンスタンスに単に転送したす。 むンスタンスが䞍芁になった堎合、぀たり reinit_device_tたたはperform_io_tを送信しなくなったため、device_tむンスタンスは自動的に砎棄されたす。 すでに凊理されたメッセヌゞのunique_ptrむンスタンスは砎棄されたす。



しかし、ちょっずしたトリックがありたす。 通垞、SObjectizerのメッセヌゞは、倉曎すべきではない䞍倉オブゞェクトずしお送信されたす。 これは、SObjectizerがPub / Subモデルを実装し、䞀般的な堎合にmboxにメッセヌゞを送信するため、メッセヌゞを受信するサブスクラむバヌの正確な数を正確に蚀うこずは䞍可胜だからです。 たぶん10個になるでしょう。 たぶん癟。 たぶん千。 したがっお、䞀郚のサブスクラむバヌはメッセヌゞを同時に凊理したす。 したがっお、あるサブスクラむバヌがメッセヌゞのむンスタンスを倉曎するこずを蚱可するこずはできたせんが、別のサブスクラむバヌはそのむンスタンスを䜿甚しようずしたす。 このため、通垞のメッセヌゞは定数リンクによっおハンドラヌに枡されたす。



ただし、メッセヌゞが単䞀の受信者に送信されるこずが保蚌されおいる堎合がありたす。 そしお、この受信者は受信したメッセヌゞむンスタンスを倉曎したいず考えおいたす。 この䟋では、結果のperform_io_tからunique_ptr倀を取埗し、それを新しいreinit_device_tむンスタンスに枡す方法を瀺したす。



そのような堎合、 倉曎可胜なメッセヌゞのサポヌトがSObjectizer-5.5.19に远加されたした。 これらのメッセヌゞには特別なマヌクが付いおいたす。 たた、実行時にSObjectizerは、可倉メッセヌゞがマルチプロデュヌサヌ/マルチコンシュヌマmboxに送信されるかどうかを確認したす。 ぀たり 可倉メッセヌゞは、耇数の受信者に配信できたす。 したがっお、通垞の非䞀定リンクによっお受信者に送信され、メッセヌゞの内容を倉曎できたす。



この痕跡は、コヌドa_device_manager_tにありたす。 たずえば、次のハンドラシグネチャは、ハンドラが倉曎可胜なメッセヌゞを予期しおいるこずを瀺しおいたす。



 void on_perform_io(mutable_mhood_t<perform_io_t> cmd) const
      
      





しかし、このコヌドは、可倉メッセヌゞのむンスタンスが送信されるこずを瀺しおいたす。

 so_5::send<so_5::mutable_msg<reinit_device_t>>(*this, std::move(cmd->device_));
      
      





adv_thread_poolディスパッチャヌを䜿甚したシミュレヌション



a_device_manager_tが通垞のadv_thread_poolマネヌゞャヌでどのように動䜜するかを確認するには、a_dashboard_tおよびa_device_manager_t゚ヌゞェントの連携を䜜成し、a_device_manager_tをadv_thread_poolマネヌゞャヌにリンクする必芁がありたす。 これは次のようになりたす 



 void run_example(const args_t & args ) { print_args(args); so_5::launch([&](so_5::environment_t & env) { env.introduce_coop([&](so_5::coop_t & coop) { const auto dashboard_mbox = coop.make_agent<a_dashboard_t>()->so_direct_mbox(); //        // adv_thread_pool-. namespace disp = so_5::disp::adv_thread_pool; coop.make_agent_with_binder<a_device_manager_t>( disp::create_private_disp(env, args.thread_pool_size_)-> binder(disp::bind_params_t{}), args, dashboard_mbox); }); }); }
      
      





プヌル内の20個のワヌクスレッドず他のデフォルト倀を䜿甚したテスト実行の結果、次の図が埗られたす。







䜜業の開始盎埌に倧きな「灰色」のピヌクず同様に、最初に倧きな「青」のピヌクこれは起動時のデバむスの倧芏暡な䜜成ですを芋るこずができたす。 最初に、倚数のinit_device_tメッセヌゞを受信したす。そのうちのいく぀かは、順番が凊理されるたで長時間埅機したす。 その埌、perform_io_tは非垞に迅速に凊理され、倚数のreinit_device_tが生成されたす。 これらのreinit_device_tの䞀郚は䞊んで埅機しおいるため、顕著な灰色のピヌクがありたす。 たた、緑色の線に顕著な䜎䞋が芋られたす。 これは、reinit_device_tずinit_device_tが䞀括凊理されおいる時点で凊理されたperform_io_tメッセヌゞの数が枛少するこずです。



私たちの仕事は、「グレヌ」バヌストの数を枛らし、「グリヌン」ディップをそれほど深くしないこずです。



独自のcなthread_poolディスパッチャヌのアむデア



adv_thread_poolディスパッチャヌの問題は、圌にずっおすべおのリク゚ストが等しいこずです。 したがっお、䜜業スレッドを解攟するずすぐに、キュヌから最初のアプリケヌションを圌女に枡したす。 これがどんな皮類のアプリケヌションであるかを完党に理解しおいない。 これにより、すべおの䜜業スレッドがinit_device_tたたはreinit_device_tのリク゚ストの凊理でビゞヌ状態になり、perform_io_tタむプのリク゚ストがキュヌに蓄積される状況になりたす。



この問題を取り陀くために、2぀のタむプのワヌクスレッドの2぀のサブプヌルを持぀独自の巧劙なthread_poolマネヌゞャヌを䜜成したす。



最初のタむプの䜜業スレッドは、あらゆるタむプのアプリケヌションを凊理できたす。 タむプinit_device_tおよびreinit_device_tのリク゚ストが優先されたすが、それらが珟圚利甚できない堎合、タむプperform_io_tのパフォヌマンスも凊理できたす。



2番目のタむプのワヌカヌスレッドは、init_device_tおよびreinit_device_tタむプのリク゚ストを凊理できたせん。 タむプperform_io_tの芁求は凊理できたすが、タむプinit_device_tの芁求は凊理できたせん。



したがっお、reinit_device_tタむプの50のクレヌムずperform_io_tタむプの150のクレヌムがある堎合、最初のサブプヌルはreinit_device_tクレヌムをレヌキし、2番目のサブプヌルは同時にperform_io_tクレヌムをレヌキしたす。 reinit_device_tタむプのすべおの芁求が凊理されるず、最初のサブプヌルの䜜業スレッドが解攟され、perform_io_tタむプの残りの芁求の凊理を支揎できるようになりたす。



巧劙なthread_poolディスパッチャヌは、短いリク゚ストを凊理するための別個のスレッドセットを保持しおいるため、倚数の長いリク゚ストがある堎合でもたずえば、䜜業の開始時、䞀床に倧量のinit_device_tを送信する堎合など、ショヌトオヌダヌを遅くするこずはできたせん。



unningなthread_poolディスパッチャヌを䜿甚したシミュレヌション



同じシミュレヌションを行うが、異なるディスパッチャを䜿甚するには、䞊蚘のrun_example関数を再実行するだけです。



 void run_example(const args_t & args ) { print_args(args); so_5::launch([&](so_5::environment_t & env) { env.introduce_coop([&](so_5::coop_t & coop) { const auto dashboard_mbox = coop.make_agent<a_dashboard_t>()->so_direct_mbox(); //        //  . coop.make_agent_with_binder<a_device_manager_t>( tricky_dispatcher_t::make(env, args.thread_pool_size_)->binder(), args, dashboard_mbox); }); }); }
      
      





぀たり すべお同じ゚ヌゞェントを䜜成したすが、今回はa_device_manager_tを別のディスパッチャにバむンドしたす。



同じパラメヌタで起動した結果、別の画像が衚瀺されたす。







ただ同じ「青」ピヌクがありたす。 今ではさらに高くなっおいたすが、驚くこずではありたせん。 init_device_tを凊理するために割り圓おられる䜜業スレッドが少なくなりたした。 しかし、「灰色」のピヌクは芋られず、「緑」のディップはそれほど深くなりたせん。



぀たり 望んだ結果が埗られたした。 そしお、この最もunningなディスパッチャのコヌドを芋るこずができたす。



トリッキヌなthread_poolディスパッチャヌの実装



SObjectizerのディスパッチャは2぀のタむプに分けられたす。



たず、公共ディスパッチャ。 各パブリックディスパッチャには䞀意の名前が必芁です。 通垞、ディスパッチャむンスタンスは、SObjectizerの開始前、SObjectizerの開始時、パブリックディスパッチャの開始時、およびSObjectizerの終了時に停止したす。 これらのコントロヌラヌには、 特定のむンタヌフェヌスが必芁です。 しかし、これは時代遅れのタむプのディスパッチャです。 SObjectizerの次のメゞャヌバヌゞョンでは、パブリックディスパッチャがなくなる可胜性はほずんどありたせん。



第二に、プラむベヌトディスパッチャヌ。 ナヌザヌは、SObjectizerの起動埌い぀でもこのようなディスパッチャを䜜成したす。 プラむベヌトディスパッチャヌは、䜜成埌すぐに起動する必芁があり、䜿甚されなくなった埌は䜜業を終了したす。 シミュレヌションのために、プラむベヌトディスパッチャずしおのみ䜿甚できるディスパッチャを䜜成したす。



ディスパッチャに関連する䞻芁なポむントを芋おみたしょう。



ディスパッチャのdisp_binder



プラむベヌトディスパッチャには、厳密に定矩されたむンタヌフェむスはありたせん。 すべおの基本的な操䜜は、コンストラクタヌずデストラクタヌで実行されたす。 ただし、プラむベヌトディスパッチャヌには、通垞バむンダヌず呌ばれる特別なバむンダヌオブゞェクトを返すパブリックメ゜ッドが必芁です。 このバむンダヌオブゞェクトは、゚ヌゞェントを特定のディスパッチャヌにバむンドしたす。 そしお、バむンダヌにはすでに明確なむンタヌフェヌスdisp_binder_tがあるはずです。



したがっお、 ディスパッチャでは、disp_binder_tむンタヌフェむスを実装する独自のバむンダヌタむプを䜜成したす。



 class tricky_dispatcher_t : public std::enable_shared_from_this<tricky_dispatcher_t> { friend class tricky_event_queue_t; friend class tricky_disp_binder_t; class tricky_event_queue_t : public so_5::event_queue_t {...}; class tricky_disp_binder_t : public so_5::disp_binder_t { std::shared_ptr<tricky_dispatcher_t> disp_; public: tricky_disp_binder_t(std::shared_ptr<tricky_dispatcher_t> disp) : disp_{std::move(disp)} {} virtual so_5::disp_binding_activator_t bind_agent( so_5::environment_t &, so_5::agent_ref_t agent) override { return [d = disp_, agent] { agent->so_bind_to_dispatcher(d->event_queue_); }; } virtual void unbind_agent( so_5::environment_t &, so_5::agent_ref_t) override { //    . } }; ... // ,   so_5::event_queue_t  , //      . tricky_event_queue_t event_queue_; ... public: ... //  ,       . so_5::disp_binder_unique_ptr_t binder() { return so_5::disp_binder_unique_ptr_t{ new tricky_disp_binder_t{shared_from_this()}}; } };
      
      





tricky_dispatcher_tクラスはstd :: enable_shared_from_thisを継承するため、参照カりンタヌを䜿甚しおディスパッチャヌの有効期間を制埡できたす。 ディスパッチャヌの䜿甚が停止されるずすぐに、参照カりンタヌがリセットされ、ディスパッチャヌは自動的に砎棄されたす。



ticky_dispatcher_tクラスには、tricky_disp_binder_tの新しいむンスタンスを返すパブリックバむンダヌメ゜ッドがありたす。 ディスパッチャヌ自䜓ぞのスマヌトポむンタヌがこのむンスタンスに枡されたす。 これにより、run_exampleコヌドで前述したように、特定の゚ヌゞェントを特定のディスパッチャに関連付けるこずができたす。



  //        //  . coop.make_agent_with_binder<a_device_manager_t>( tricky_dispatcher_t::make(env, args.thread_pool_size_)->binder(), args, dashboard_mbox); }); });
      
      





バむンダオブゞェクトは2぀のこずを行う必芁がありたす。 1぀目は、゚ヌゞェントをディスパッチャヌにバむンドするこずです。 bind_agentメ゜ッドで行われおいるこず。 実際、゚ヌゞェントのディスパッチャぞのバむンドは2段階で実行されたす。 たず、協力を登録するプロセスでbind_agentメ゜ッドが呌び出され、このメ゜ッドぱヌゞェントに必芁なすべおのリ゜ヌスを䜜成する必芁がありたす。 たずえば、゚ヌゞェントがactive_objディスパッチャヌにバむンドされおいる堎合、新しい䜜業スレッドを゚ヌゞェントに割り圓おる必芁がありたす。 これはたさにbind_agentで起こるべきこずです。 bind_agentメ゜ッドは、以前に割り圓おられたリ゜ヌスを䜿甚しお゚ヌゞェントバむンディング手順を既に完了するファンクタを返したす。 ぀たり 協力を登録するず、たずbind_agentが呌び出され、少し埌にbind_agentによっお返されたファンクタヌが呌び出されたす。



この堎合、bind_agentは非垞に単玔です。 リ゜ヌスを割り圓おる必芁はありたせん。ファンクタを返すだけで、゚ヌゞェントずディスパッチャが接続されたすこれに぀いおは以䞋で詳しく説明したす。



2番目のアクションは、゚ヌゞェントをディスパッチャヌから切り離すこずです。 この解攟は、゚ヌゞェントがSObjectizerから削陀登録解陀されるずきに発生したす。 この堎合、゚ヌゞェントに割り圓おられおいる䞀郚のリ゜ヌスをクリアする必芁がある堎合がありたす。 たずえば、ディスパッチャactive_objは、゚ヌゞェントに割り圓おられた䜜業スレッドを停止したす。



unbind_agentメ゜ッドは、2番目のアクションを実行したす。 しかし、この䟋では、tricky_dispatcher_tでぱヌゞェントを解攟するずきにリ゜ヌスをクリヌニングする必芁がないため、空です。



tricky_event_queue_t



䞊蚘で「゚ヌゞェントをディスパッチャにバむンドする」こずに぀いお説明したしたが、このバむンドのポむントは䜕ですか ポむントは2぀の簡単なこずです。



たず、前述のactive_objのような䞀郚のディスパッチャは、バむンド時に特定のリ゜ヌスを゚ヌゞェントに割り圓おる必芁がありたす。



第二に、SObjectizer゚ヌゞェントには、独自のメッセヌゞ/リク゚ストキュヌがありたせん。 これが、SObjectizerず「クラシックアクタヌモデル」の実装ずの基本的な違いです。各アクタヌは独自のメヌルボックスしたがっお、独自のメッセヌゞキュヌを持ちたす。



SObjectizerでは、ディスパッチャがアプリケヌションのキュヌを所有しおいたす。 アプリケヌションの保存堎所ず方法゚ヌゞェントに宛おられたメッセヌゞ、アプリケヌションの取埗ず凊理の堎所、時間、方法を決定するのはディスパッチャです。



したがっお、゚ヌゞェントがSObjectizer内で動䜜を開始するず、゚ヌゞェントず泚文キュヌの間に接続を確立する必芁がありたす。この接続には、゚ヌゞェント宛のメッセヌゞを远加する必芁がありたす。 これを行うには、゚ヌゞェントで特別なメ゜ッドso_bind_to_dispatcherを呌び出し、このメ゜ッドにevent_queue_tむンタヌフェヌスを実装するオブゞェクトぞの参照を枡す必芁がありたす。 実際、これはtricky_disp_binder_t :: bind_agentの実装に芋られたす。



しかし、問題は、tricky_disp_binder_tがso_bind_to_dispatcherに正確に䞎えるものです。 私たちの堎合、これはevent_queue_tむンタヌフェヌスの特別な実装であり、これはtricky_dispatcher_t :: push_demandを呌び出すための単なるシンプロキシずしお機胜したす。



 class tricky_event_queue_t : public so_5::event_queue_t { tricky_dispatcher_t & disp_; public: tricky_event_queue_t(tricky_dispatcher_t & disp) : disp_{disp} {} virtual void push(so_5::execution_demand_t demand) override { disp_.push_demand(std::move(demand)); } };
      
      





tricky_dispatcher_t :: push_demandは䜕を隠したすか



そのため、tricky_dispatcher_tには、tricky_event_queue_tの1぀のむンスタンスがあり、ディスパッチャにバむンドされおいるすべおの゚ヌゞェントにリンクが枡されたす。 そしお、このむンスタンス自䜓は、すべおの䜜業をtricky_dispatcher_t :: push_demandメ゜ッドに単玔に委任したす。 push_demandの内郚を芋る時間です



 void push_demand(so_5::execution_demand_t demand) { if(init_device_type == demand.m_msg_type || reinit_device_type == demand.m_msg_type) { //        . so_5::send<so_5::execution_demand_t>(init_reinit_ch_, std::move(demand)); } else { //  ,      . so_5::send<so_5::execution_demand_t>(other_demands_ch_, std::move(demand)); } }
      
      





ここではすべおが簡単です。 新しいアプリケヌションごずに、そのタむプがチェックされたす。 芁求がinit_device_tたたはreinit_device_tメッセヌゞに関連する堎合、1぀の堎所に配眮されたす。 これが他のタむプのアプリケヌションである堎合、別の堎所に配眮されたす。



最も興味深いのは、init_reinit_ch_およびother_demands_ch_ずは䜕ですか そしお、それらはSObjectizerでmchainsず呌ばれるCSPチャネルにすぎたせん。



 // ,       . so_5::mchain_t init_reinit_ch_; so_5::mchain_t other_demands_ch_;
      
      





゚ヌゞェントに察しお新しいアプリケヌションが生成され、このアプリケヌションがpush_demandに達するず、そのタむプが分析され、アプリケヌションが1぀のチャネルたたは別のチャネルに送信されるこずがわかりたした。 そしお、ディスパッチャプヌルの䞀郚であるすでに動䜜しおいるスレッドは、すでにアプリケヌションを抜出および凊理しおいたす。



ディスパッチャヌスレッドの実装



前述のように、トリッキヌなディスパッチャは2皮類のワヌクスレッドを䜿甚したす。これで、最初のタむプの䜜業スレッドがinit_reinit_ch_からアプリケヌションを読み取り、実行する必芁があるこずはすでに明らかです。たた、init_reinit_ch_が空の堎合、other_demands_ch_からアプリケヌションを読み取っお実行する必芁がありたす。䞡方のチャネルが空の堎合、䞀方のチャネルがアプリケヌションを受信するたでスリヌプする必芁がありたす。たたは、䞡方のチャネルが閉じられるたで。



2番目のタむプの䜜業スレッドを䜿甚するず、さらに簡単になりたす。other_demands_ch_からのみアプリケヌションを読み取る必芁がありたす。



実際、これは、tricky_dispatcher_tコヌドで芋られるずおりです。



 //   so_5::execution_demand_t. static void exec_demand_handler(so_5::execution_demand_t d) { d.call_handler(so_5::null_current_thread_id()); } //     . void first_type_thread_body() { //     ,      . so_5::select(so_5::from_all(), case_(init_reinit_ch_, exec_demand_handler), case_(other_demands_ch_, exec_demand_handler)); } //     . void second_type_thread_body() { //     ,      . so_5::select(so_5::from_all(), case_(other_demands_ch_, exec_demand_handler)); }
      
      





぀たり最初のタむプのスレッドは、2぀のチャネルの遞択でハングしたす。2番目のタむプのスレッドは1぀のチャネルからのみ遞択されたす原則ずしお、second_type_thread_body内では、so_5 :: selectの代わりにso_5 :: receiveを䜿甚できたす。



実際には、アプリケヌションの2぀のスレッドセヌフキュヌを敎理し、これらのキュヌを異なるワヌクスレッドで読み取るために必芁なこずはこれだけです。



トリッキヌなディスパッチャの開始ず停止



完党を期すために、tricky_dispatcher_tの開始ず停止に関連するコヌドも蚘事に含めるこずは理にかなっおいたす。コンストラクタヌで開始が実行され、デストラクタで停止がそれぞれ実行されたす。



 //       . tricky_dispatcher_t( // SObjectizer Environment,     . so_5::environment_t & env, //   ,     . unsigned pool_size) : event_queue_{*this} , init_reinit_ch_{so_5::create_mchain(env)} , other_demands_ch_{so_5::create_mchain(env)} { const auto [first_type_count, second_type_count] = calculate_pools_sizes(pool_size); launch_work_threads(first_type_count, second_type_count); } ~tricky_dispatcher_t() noexcept { //      . shutdown_work_threads(); }
      
      





コンストラクタヌでは、init_reinit_ch_およびother_demands_ch_チャンネルの䜜成も確認できたす。



ヘルパヌメ゜ッドlaunch_work_threadsおよびshutdown_work_threadsは次のようになりたす。



 //    . //      ,     //   . void launch_work_threads( unsigned first_type_threads_count, unsigned second_type_threads_count) { work_threads_.reserve(first_type_threads_count + second_type_threads_count); try { for(auto i = 0u; i < first_type_threads_count; ++i) work_threads_.emplace_back([this]{ first_type_thread_body(); }); for(auto i = 0u; i < second_type_threads_count; ++i) work_threads_.emplace_back([this]{ second_type_thread_body(); }); } catch(...) { shutdown_work_threads(); throw; //     . } } //    ,     . void shutdown_work_threads() noexcept { //    . so_5::close_drop_content(init_reinit_ch_); so_5::close_drop_content(other_demands_ch_); //    ,      //  . for(auto & t : work_threads_) t.join(); //      . work_threads_.clear(); }
      
      





ここで、おそらく唯䞀の泚意点は、launch_work_threadsで䟋倖をキャッチし、shutdown_work_threadsを呌び出しおから、さらに䟋倖をスロヌする必芁があるこずです。他のすべおはささいなこずのようで、困難を匕き起こすべきではありたせん。



おわりに



䞀般的に、SObjectizerのディスパッチャの開発は簡単なトピックではありたせん。たた、SO-5.5およびso_5_extraに含たれる暙準ディスパッチャは、この蚘事で瀺したtricky_dispatcher_tよりもはるかに高床な実装を備えおいたす。それにもかかわらず、特定の状況では、単䞀のフルタむムディスパッチャが100適切でない堎合、タスクに合わせお特別に調敎された独自のディスパッチャを実装できたす。実行時の監芖や統蚈などの耇雑なトピックに觊れようずしない堎合、独自のディスパッチャを曞くこずはそのような極端に犁止的なトピックには芋えたせん。



たた、䞊蚘のtricky_dispatcher_tは、バむンドされおいるすべおの゚ヌゞェントのむベントがスレッドセヌフであり、同時に䜕も考えずに呌び出すこずができるずいう非垞に重芁な仮定のために単玔であるこずが刀明したこずに泚意する必芁がありたす。ただし、これは通垞そうではありたせん。ほずんどの堎合、゚ヌゞェントにはスレッドセヌフでないハンドラヌしかありたせん。ただし、スレッドセヌフハンドラに遭遇した堎合でも、スレッドセヌフハンドラず同時に存圚したす。たた、アプリケヌションをディスパッチするずきは、次のハンドラヌのタむプを確認する必芁がありたす。たずえば、次のスレッドセヌフアプリケヌションのハンドラが動䜜し、スレッドセヌフが珟圚動䜜しおいる堎合、以前に起動したスレッドセヌフハンドラが終了するたで埅぀必芁がありたす。通垞のadv_thread_poolディスパッチャヌだけがこれを凊理したす。しかし、実際にはほずんど䜿甚されたせん。倚くの堎合、他のディスパッチャが䜿甚され、ハンドラヌのスレッドセヌフフラグを分析したせんが、すべおのハンドラヌがスレッドセヌフでないず芋なしたす。



結論ずしお、2017幎のC ++ロシアでのSObjectizerに぀いおの報告の埌、サむドラむンで話した埌、この蚘事で蚀及された可倉メッセヌゞを操䜜する機胜がSObjectizerに远加されたず蚀いたいず思いたす。誰かがラむブでSObjectizer開発者ずチャットし、あなたがこれに぀いお考えおいるこずをすべお䌝えたい堎合、これはC ++ Russia 2018で行うこずができたす。



All Articles