メヌルボックスは、メヌルボックスではありたせん...

SObjectizerに関する最初の蚘事が2016幎の倏に䜜成されたずき、興味のある読者が「 裏偎 」を芋るこずができるように、時間の経過ずずもにその実装の詳现に぀いおも話したす。 本日の蚘事では、SObjectizerの内臓に぀いお説明したす。 mbox-s「メヌルボックス」のメカニズムに぀いお。これは、アクタヌ甚語でぱヌゞェントの盞互䜜甚を敎理するために䜿甚されたす。



なぜmboxが特別なのですか



なぜなら、このメカニズムがSObjectizerの孊習を匕き受ける人に察しお非垞によく䌌た質問をいく぀提起するのか、私たち自身が驚いおいるからです。 SObjectizerの開発者である私たちによく知られ、理解でき、銎染みのあるものは、初心者にずっお決しおそうではないこずが刀明したした。 そうだずすれば、mboxが䜕であり、どのように機胜するかを理解しおみたしょう。 そしお同時に、独自のmboxを䜜成しおみおください。



なぜmboxが必芁なのですか



゚ヌゞェント間の盞互䜜甚を敎理するには、SObjectizerのメヌルボックスが必芁です。 ゚ヌゞェント間の通信は非同期メッセヌゞによっお構築され、これらの同じメッセヌゞをどこかに送信する必芁がありたす。 「どこに正確に」ずいう疑問が生じたす。



埓来のアクタヌモデルでは、メッセヌゞの受信者は受信者アクタヌです。 すなわち アクタヌAがアクタヌBにメッセヌゞを送信するには、アクタヌAがアクタヌBぞのリンクを持っおいる必芁がありたす。受信者アクタヌぞの参照はありたせん-メッセヌゞを送信する方法はありたせん。 1Nメヌリングを実行する必芁がある堎合、送信者にはすべおの受信者ぞのリンクが必芁です。 これは、叀兞的な俳優のモデルに぀いお話す堎合です。



しかし、我々は異なる特異性を持っおいたした通垞、ビヌトは意識を決定し、解決しなければならないタスクず自由に䜿えるツヌルのニヌズに導かれたした。



たず、C ++がありたす。 ゚ヌゞェントBぞのリンクを゚ヌゞェントAに枡さないずいうだけです。 これが通垞のリンクたたは通垞のベアポむンタヌである堎合、゚ヌゞェントBが砎棄されるず、゚ヌゞェントAはBぞの「ダングリング」リンクを持ちたす。したがっお、通垞のリンク/ポむンタヌの代わりに、スマヌトリンク/ポむンタヌを䜿甚する必芁がありたす。 しかし、単玔なスマヌトポむンタヌは良くありたせん。 ゚ヌゞェントAぱヌゞェントBぞのスマヌトポむンタヌを持っおいる限り、゚ヌゞェントBは削陀されたせん぀たり、所有しおいるリ゜ヌスは解攟されたせん。



したがっお、C ++では、スマヌトポむンタヌだけでなく、いく぀かの特別なスマヌトプロキシリンクを䜿甚する必芁がありたす。 ゚ヌゞェントAはBぞのプロキシリンクを持っおいる堎合がありたすが、Aがただプロキシリンクを持っおいる堎合でも、Bを安党に削陀できたす。 さらに、Aは存圚しない゚ヌゞェントBにメッセヌゞを送信しようずする堎合があり、この詊みは壊滅的な結果他の誰かのメモリを砎損したり、C ++でぶら䞋がっおいるリンクにアクセスするずきのようにアプリケヌション党䜓がクラッシュするなどに぀ながるこずはありたせん。



次に、1察Nの盞互䜜甚が非垞に䞀般的でした。 さらに、最初は䞀般的に゚ヌゞェントが盞互䜜甚する唯䞀の方法でした。 したがっお、゚ヌゞェントAから情報を受信する必芁がある゚ヌゞェントBおよびCが、最初に自分自身ぞのリンクを゚ヌゞェントAに送信するこずを匷制されたくありたせんでした。そしお、その゚ヌゞェントAは、モヌドでAからのメッセヌゞを受信したい゚ヌゞェントのリストを個別に維持する必芁がありたした1N



その結果、「メヌルボックス」の抂念がありたした。これは、a゚ヌゞェントが互いに通信するために䜿甚できる非垞にスマヌトなプロキシリンクであり、bモヌド1での盞互䜜甚を簡玠化するメカニズムであるためだけに䜜成されたしたN



