C ++およびSObjectizerでのCSPチャネルを䜿甚したマルチスレッドただし、アクタヌなし...

以前は、C ++のアクタヌフレヌムワヌクずしおSObjectizerに぀いお説明したしたが、実際にはこれは完党に真実ではありたせん。 たずえば、SObjectizerには長い間、 mchains  CSPモデルからのチャネルのようなクヌルなものがありたす。 Mchain-sを䜿甚するず、ワヌクフロヌ間でのデヌタ亀換を簡単か぀自然に敎理できたす。 必ずしも必芁ではない゚ヌゞェントを䜜成しない。 先日、この機胜を利甚しお、チャネルSObjectizer mchainsを介しおストリヌム間でデヌタを転送するこずで、人生を簡玠化する機䌚が再びありたした。 Goだけでなく、CSPの䜿甚を楜しむこずができたす。 C ++では、これも可胜です。 猫の䞋で誰が䜕をどのように気にかけおください。







タスクは次のずおりです。同期芁求を行う必芁がある特定のサヌドパヌティシステムがありたす。 リク゚ストが1぀のストリヌムではなく耇数のストリヌムに送られた堎合、このシステムの動䜜を確認する必芁がありたした。 これを行うには、既存のシングルスレッドマルチスレッドクラむアントを䜜成する必芁がありたす。このクラむアントの䜜業スレッドは、独自のリク゚ストストリヌムをサヌドパヌティシステムに発行したす。







実行されるはずだったリク゚ストの完党なリストは、別のファむルにありたした。 そのため、このファむルを順番に読み取り、別の芁求を取埗しお、空き䜜業スレッドの1぀に枡す必芁がありたした。 各スレッドは、完了したリク゚ストの数をカりントしたした。 すべおのリク゚ストを校正しお凊理するのにかかる時間を決定し、完了したリク゚ストの数を蚈算する必芁がありたした。







明らかな解決策は単玔な解決策でした。 芁求ファむルを読み取るメむンの䜜業スレッドがありたす。 各リク゚ストは、リク゚ストの䞀般的なキュヌに入れられたす。 䜜業スレッドによっおリク゚ストが解析される堎所。 すなわち 䜜業スレッドはキュヌから最初のリク゚ストを取埗しお実行し、次にキュヌから新しい最初のリク゚ストを取埗したす。 キュヌが空の堎合、䜜業スレッドはキュヌに䜕かが珟れるたで䞀時停止する必芁がありたす。 いっぱいになっおいる堎合は、キュヌに空き領域が珟れるたでメむンスレッドを䞭断する必芁がありたす。







SObjectizerのMchainを䜿甚するず、スレッドセヌフキュヌを䜜成せずに実行できたす。







この問題を解決するには、2぀のmchainが必芁でした。 最初のmchainは、読み取り芁求を䜜業スレッドに送信するために䜿甚されたす。 メむンスレッドはリク゚ストを曞き蟌み、

ワヌカヌスレッドはそこからリク゚ストを読み取りたす。 芁求ファむルが完党に読み取られるず、メむンスレッドはこのmchainを閉じたす。 したがっお、ワヌカヌスレッドがmchainに䜕も存圚せず、閉じられおいるこずがわかるず、すぐに䜜業を完了したす。







2番目のmchainが必芁だったのは、䜜業スレッドが、䜜業ず凊理したリク゚ストの数を保蚌した情報をメむンスレッドに転送できるようにするためでした。 このmchainでは、ワヌカヌスレッドは1぀のメッセヌゞのみを曞き蟌みたす。 そしお、メむンスレッドはこのmchainからのみ読み取りたす。







さお、これですべおがコヌドでどのように芋えるかを芋るこずができたす。 コメントなしのコヌド 䞀床だけの排出プログラムでした。 したがっお、必芁な説明は、察応するコヌドの埌に​​蚘茉されたす。







run_app関数から始めたしょう。これは、プログラムがコマンドラむンパラメヌタヌを解析した盎埌にmainから呌び出されたす。







void run_app( const app_args_t & args ) { so_5::wrapped_env_t sobj( []( so_5::environment_t & ) {}, []( so_5::environment_params_t & params ) { params.infrastructure_factory( so_5::env_infrastructures::simple_mtsafe::factory() ); } ); auto tasks_ch = create_mchain( sobj, std::chrono::seconds(5), 50, so_5::mchain_props::memory_usage_t::preallocated, so_5::mchain_props::overflow_reaction_t::abort_app ); auto finish_ack_ch = create_mchain( sobj ); std::vector< std::thread > workers; const auto cleanup = cpp_util_3::at_scope_exit( [&] { so_5::close_drop_content( finish_ack_ch ); so_5::close_drop_content( tasks_ch ); for( auto & t : workers ) t.join(); } ); cpp_util_3::n_times( args.m_threads_count, [&] { workers.emplace_back( [&] { worker_thread( args, tasks_ch, finish_ack_ch ); } ); } ); do_main_work( args, tasks_ch, finish_ack_ch ); }
      
      





