SObjectizerシンプルから耇雑ぞ。 パヌトIII

SObjectizerに関する次の蚘事では、開発が進むに぀れおたすたす耇雑になる、最初は単玔な゚ヌゞェントの進化を匕き続き監芖したす。 関心がなくなった保留䞭のメッセヌゞの凊理方法を怜蚎したす。 そしお、階局的な有限状態マシンのいく぀かの機胜を䜿甚したす。









前回の蚘事で 、゚ヌゞェントemail_analyzerが存圚するずいう事実に決着したした。この゚ヌゞェントは、その問題を倚かれ少なかれ確実に解決できるず考えるこずができたす。 ただし、圌自身は、電子メヌルをチェックする3぀の段階を順番に実行したす。最初にヘッダヌをチェックし、次にコンテンツ、次に添付ファむルをチェックしたす。







ほずんどの堎合、これらの各操䜜はCPUバりンドではありたせん。 チェックされたフラグメントメッセヌゞヘッダヌなどから䞀郚の倀を分離する可胜性が非垞に高いため、この倀の有効性をチェックするためにどこかでリク゚ストを行う必芁がありたす。 たずえば、送信者のホスト名がブラックリストに茉っおいるかどうかを確認するためのデヌタベヌス内のク゚リ。 このリク゚ストは実行されたすが、たずえば、メッセヌゞのテキストの内容を個別のキヌフレヌズに解析しお、䜕らかの皮類のスパムマヌカヌの蟞曞を䜿甚しおチェックできるようにするなど、他の操䜜を実行するこずも可胜です。 たたは、添付ファむルにアヌカむブがあるかどうかを確認し、りむルス察策スキャンを開始したす。 䞀般に、電子メヌル分析操䜜を䞊列化するこずは理にかなっおいたす。







操䜜ごずに別々の゚ヌゞェントを䜿甚しおみたしょう。 ぀たり 次の圢匏の゚ヌゞェントを䜜成できたす。







class email_headers_checker : public agent_t { public : struct result { check_status status_ }; /*    */ email_headers_checker( context_t ctx, ... /* -  */ ) {...} virtual void so_evt_start() override { ... /*      */ } ... /* -   */ }; class email_body_checker : public agent_t {...}; class email_attachment_checker : public agent_t {...};
      
      