mbox-sが存圚する堎合、゚ヌゞェントはメッセヌゞを互いに盎接送信するのではなく、メヌルボックスmbox-sで送信したす。 mboxに送信されたメッセヌゞは、このmboxからのメッセヌゞを賌読しおいる゚ヌゞェントに配信されたす。



したがっお、゚ヌゞェントAが゚ヌゞェントBにメッセヌゞを送信するには、䞡方の゚ヌゞェントが認識しおいるmboxが必芁です。 ゚ヌゞェントAはこのmboxにメッセヌゞを送信し、゚ヌゞェントBはこのmboxからのメッセヌゞをサブスクラむブしたす。 この小さな䟋でわかるように



#include <so_5/all.hpp> class A final : public so_5::agent_t { const so_5::mbox_t to_; public: A(context_t ctx, so_5::mbox_t to) : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {} virtual void so_evt_start() override { //    B. so_5::send<std::string>(to_, "Hello!"); } }; class B final : public so_5::agent_t { public: B(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event(&B::on_string); } private: void on_string(mhood_t<std::string> cmd) { std::cout << "Message: " << *cmd << std::endl; //   . so_deregister_agent_coop_normally(); } }; int main() { so_5::launch([](so_5::environment_t & env) { //      ,   //  mbox  . env.introduce_coop([&](so_5::coop_t & coop) { //  mbox,     //  A  B. const auto mbox = env.create_mbox(); //    ,    //  mbox  . coop.make_agent<A>(mbox); coop.make_agent<B>(mbox); }); }); return 0; }
      
      





さらに、1Nモヌドでメッセヌゞを送信、受信するこずは、11モヌドでメッセヌゞを送信/受信するこずず倉わりたせん。 䞊蚘の䟋は、゚ヌゞェントAが゚ヌゞェントBずCに同時にメッセヌゞを送信する堎合をどのように探すかを瀺しおいたす。



 #include <so_5/all.hpp> class A final : public so_5::agent_t { const so_5::mbox_t to_; public: A(context_t ctx, so_5::mbox_t to) : so_5::agent_t{std::move(ctx)}, to_{std::move(to)} {} virtual void so_evt_start() override { //    B. so_5::send<std::string>(to_, "Hello!"); } }; class B final : public so_5::agent_t { public: B(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event(&B::on_string); } private: void on_string(mhood_t<std::string> cmd) { std::cout << "(B) Message: " << *cmd << std::endl; //   . so_deregister_agent_coop_normally(); } }; class C final : public so_5::agent_t { public: C(context_t ctx, const so_5::mbox_t & from) : so_5::agent_t{std::move(ctx)} { //       from. so_subscribe(from).event([](mhood_t<std::string> cmd) { //   ,    , //      B. std::cout << "(C) Message: " << *cmd << std::endl; }); } }; int main() { so_5::launch([](so_5::environment_t & env) { //      ,   //  mbox  . env.introduce_coop([&](so_5::coop_t & coop) { //  mbox,     //  A, B, C. const auto mbox = env.create_mbox(); //   ,    //  mbox  . coop.make_agent<A>(mbox); coop.make_agent<B>(mbox); coop.make_agent<C>(mbox); }); }); return 0; }
      
      





mboxはどのように機胜したすか



異なるmbox-sの動䜜は異なりたす:)したがっお、最も広く䜿甚されおいるmbox-sのタむプがどのように機胜するかを知るには、最初にmbox-sの䞀般的な内容に぀いお話す必芁がありたす。



mboxずは䜕ですか



マルチプロデュヌサヌ/マルチコンシュヌマヌ



歎史的に、これはSObjectizer-5に登堎した最初のタむプのmboxです。 誰でもそのようなmboxにメッセヌゞを送信できたす。 誰でもこのmboxからのメッセヌゞを賌読できたす。



マルチプロデュヌサヌ/シングルコンシュヌマヌ



11の察話では、MPSC-mboxを䜿甚できたす。MPSC-mboxでは誰でもメッセヌゞを送信できたすが、MPSC-mboxを所有する1人の゚ヌゞェントはMPSC-mboxからのメッセヌゞをサブスクラむブできたす。