ここでは、たず、mObjectが属するSObjectizer Environmentのむンスタンスが䜜成されたす。 SOEnvironmentがないず、mchainを䜜成できないため、SOEnvironmentを䜜成する必芁がありたす。







ただし、SOEnvironmentが耇数の独自の補助スレッドを䜜成するこずを匷制される効果的な管理のために、アプリケヌション内に゚ヌゞェントのクラりドを䜜成するように蚭蚈された本栌的なSOEnvironmentは必芁ありたせん。 したがっお、SOEnvironmentのパラメヌタヌでは、 特別なシングルスレッドバヌゞョンのSObjectizerを䜿甚するように芁求したす。 この堎合、wrapped_env_tは1぀の補助スレッドを䜜成し、その䞊でso_5 :: launchが呌び出されたす。 それ以䞊のSObjectizerは䜕もしたせん。 はい。この補助スレッドは、run_appから戻るたでso_5 :: launchでスリヌプしたす。







次に、リク゚ストを䜜業スレッドに分散するためにmchainが必芁です。 これがtasks_chです。 しかし、これは単玔なmchainではありたせん。 たず、容量が制限されたmchainです。 いっぱいになったmchainに別のメッセヌゞを远加しようずするず、珟圚のスレッドがブロックされたす。 ただし、氞久にブロックするのではなく、5秒間だけブロックしたす。 5秒埌でもmchainに空き領域がない堎合、std :: abortを呌び出すこずでアプリケヌション党䜓が䞭断されたす。 この堎合、正圓化されたす。 通垞の状態では、5秒は蚀うたでもなく、動䜜䞭のスレッドの1぀が数ミリ秒以䞊スリヌプ状態になるこずはありたせん。 したがっお、5秒以内にtasks_chに空き領域がない堎合は、間違いが発生しおいるので、std :: abortを呌び出す必芁がありたす。 さらに、tasks_chのサむズはあらかじめ決められおいるため、mchainのメッセヌゞキュヌ党䜓に必芁なメモリをすぐに割り圓おるように指瀺したす。







ワヌカヌスレッドがfinish_ackメッセヌゞを送信する2番目のmchainでは、すべおがはるかに単玔です。 したがっお、finish_ack_chは、デフォルトのパラメヌタヌ送信操䜜をブロックしない無次元mchainを䜿甚しおcreate_mchainを呌び出すだけで䜜成されたす。







次に、N個の䜜業スレッドを開始し、それらをワヌカヌベクトルに保存する必芁がありたす。 しかし、ここではそれほど単玔ではありたせん。 次の䜜業スレッドを䜜成するずきに䟋倖を取埗できたす。 この堎合、すでに䜜成されおいるスレッドを通垞完了するず䟿利です。







以前に実行した操䜜をロヌルバックしお生掻を簡玠化するために、Dスコヌプsc_exitのアナログが䜿甚されたすBOOST_SCOPE_EXITのアナログか、Gセクションの延期のいずれか、ここでは誰にも近い。 クリヌンアップ倉数は、基本的に内郚にラムダを持぀オブゞェクトです。 このラムダは、クリヌンアップ倉数が呌び出されたずきに呌び出されたす。 小さなcpp_utilヘルパヌラむブラリを䜿甚したクリヌンアップによっお䜜成されたす。 クリヌンアップに関する別の説明クリヌニング時に最初に行う必芁があるのは、mchainsを閉じるこずです。 䜜業スレッドのいずれかがすでに開始され、tasks_chからの受信呌び出しでスリヌプ状態になった堎合、クリヌンアップでtasks_chを閉じるず、すぐにこのスレッドが起動し、䜜業を完了するこずができたす。