そのような各゚ヌゞェントは、その操䜜に固有のアクションを実行し、email_analyzerの結果をメッセヌゞずしお送信したす。 email_analyzerは、これらの゚ヌゞェントのむンスタンスを自宅で䜜成し、分析結果を含むそれらからのメッセヌゞを埅぀必芁がありたす。







 void on_load_succeed( const load_email_succeed & msg ) { try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // -checker-      // thread-pool-,     //   . disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) {...} }
      
      





以前の蚘事を泚意深く読んだ人は、「圌らからのメッセヌゞを埅぀」ずいうフレヌズに泚意する必芁がありたす。 時間制限なしで埅機するのは良くありたせん;それはシステムにぶら䞋がっお䜕もしない゚ヌゞェントを埗る盎接的な方法です。 したがっお、チェッカヌからの回答を埅぀ずき、IO操䜜の結果を埅぀ずきず同じこずを行うのは理にかなっおいたす䜕らかの皮類の保留䞭の信号を自分自身に送信し、それを受信するず、それ以䞊埅機する意味がないこずがわかりたす。 ぀たり 次のようなものを曞く必芁がありたす。







 //    email_analyzer    . class email_analyzer : public agent_t { //     ,    //   IO-    . struct io_agent_response_timeout : public signal_t {}; //     ,    //     email-. struct checkers_responses_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /*   IO- */ //      -    IO-. send_delayed< io_agent_response_timeout >( *this, 1500ms ); } ... void on_load_succeed( const load_succeed & msg ) { ... /*     checker- */ //     -    -checker-. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... void on_checkers_responses_timeout() { ... /*   . */ } };
      
      





ただし、このパスに埓っお、レヌキを螏むこずになりたす。チェッカヌからの応答を埅っお、保留䞭のシグナルio_agent_response_timeoutを簡単に受信できたす。 結局、誰もそれをキャンセルしおいたせん。 そしお、この信号が到着するず、おそらく存圚しないI / Oタむムアりトが存圚するため、吊定的な応答を生成したす。 このレヌキを回避しおみたしょう。







倚くの堎合、非同期メッセヌゞングに慣れおいない開発者は、保留䞭の信号をキャンセルしようずしたす。 これは、send_periodicを参照するずきにタむマヌ識別子を保存する堎合に実行できたす。







 //    email_analyzer    //  io_agent_response_timeout. class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; ... virtual void so_evt_start() override { ... /*   IO- */ //  ,      // send_periodic  send_delayed,   period //   0,     , //   . io_response_timer_ = send_periodic< io_agent_response_timeout >( *this, 1500ms, 0ms ); } ... void on_load_succeed( const load_succeed & msg ) { //   . io_response_timer_.reset(); ... /*     checker- */ //     -    -checker-. send_delayed< checkers_responses_timeout >( *this, 750ms ); } ... //       -  IO-. timer_id_t io_response_timer_; };
      
      





残念ながら、この単玔な方法は垞に機胜するずは限りたせん。 問題は、email_analyzer゚ヌゞェントがこの遅延信号のタむマヌをリセットする少し前に、遅延信号をemail_analyzer゚ヌゞェントに文字通り送信できるこずです。 䜕もする必芁はありたせん-マルチスレッドの驚異、そうです。







email_analyzer゚ヌゞェントは、その䜜業スレッドのコンテキストでon_load_succeedにアクセスでき、タむマヌのreset呌び出しを入力するこずさえできたす...しかし、その埌、匷制的に削陀され、制埡スレッドはSObjectizerタむマヌスレッドを受信し、遅延信号が送信されたす。 その埌、email_analyzer゚ヌゞェントの䜜業スレッドが再び制埡を受け取り、タむマヌのresetメ゜ッドが既に送信されたシグナルをキャンセルしたす。 ただし、信号はすでに゚ヌゞェントのメッセヌゞキュヌにあり、誰からも送信されたせん。メッセヌゞぱヌゞェントぞのキュヌに到着したため、そこから削陀するこずはできたせん。







この状況で最悪なのは、同様の゚ラヌがずきどき発生するこずです。 䜕のために、䜕が正確に起こっおおり、䜕が正確に゚ラヌであるかを理解するこずは困難です。 したがっお、保留䞭のメッセヌゞをキャンセルしおも、送信されないずいう保蚌はありたせん。







したがっお、保留䞭のメッセヌゞを誀っおキャンセルした堎合、どうすればよいですか







たずえば、゚ヌゞェントの状態を䜿甚できたす。 email_analyzerがIO゚ヌゞェントからの応答を埅っおいるずき、それは1぀の状態にありたす。 IO゚ヌゞェントからの応答が到着するず、email_analyzer゚ヌゞェントは異なる状態になり、チェッカヌからの応答を埅ちたす。 なぜなら 2番目の状態では、email_analyzerはio_agent_response_timeoutシグナルに眲名されおいないため、このシグナルは単に無芖されたす。







email_analyzer゚ヌゞェントに状態を導入するず、次のようになりたす。







 //    email_analyzer   //  . class email_analyzer : public agent_t { struct io_agent_response_timeout : public signal_t {}; struct checkers_responses_timeout : public signal_t {}; // ,       IO-. state_t st_wait_io{ this }; // ,        checker-. state_t st_wait_checkers{ this }; ... virtual void so_define_agent() override { //        . //  ,    ,    //   –    state_t. st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) .event< io_agent_response_timeout >( &email_analyzer::on_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .event< checkers_responses_timeout >( &email_analyzer::on_checkers_timeout ); } ... };
      
      





ただし、SObjectizerを䜿甚するずさらに簡単に実行できたす。゚ヌゞェントが特定の状態に留たるように時間制限を蚭定できたす。 この制限が切れるず、゚ヌゞェントは匷制的に別の状態になりたす。 ぀たり 次のように曞くこずができたす。







 //    email_analyzer     //      . class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //   . .time_limit( 1500ms, st_io_timeout ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_checkers_timeout ); } };
      
      





しかし、ある状態で費やす時間を制限するだけでは十分ではありたせん。 この時間が経過したら、ただ䜕らかのアクションをずる必芁がありたす。 どうやっおやるの







状態゚ントリハンドラのようなものを䜿甚したす。 ゚ヌゞェントが特定の状態に入るず、ナヌザヌがそのような機胜を割り圓おた堎合、SObjectizerはこの状態に入るハンドラヌ関数を呌び出したす。 これは、ハンドラヌを切断しおst_io_timeoutを入力できるこずを意味したす。これにより、負の結果のcheck_resultが送信され、゚ヌゞェントが終了したす。







 st_io_timeout.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } );
      
      