MPSC mboxは、SObjectizer-5のアクティブな䜿甚の開始埌しばらくしおSObjectizer-5に登堎したした。 メッセヌゞが特定の1぀の゚ヌゞェントに宛おられおいる堎合、MPMC-mboxの䜿甚は効果的でないこずが経隓により瀺されおいたす。 そのため、MPSC-mboxは、盞互䜜甚の組織に察する根本的に異なるアプロヌチではなく、コヌドを最適化するための方法であるず蚀えたす。 さらに、ナヌザヌはMPSC-mbox-sを䜜成できたせん。 各゚ヌゞェントのMPSC-mboxは、SObjectizerによっお自動的に䜜成されたす。



so_5_extraラむブラリからの远加のmbox



远加のso_5_extraラむブラリがSObjectizerを介しお構築されたした。これには、SObjectizerをカヌネルに远加するのは䞍合理ず思われるコンポヌネントが含たれおいたす。 いく぀かの远加タむプのmbox-sが含たれおいたす。 䟋





mboxを䜿甚する別の興味深い䟋は、so_5_extra のshutdownerコンポヌネントです。 このコンポヌネントでは、 mboxを䜿甚しお、倧芏暡なSObjectizerアプリケヌションを正しくシャットダりンできるタむミングを決定したす。



ただし、この蚘事ではso_5_extraのmbox-sを詳现に怜蚎したせん。



Multi-Producer / Single-Consumer mboxはどのように機胜したすか



そのため、mbox-sは異なるため、動䜜が異なりたす。 そしお、最も単玔なMPSC-mboxで䜜業の詳现を怜蚎し始めたす。



message_limits これぱヌゞェントを過負荷から保護するメカニズムですやmsg_tracing これはメッセヌゞが受信者に配信される方法を調べる方法ですなどの特定のこずを考慮しない堎合、MPSC-mboxは単玔な「半導䜓」のように機胜したす。送信されたメッセヌゞを受信者゚ヌゞェントに送信しお、受信者がメッセヌゞをキュヌに入れおメッセヌゞ凊理を埅機するようにしたす。



たあ぀たり ここではすべおが非垞に愚かです。送信者からメッセヌゞを受け取り、受信者にそれを枡したした。 これ以䞊。



Multi-Producer / Multi-Consumer mboxはどのように機胜したすか



しかし、MPMC-mboxでは、状況はもう少し耇雑ですここでも、message_limitsやmsg_tracingなどを考慮しおいたせん。 倚くのメッセヌゞ受信者が存圚する可胜性があるため、MPMC-mboxは、サブスクラむバヌずの関連付けコンテナヌを保存したす。 このコンテナ内のキヌはメッセヌゞタむプの識別子であり、芁玠はこのタむプのメッセヌゞのサブスクラむバの実際のリストです。



誰かがタむプMのメッセヌゞを送信するず、MPMC-mboxはそのMのメッセヌゞのサブスクラむバヌのリストを連想コンテナヌで探したす。そのようなリストがある堎合、MPMC-mboxはこのリストを調べお、各サブスクラむバヌにメッセヌゞを送信しようずしたす。



具䜓的には「䞎えようずする」ず蚀われたした。 delivery_filters ぀たり、メッセヌゞの内容に応じお、サブスクラむバヌぞのメッセヌゞ配信を有効たたは無効にするフィルタヌなどがただありたす。 メッセヌゞがサブスクラむブする゚ヌゞェントに配信される前に、MPMC-mboxは、サブスクラむバヌにdelivery_filterがあるかどうかを確認したす。 存圚する堎合、メッセヌゞは最初にフィルタヌに送信されたす。 たた、フィルタヌが゚ヌゞェントぞのメッセヌゞの配信を蚱可する堎合にのみ、このメッセヌゞぱヌゞェントに送信されたす。



䞀般に、MPMC-mboxは特定の皮類のメッセヌゞのサブスクラむバヌのリストを調べ、メッセヌゞの特定のむンスタンスぞの配信がサブスクラむバヌに蚱可されおいる堎合、メッセヌゞはサブスクラむバヌ゚ヌゞェントに送信され、サブスクラむバヌはメッセヌゞの凊理を順番に埅機したす。



通垞のMPMCずMPSC mboxの共通点は䜕ですか



通垞のMPMCおよびMPSC mboxには、それらを結合する1぀の重芁な機胜がありたす。mboxには、mboxに送信されるメッセヌゞの独自のリポゞトリがありたせん。 すなわち 少なくずも通垞のmboxは、メッセヌゞを保存したせん。 䞀般的に。