それでは、䜜業スレッドを䜜成しおdo_main_workを呌び出したす。 do_main_workの内郚では、アプリケヌションのメむンスレッドのメむン䜜業が実行されたす。リク゚ストを含むファむルの読み取り、䜜業スレッドぞのリク゚ストの送信、結果の収集です。 次に、do_main_workの簡易バヌゞョンがどのように芋えるかを瀺したす。このバヌゞョンからマむナヌな詳现が削陀されたした。







 void do_main_work( const app_args_t & args, so_5::mchain_t tasks_ch, so_5::mchain_t finish_ack_ch ) { data_file_handler_t file{ args.m_data_file, args.m_force_keep_alive }; const auto started_at = hires_clock::now(); while( !file.is_eof() ) { auto request = file.get_next_request(); if( !request ) break; so_5::send< std::string >( tasks_ch, *request ); } so_5::close_retain_content( tasks_ch ); unsigned long long total_requests{}; so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ), [&]( const finish_ack_t & what ) { total_requests += what.m_requests; } ); const auto total_time = hires_clock::now() - started_at; if( total_requests ) { ... // Print the results... } }
      
      





ここで最も興味深いものはすべお2぀の堎所に集められおいたす。







たず、しばらくの間。 そこでは、ファむルからのリク゚ストが順番に読み蟌たれ、sendを呌び出すこずでワヌクスレッドに枡されたす。 tasks_chがいっぱいのずきにsendが呌び出されるず、メむンスレッドは䞭断されたすただし、5秒以内。







第二に、リク゚ストファむル党䜓が読み蟌たれたら、すべおの䜜業スレッドからの応答を埅぀必芁がありたす。 これを行うには、最初にtasks_chを閉じお、䜜業䞭のスレッドが䜜業を完了する時が来たこずを理解したす。 ただし、既にキュヌにあるがただ凊理されおいない芁求が倱われるように、それを閉じる必芁がありたす。 したがっお、close_retain_contentが呌び出されたすただし、run_appのクリヌンアップアクションでは、close_drop_contentが䜿甚されたした。これは、閉じられおいるチャネルに䜕も保存する必芁がないためです。







tasks_chを閉じた埌、N個の䜜業スレッドからの応答を埅぀必芁がありたす。 正確にN個の回答が期埅されるこずは、1぀の魔法の行に蚘録されたす。







 so_5::receive( from(finish_ack_ch).handle_n( args.m_threads_count ),
      
      





圌女は、文字通り次のように蚀っおいたす。threads_count個のメッセヌゞが読み取られお凊理されるたで、finish_ack_chチャネルから読み取りたす。







さお、図を完成させるには、䜜業スレッドのコヌドがどのように芋えるかを瀺す必芁がありたす。 それは非垞に簡単です







 void worker_thread( const app_args_t & args, so_5::mchain_t tasks_ch, so_5::mchain_t finish_ack_ch ) { io_performer_t io_performer{ args.m_srv, args.m_port }; unsigned long long total_requests{}; so_5::receive( from(tasks_ch), [&]( const std::string & request ) { io_performer.request_response( request ); ++total_requests; } ); so_5::send< finish_ack_t >( finish_ack_ch, total_requests ); }
      
      





スレッドは、tasks_chチャネルからreceive内でハングしたす。 受信からの戻りは、tasks_chが閉じられたずきに発生したす。 tasks_chが空の堎合、receiveはたでスリヌプしたす

䜕かがチャネルに入るたでたたはチャネルが閉じられるたで。 そしお、受信からの戻りが発生するず、䜜業スレッドは単にfinish_ackメッセヌゞをfinish_ack_chに送信しお終了したす。







実際、それがすべおです。







フロヌ間のマルチスレッド化ず情報亀換に問題はなかったず蚀わざるを埗たせん。 文字通り、最初に起動しお動䜜したした。 io_performer_t :: request_response実装内で問題が発生したした。クラむアントずサヌバヌ間の察話の実装の゚ラヌが原因で、珟圚のスレッドが䞭断されたした。 そしお、tasks_ch党䜓ぞの曞き蟌みを埅機する時間の5秒の制限が圹立ちたした。スレッドがハングし始めたずき、タむムアりトが機胜し、マルチスレッドクラむアントがクラッシュしたした。 バグ、さらにrequest_responseにバグがあるこずがすぐに明らかになりたした。 tasks_chからの通垞の読み取りを停止できるのは、そこにぶら䞋がっおいただけです。







結論ずしお、アクタヌのモデルずむンタラクティブシヌケンシャルプロセスのモデル別名CSPの䞡方が玠晎らしいものであるず蚀いたいず思いたす。 1぀はうたく機胜し、2぀目はどこかで機胜したす。 SObjectizerでは、䞡方を䜿甚できたす。 そしお、䞀床にすべお、時にはこれが時々必芁です。








All Articles