st_checkers_timeoutぞの入力でたったく同じハンドラヌを切断したす。 そしお以来 これらのハンドラヌ内のアクションは同じであるため、別のemail_analyzer゚ヌゞェントメ゜ッドに入れお、このメ゜ッドをst_io_timeout状態ずst_checkers_timeout状態の䞡方の入力ハンドラヌずしお指定できたす。







 class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_io_timeout{ this }; state_t st_wait_checkers{ this }; state_t st_checkers_timeout{ this }; ... virtual void so_define_agent() override { ... st_io_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); ... st_checkers_timeout .on_enter( &email_analyzer::on_enter_timeout_state ); }; ... void on_enter_timeout_state() { send< check_result >( reply_to_, email_file_, check_status::check_failure ); so_deregister_agent_coop_normally(); } };
      
      





しかし、それだけではありたせん。 ゚ヌゞェントの状態ずその機胜のトピックに觊れたので、さらに開発しおemail_analyzerコヌドをリファクタリングできたす。







コヌドでは、check_resultメッセヌゞの送信ず゚ヌゞェントの協力の登録解陀ずいう2぀のアクションがしばしば重耇しおいるこずに気付くのは簡単です。 そのような耇補は良くありたせん;それを取り陀くべきです。







実際、email_analyzer゚ヌゞェントの䜜業は、゚ヌゞェントが次の2぀の状態のいずれかになるこずを保蚌するために削枛されたすすべおが正垞に終了し、肯定的な結果が送信された埌、䜜業を​​完了するか、たたはすべおが゚ラヌで終了した堎合、吊定的な結果を送信する必芁がありたす同じように、゚ヌゞェントをシャットダりンしたす。 そのため、2぀の゚ヌゞェント状態st_successおよびst_failureを䜿甚しお、コヌドでこれを盎接衚珟したしょう。







 //    email_analyzer    //  st_success  st_failure. class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_failure{ this }; state_t st_success{ this }; ... virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //   . .time_limit( 1500ms, st_failure ); st_wait_checkers .event( &email_analyzer::on_header_check_result ) .event( &email_analyzer::on_body_check_result ) .event( &email_analyzer::on_attach_check_result ) .time_limit( 750ms, st_failure ); st_failure .on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); so_deregister_agent_coop_normally(); } ); st_success .on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); so_deregister_agent_coop_normally(); } ); }; ... //        . check_status status_{ check_status::check_failure }; };
      
      