したがっお、「 mboxがオヌバヌフロヌするたで䜕件のメッセヌゞを保存でき、オヌバヌフロヌするずどうなりたすか 」たたは「 メッセヌゞMが送信された埌にメッセヌゞMをサブスクラむブするず、゚ヌゞェントBはメッセヌゞMを受信したすか 」 MPMCずMPSC mboxは意味がありたせん。 これらのmboxに぀いおは、愚かなこずにメッセヌゞを自身の内郚に保存したせん。メッセヌゞは、メッセヌゞに興味がある゚ヌゞェントにすぐに転送されたす。 たたは、このタむプのメッセヌゞの受信者が珟圚いない堎合、メッセヌゞを無芖したす。



たた、他のタむプのmboxの堎合、送信されたメッセヌゞの内郚ストレヌゞは暙準ではなくルヌルの䟋倖です。 実際、mboxの操䜜はプッシュ原則に基づいおいたす。送信されたメッセヌゞはmboxに「詰め蟌たれ」たす。 そしお、これはおそらく、mboxが誰かにメッセヌゞを配信する唯䞀の機䌚です。 誰もmboxに新しいものが珟れたかどうかを確認するためにmboxを定期的にプルするこずはありたせん。 すなわち 誰も、たったく、誰も、プルモヌドのmboxでは動䜜したせん。



結論ずしお、䞀般的に、mbox-sはメッセヌゞを内郚に保存したせん。



䞖界の状況を耇雑にしおいる゚ヌゞェントには独自のメッセヌゞキュヌがない



SObjectizerに盎面した開発者は、mboxがメッセヌゞを保存しないこずをすぐに認識し始めるため、mboxの容量に぀いお質問するこずは意味がありたせん。 しかし、メッセヌゞはmboxには保存されず、゚ヌゞェントず共に保存されるため、゚ヌゞェントのメッセヌゞキュヌの容量に関する質問が始たりたす...



そしお、ここで新芏参入者はもう䞀぀の啓瀺を受け、おそらく倱望するでしょうSObjectizerでは、゚ヌゞェントは䞀般的な堎合、独自のメッセヌゞキュヌを持っおいたせん。



行くぞ すべおではありたせん:)



実際、SObjectizerの゚ヌゞェントのメッセヌゞキュヌはディスパッチャによっお制埡されたす。 ゚ヌゞェントがメッセヌゞを凊理する䜜業コンテキストを゚ヌゞェントに提䟛するのはディスパッチャです。 そしお、結果ずしお、凊理を埅機しおいるメッセヌゞのストレヌゞを線成するのはディスパッチャです。



たずえば、one_thread最も䞀般的に䜿甚されるものの1぀のようなディスパッチャがありたす。 このディスパッチャに関連付けられおいるすべおの゚ヌゞェントは、単䞀の共通の䜜業スレッドで動䜜したす。 そしお、すべおの゚ヌゞェントのすべおのメッセヌゞは、1぀の共通メッセヌゞキュヌに保存されたす。 䜜業スレッドはこのキュヌから次のメッセヌゞを取埗し、受信゚ヌゞェントに枡し、次のメッセヌゞを取埗したす。



active_groupタむプのディスパッチャは、゚ヌゞェントのグルヌプを1぀の共通の䜜業スレッドに関連付けるこずができる同様の方法で動䜜したす。 そしお、この䜜業スレッド䞊のすべおの゚ヌゞェントは、共通のメッセヌゞキュヌを䜿甚したす。



厄介な状況は、thread_poolおよびadv_thread_poolディスパッチャヌの堎合です。 そこで、゚ヌゞェントのFIFOキュヌのパラメヌタヌを蚭定できたす。 それらの1぀は、゚ヌゞェントが䜿甚するキュヌです。 ゚ヌゞェントに独自のキュヌを持たせるこずができたす。このキュヌには、この特定の゚ヌゞェントに宛おられたメッセヌゞのみが存圚したす。 たた、同じ協力関係にある゚ヌゞェントが共通のメッセヌゞキュヌを共有するようにできたす。



゚ヌゞェントの優先順䜍をサポヌトするディスパッチャでさらに楜しくなりたす。 たずえば、ディスパッチャprio_one_thread :: strictly_ordered。 そこでは、同じ優先床を持぀すべおの゚ヌゞェントに共通のメッセヌゞキュヌが1぀ありたす。 ただし、優先順䜍が異なる゚ヌゞェントの堎合、メッセヌゞキュヌは異なりたす。



芁するに、最終行䞀般的に、mboxぱヌゞェントにメッセヌゞを送信し、゚ヌゞェントは既に適切なキュヌにメッセヌゞを保存しおいるディスパッチャにメッセヌゞを送信したす。 したがっお、ここでも、䞀般的な堎合、mbox-sも゚ヌゞェントにもメッセヌゞ甚のストレヌゞがありたせん。



たた、独自のmboxを䜜成するのはどれくらい難しいですか



message_limits、delivery_filters、msg_tracingおよびその他のニュアンスをサポヌトしお「すべおのルヌルで」それを行う堎合、それは非垞に困難です。 たずえば、 so_5_extraのretain_msg mboxの実装の内臓を芋お、これがどれだけ怖いのかを芋るこずができたす:)



ただし、特定のタスクのために独自のmboxが実行された堎合、すべおがそれほどひどいものになるこずはありたせん。 MPSC-mboxを小さな䟋ずしお䜜成しおみたしょう。これにより、゚ヌゞェントが頻繁にメッセヌゞを受信するのを防ぐこずができたす。 たずえば、M1メッセヌゞの埌に250ミリ秒未満でM2メッセヌゞが到着した堎合、そのメッセヌゞは砎棄されたす。 M1が250ミリ秒以䞊経過した埌、M2が受信者に配信されたす。



必芁な説明



それでは、コヌド名anti-jitter-mboxで独自のmboxを䜜成しおみたしょう。 これはMPSC-mboxになり、特定の゚ヌゞェントに関連付ける必芁がありたす。



私たちの生掻を簡玠化するために、MPSC-mboxの独自の完党な実装は䜜成したせん。 代わりに、各゚ヌゞェント甚の既補のMPSC-mboxを䜿甚したす。 anti-jitter-mboxがMPSC-mboxコンストラクタヌに属する゚ヌゞェントを、anti-jitter-mboxのコンストラクタヌに枡すだけです。



独自のクラスanti_jitter_mboxを定矩する必芁がありたす。これは特別なクラスso_5 :: absctract_message_mbox_tの子孫でなければなりたせん。 このクラスでは、absctract_message_mbox_tにある玔粋な仮想メ゜ッドをオヌバヌラむドする必芁がありたす。 SObjectizerバヌゞョン5.5。*では、次のメ゜ッドがありたす。



id 。 䞀意のmbox IDを返す必芁がありたす。 メッセヌゞ配信は、実際にはコンストラクタで枡されるMPSC-mbox゚ヌゞェントによっお実行されるため、この特定のMPSC-mboxのIDを返したす。 すなわち ここでは、単に珟圚のMPSC-mboxに䜜業を委任したす。



subscribe_event_handler 。 このメ゜ッドは、゚ヌゞェントがタむプTのメッセヌゞをサブスクラむブするずきに呌び出されたす。このメ゜ッドにタむプTを登録したす。あるタむプMのメッセヌゞがmboxに到着したずきに、゚ヌゞェントがサブスクラむブしおいるかどうかを確認できるようにしたす。 眲名されおいる堎合、メッセヌゞの配信を詊みるこずができたすそれに応じお、最埌の配信の時刻を修正する必芁がありたす。 眲名されおいない堎合、メッセヌゞは無芖する必芁がありたす。



unsubscribe_event_handlers 。 subscribe_event_handlerずは察照的に、このメ゜ッドは、゚ヌゞェントがタむプTのメッセヌゞをサブスクラむブ解陀するずきに呌び出されたす。このメ゜ッドでタむプTの登録をキャンセルしたす。



query_name 。 このメ゜ッドは、mboxの文字列名を返す必芁がありたす。 このメ゜ッドは、デバッグず蚺断の目的に圹立ちたす。 たずえば、SObjectizerは、発生した゚ラヌに関するメッセヌゞを生成するずきにこのメ゜ッドをプルできたす。



タむプ 。 このメ゜ッドは、mboxタむプを返したす。mboxがマルチプロデュヌサヌ/マルチコンシュヌマヌか、マルチプロデュヌサヌ/シングルコンシュヌマヌかを返したす。 このメ゜ッドは、特定のアクションを実行できるかどうかを確認するためにSObjectizerによっお呌び出されたす。 たずえば、 倉曎可胜なメッセヌゞはMPSC mboxにのみ送信できたす 。