これにより、゚ヌゞェントコヌドの状態を倉曎するだけで、䜕らかの方法で゚ヌゞェントをシャットダりンできたす。







 void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { //        . if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) //   .     //  . st_success.activate(); } }
      
      





しかし、さらに先ぞ進むこずができたす。 st_failureおよびst_success状態の堎合、これらの状態のいずれかを入力するずきに実行する必芁がある共通アクションが1぀ありたす-so_deregister_agent_coop_normallyを呌び出したす。 たた、これらの状態の䞡方が゚ヌゞェントをシャットダりンする責任があるため、これは偶然ではありたせん。 その堎合、ネストされた状態を䜿甚できたす。 ぀たり st_finiure状態を導入したす。st_failureおよびst_successはサブ状態になりたす。 st_finishingを入力するず、so_deregister_agent_coop_normallyが呌び出されたす。 たた、st_failureずst_successを入力するず、察応するメッセヌゞのみが送信されたす。







なぜなら st_failure状態ずst_success状態はst_finishingにネストされおいるため、これらのいずれかを入力するず、st_finishingの゚ントリハンドラヌが最初に呌び出され、その埌でのみst_failureたたはst_successの゚ントリハンドラヌが呌び出されたす。 st_finishingを入力するず゚ヌゞェントの登録が解陀され、st_failureたたはst_successを入力するずcheck_resultメッセヌゞが送信されたす。







ネストされた状態、状態゚ントリハンドラ、状態で費やされる時間の制限に぀いお蚀及するずきに読者の1人が䞍安を感じる堎合、階局型有限状態マシンのトピックに関する基本的な蚘事の1぀を読むのが理にかなっおいたす David Harel、StatechartsA visual formatism耇雑なシステム甚。 コンピュヌタプログラミングの科孊 。 SObjectizerの゚ヌゞェント状態は、そこに蚘述されおいるかなりの量の機胜を実装しおいたす。







これらすべおの倉換の結果ずしお、email_analyzer゚ヌゞェントは以䞋に瀺す圢匏を取りたす。







 //    email_analyzer,      //  email-    . class email_analyzer : public agent_t { state_t st_wait_io{ this }; state_t st_wait_checkers{ this }; state_t st_finishing{ this }; state_t st_failure{ initial_substate_of{ st_finishing } }; state_t st_success{ substate_of{ st_finishing } }; public : email_analyzer( context_t ctx, string email_file, mbox_t reply_to ) : agent_t(ctx), email_file_(move(email_file)), reply_to_(move(reply_to)) {} virtual void so_define_agent() override { st_wait_io .event( &email_analyzer::on_load_succeed ) .event( &email_analyzer::on_load_failed ) //  -   . .time_limit( 1500ms, st_failure ); st_wait_checkers .event( [this]( const email_headers_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_body_checker::result & msg ) { on_checker_result( msg.status_ ); } ) .event( [this]( const email_attach_checker::result & msg ) { on_checker_result( msg.status_ ); } ) //   -  . .time_limit( 750ms, st_failure ); //  ,     , //     . st_finishing.on_enter( [this]{ so_deregister_agent_coop_normally(); } ); st_failure.on_enter( [this]{ send< check_result >( reply_to_, email_file_, status_ ); } ); st_success.on_enter( [this]{ send< check_result >( reply_to_, email_file_, check_status::safe ); } ); } virtual void so_evt_start() override { //      ,  //      . st_wait_io.activate(); //       IO-   //  email . send< load_email_request >( so_environment().create_mbox( "io_agent" ), email_file_, so_direct_mbox() ); } private : const string email_file_; const mbox_t reply_to_; //      ,   //      st_failure. check_status status_{ check_status::check_failure }; int checks_passed_{}; void on_load_succeed( const load_email_succeed & msg ) { //   ..    . st_wait_checkers.activate(); try { auto parsed_data = parse_email( msg.content_ ); introduce_child_coop( *this, // -checker-      // thread-pool-,     //   . disp::thread_pool::create_disp_binder( "checkers", disp::thread_pool::bind_params_t{} ), [&]( coop_t & coop ) { coop.make_agent< email_headers_checker >( so_direct_mbox(), parsed_data->headers() ); coop.make_agent< email_body_checker >( so_direct_mbox(), parsed_data->body() ); coop.make_agent< email_attach_checker >( so_direct_mbox(), parsed_data->attachments() ); } ); } catch( const exception & ) { st_failure.activate(); } } void on_load_failed( const load_email_failed & ) { st_failure.activate(); } void on_checker_result( check_status status ) { //        . if( check_status::safe != status ) { status_ = status; st_failure.activate(); } else { ++checks_passed_; if( 3 == checks_passed_ ) //   .     //  . st_success.activate(); } } };
      
      





さお、結果のemail_analyzer゚ヌゞェントのコヌドを芋お、単玔だが重芁な質問を自問するのは理にかなっおいたす。







明らかに、この質問に察する答えはそれほど単玔ではありたせん。 しかし、これに぀いおは次の蚘事で説明したす。 ここでは、゜フトりェアシステムの開発にSObjectizerを10幎以䞊䜿甚した埌に孊んだ教蚓のトピックを取り䞊げたす。







蚘事に瀺されおいる䟋の゜ヌスコヌドは、 このリポゞトリにありたす 。








All Articles