do_deliver_message 。 このメ゜ッドは、受信゚ヌゞェントにメッセヌゞを送信したす。 このメ゜ッドでは、送信されたメッセヌゞのタむプが登録されおいるかどうかを確認する必芁がありたす。 そうでない堎合、メッセヌゞは無芖されたす。 登録されおいお、最埌にメッセヌゞが配信されおから十分な時間が経過した堎合、メッセヌゞを受信者に配信する必芁がありたすこの堎合、配信時間を修正したす。 配信自䜓は、実際のMPSC-mbox゚ヌゞェントに委任されたす。



do_deliver_service_request 。 このメ゜ッドはdo_deliver_messageに䌌おいたすが、゚ヌゞェントAが゚ヌゞェントBに同期リク゚ストを行ったずきに呌び出されたす぀たり、send_messageの代わりにrequest_futureたたはrequest_valueが䜿甚されたす。 簡単にするため、anti-jitter-mboxの同期リク゚スト機胜はサポヌトしたせん。



set_delivery_filterおよびdrop_delivery_filter これらのメ゜ッドは、メッセヌゞ配信フィルタヌを蚭定および削陀するために䜿甚されたす。 配信フィルタヌはMPSC-mbox向けではないため、この䟋ではこの機胜をサポヌトしたせん。



いく぀かのabstract_message_mbox_tメ゜ッドの䞀貫性の説明



この䟋の実装では、do_deliver_messageおよびdo_deliver_service_requestメ゜ッドが定数ずしお宣蚀されおいるこずがわかりたす。 しかし、なぜなら do_deliver_messageでは、anti-jitter-mboxの内郚状態を倉曎する必芁がありたす。次に、mboxクラスの説明でこの同じ状態を可倉ずしおマヌクする必芁がありたす。



これは、abstract_message_mbox_tクラスのむンタヌフェヌスの圢成における叀代のアヌキテクチャの誀算の結果です。 このクラスが䜕幎も前に圢成されたずき、誰かが独自のタむプのmboxを䜜成する必芁があるずは考えおいたせんでした。



1幎半前に必芁であるだけでなく、時には非垞に必芁であるこずが刀明したずき、SObjectizer-5.5ブランチ内の互換性を砎るか、すべおをそのたたにしお、将来のメゞャヌリリヌスでabstract_message_mbox_tむンタヌフェヌスを倉曎するずいう遞択に盎面したしたSObjectizer-5.6など。 同じブランチ内のリリヌス間の互換性を維持するこずに぀いおは非垞に重芁なので、SObjectizer-5.5ではすべおをそのたたにするこずにしたした。 そのため、独自のmboxを実装する堎合、いく぀かのabstract_message_mbox_tメ゜ッドの䞍倉性を考慮し、mutableキヌワヌドを䜿甚する必芁がありたす。



独自のanti-jitter-mboxの実装



さお、私たちは自分のmboxがどのようになるかをすでに芋るこずができたす。



mboxが凊理するデヌタから始めたしょう



 using namespace std::chrono; using clock_type = steady_clock; class anti_jitter_mbox : public so_5::abstract_message_box_t { //        ,   //           // . struct data { //      . struct item { //      .  0 , //          . std::size_t subscribers_{0}; //    . //   ,       . std::optional<clock_type::time_point> last_received_{}; }; //   ,     //   . using message_table = std::map<std::type_index, item>; //   mutex    mbox-   // . std::mutex lock_; //  ,    . message_table messages_; }; //  mbox-. //  mbox,       . const so_5::mbox_t mbox_; //     "" . const clock_type::duration timeout_; //   mbox-.  SObjectizer-5.5    //  mutable, ..         // const-. mutable data data_;
      
      





受信者゚ヌゞェントぞのメッセヌゞ配信が行われる実際のmbox、「䞍芁な」メッセヌゞを遮断するための時間しきい倀、および実際にメッセヌゞのタむプず最埌に受信された時間に関する情報が必芁です。 さらに、mboxメ゜ッドは異なる䜜業スレッドで呌び出すこずができ、mboxにスレッドセヌフを提䟛する必芁があるため、ミュヌテックスが必芁です。



ずころで、ほずんどのメ゜ッドでスレッドセヌフであるずいう理由だけで、mboxの内郚ミュヌテックスをキャプチャする必芁がありたす。 私たちの生掻を簡玠化するために、補助テンプレヌトメ゜ッドを䜜成したす。このメ゜ッドは、ミュヌテックスをキャプチャし、キャプチャしたミュヌテックスの䞋で必芁なアクションを実行したす。



  template<typename Lambda> decltype(auto) lock_and_perform(Lambda l) const noexcept { std::lock_guard<std::mutex> lock{data_.lock_}; return l(); }
      
      





原則ずしお、その存圚は必芁ありたせん。 しかし、次の理由でそれを䜿甚するこずにしたした。実装を簡単にするために、䟋倖の安党性などを気にしたせん。 䞀郚のアクション䞭に䟋倖が発生した堎合、アプリケヌション党䜓を䞭断するだけです。 lock_and_performはnoexceptずしおマヌクされ、この動䜜を提䟛したす-ラムダが䟋倖をスロヌするず、C ++ランタむム自䜓がstd :: terminateを呌び出したす。



さお、mbox自䜓の実際の実装を芋るこずができたす



 public: // .    MPSC-mbox,  //      . anti_jitter_mbox( so_5::mbox_t actual_mbox, clock_type::duration timeout) : mbox_{std::move(actual_mbox)} , timeout_{timeout} {} //  ID mbox-.     ID  mbox-. so_5::mbox_id_t id() const override { return mbox_->id(); } //    . void subscribe_event_handler( const std::type_index & msg_type, const so_5::message_limit::control_block_t * limit, so_5::agent_t * subscriber ) override { lock_and_perform([&]{ //      .    //   ,     . auto & msg_data = data_.messages_[msg_type]; msg_data.subscribers_ += 1; //     mbox-. mbox_->subscribe_event_handler(msg_type, limit, subscriber); }); } //   . void unsubscribe_event_handlers( const std::type_index & msg_type, so_5::agent_t * subscriber ) override { lock_and_perform([&]{ //      . //    ,     . auto it = data_.messages_.find(msg_type); if(it != data_.messages_.end()) { auto & msg_data = it->second; --msg_data.subscribers_; if(!msg_data.subscribers_) //    ,    //       . data_.messages_.erase(it); //  mbox      . mbox_->unsubscribe_event_handlers(msg_type, subscriber); } }); } //   mbox-. std::string query_name() const override { return "<mbox:type=anti-jitter-mpsc:id=" + std::to_string(id()) + ">"; } //   mbox-.  ,    . so_5::mbox_type_t type() const override { return mbox_->type(); } //     . void do_deliver_message( const std::type_index & msg_type, const so_5::message_ref_t & message, unsigned int overlimit_reaction_deep ) const override { lock_and_perform([&]{ //       . //    ,      //    . auto it = data_.messages_.find(msg_type); if(it != data_.messages_.end()) { auto & msg_data = it->second; const auto now = clock_type::now(); // ,    . //      (..  last_received_ //  ),   . bool should_be_delivered = true; if(msg_data.last_received_) { should_be_delivered = (now - *(msg_data.last_received_)) >= timeout_; } //  - ,     mbox  //        // . if(should_be_delivered) { msg_data.last_received_ = now; mbox_->do_deliver_message(msg_type, message, overlimit_reaction_deep); } } }); } //    . void do_deliver_service_request( const std::type_index & /*msg_type*/, const so_5::message_ref_t & /*message*/, unsigned int /*overlimit_reaction_deep*/ ) const override { //  ,    so_5::exception_t   //  ,     SObjectizer-. SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support service requests"); } //    MPSC-mbox-  .   //   . void set_delivery_filter( const std::type_index & /*msg_type*/, const so_5::delivery_filter_t & /*filter*/, so_5::agent_t & /*subscriber*/ ) override { SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support delivery filters"); } void drop_delivery_filter( const std::type_index & /*msg_type*/, so_5::agent_t & /*subscriber*/ ) noexcept override { SO_5_THROW_EXCEPTION(so_5::rc_not_implemented, "anti-jitter-mbox doesn't support delivery filters"); } };
      
      





さお、mboxの動䜜をテストするには、互いに類䌌した2぀のテスト゚ヌゞェントを䜜成する必芁がありたす。最初の゚ヌゞェントのみが、通垞のMPSC-mboxからメッセヌゞを受信する必芁がありたす。



 class ordinary_subscriber final : public so_5::agent_t { const std::string name_; public: ordinary_subscriber(context_t ctx, //  ,    . std::string name) : so_5::agent_t{std::move(ctx)} , name_{std::move(name)} { so_subscribe_self().event([&](mhood_t<std::string> cmd) { std::cout << name_ << ": signal received -> " << *cmd << std::endl; }); } // Mbox,      . auto target_mbox() const { return so_direct_mbox(); } };
      
      





そしお、この目的のための2番目の゚ヌゞェントはanti-jitter-mboxを䜿甚したす。



 class anti_jitter_subscriber final : public so_5::agent_t { const std::string name_; const so_5::mbox_t anti_jitter_mbox_; public: anti_jitter_subscriber(context_t ctx, //  ,    . std::string name, //  ,     //  "" . clock_type::duration jitter_threshold) : so_5::agent_t{std::move(ctx)} , name_{std::move(name)} , anti_jitter_mbox_{ new anti_jitter_mbox{so_direct_mbox(), jitter_threshold}} { //     mbox. so_subscribe(anti_jitter_mbox_).event([&](mhood_t<std::string> cmd) { std::cout << name_ << ": signal received -> " << *cmd << std::endl; }); } // Mbox,      . auto target_mbox() const { return anti_jitter_mbox_; } };
      
      





さお、これがテスト実行のためにすべお起動される方法です



 //       . void generate_msg_sequence( so_5::environment_t & env, const so_5::mbox_t & ordinary_mbox, const so_5::mbox_t & anti_jitter_mbox) { std::vector<milliseconds> delays{ 125ms, 250ms, 400ms, 500ms, 700ms, 750ms, 800ms }; for(const auto d : delays) { const std::string msg = std::to_string(d.count()) + "ms"; so_5::send_delayed<std::string>(env, ordinary_mbox, d, msg); so_5::send_delayed<std::string>(env, anti_jitter_mbox, d, msg); } } int main() { //  SObjectizer    . so_5::launch([](so_5::environment_t & env) { //    mbox-.      //     . so_5::mbox_t ordinary, anti_jitter; //    ,      //  mbox. env.introduce_coop([&](so_5::coop_t & coop) { ordinary = coop.make_agent<ordinary_subscriber>( "ordinary-mbox")->target_mbox(); anti_jitter = coop.make_agent<anti_jitter_subscriber>( "anti-jitter-mbox", 250ms)->target_mbox(); }); //     . generate_msg_sequence(env, ordinary, anti_jitter); //       . std::this_thread::sleep_for(1250ms); //    . env.stop(); }); return 0; }
      
      





サンプルを実行した結果、anti-jitter-mboxを䜿甚する゚ヌゞェントは、通垞のmboxを䜿甚する゚ヌゞェントよりも少ないメッセヌゞを凊理するこずがわかりたす。



 通垞のmbox受信信号-> 125ms
anti-jitter-mbox受信信号-> 125ms
通垞のmbox受信した信号-> 250ms
通垞のmbox受信した信号-> 400ms
anti-jitter-mbox受信信号-> 400ms
通垞のmbox信号を受信-> 500ms
通垞のmbox受信信号-> 700ms
anti-jitter-mbox受信信号-> 700ms
通垞のmbox信号受信-> 750ms
通垞のmbox受信した信号-> 800ms 


䟋付きのリポゞトリ



この蚘事の䟋の完党な゜ヌスコヌドは、このリポゞトリにありたす。



゚ピロヌグ



蚘事の終わりに、最埌たで読み通す忍耐を持った読者に感謝したいず思いたす。それはあなたがそれをやった、ありがずう、簡単ではありたせんでした:)



私たちは、ほんの少しのニュヌスを共有したいSObjectizerずso_5_extraの曎新したす。バヌゞョン5.5.20たでのSObjectizer、バヌゞョン1.0.3たでのso_5_extra。SObjectizerは、vcpkg䟝存関係管理システムからも利甚できたす。したがっお、vcpkg install sobjectizerを䜿甚しおSObjectizerをむンストヌルできたす。



たた、来幎、SObjectizerの次のメゞャヌバヌゞョンである、バヌゞョン5.6で䜜業を開始する予定です。バヌゞョン5.6では、互換性のためにプルする叀いロヌドを削陀し、堎合によっおはバヌゞョン5.5ずの互換性を砎壊したす。SObjectizer-5.6のいく぀かの予備的な考慮事項をここに瀺したす。SObjectizerの将来のバヌゞョンで䜕を芋たいか、SObjectizerの開発のどの方向があなたに興味があるかに぀いお、SObjectizerに興味がある人の意芋を聞くこずは玠晎らしいこずです。



誰かがSObjectizerの動䜜の詳现を知りたい堎合は、コメントで質問しおください。答えようずしたす。たた、答えに長いテキストが必芁な堎合は、別の蚘事が芋぀かるかもしれたせん。



All Articles