痛みのないワークスレッド間の情報交換? 私たちを助けるためのCSPチャンネル

マルチスレッドコードの開発は難しい作業です。 本当に複雑です。 幸いなことに、開発者の生活を簡素化するために、タスクベースの並列処理、map-reduce / fork-join、CSP、アクターなどの高レベルの抽象化がかなり前に考案されました。



しかし、C ++ニックネームが通信するプロファイルフォーラムにアクセスすると、多くの人はstd :: mutex + std :: condition_variableのコンパートメント内のstd ::スレッドよりも単純で便利なものの存在に気付いていないように感じます。 。 このカテゴリからの質問は定期的に満たされます。「いくつかのワークフローを開始する必要があります。1つはあれこれで、2つめはあれこれで、3つめはあれこれで。 このように実行し、このようなスレッド間で情報を交換します。 私は正しいことをしていますか?」



明らかに、そのような質問は初心者によって尋ねられます。 しかし、第一に、ソフトウェアの開発における未熟な若者の数は常に大きく、IT業界の魅力の成長に伴い、この数は増え続けています。 初心者がstd :: threadおよびstd :: mutexを知っているのは悲しいことですが、生活を簡素化できる既製のツール(Intel TBB、HPX、QP / C ++、Boost.Fiber、FastFlow、CAF、 SObjectizerなど)。



そして、第二に、そのような質問に対する答えの中で、アドバイスはかなりまれです。「この既製のツールを使えば、ほんの数行でタスクを解決できます」。 多くの場合、人々は、自家製のスレッドセーフなメッセージキュー実装の低レベルの詳細について議論します。



これはすべて、特定のフレームワークがマルチスレッドに関連する小さくて一見単純なタスクを解決するのにどのように役立つかを簡単な例で示すことが理にかなっていることを示唆しています。 C ++でのマルチスレッドアプリケーションの開発を簡素化するツールとしてSObjectizerを開発しているので、今日は、SObjectizerのチャネルに実装されたCSPチャネルが、マルチスレッドコードを記述する際の頭痛から開発者を救う方法を示します。



簡単なデモ



この記事では、簡単なデモを見ていきます。 メインスレッド上の小さなテストアプリケーションは、ユーザーとの「ダイアログ」です。 ユーザーが「exit」という単語を入力すると、アプリケーションは終了します。



アプリケーションには2つの追加のワークフローがあります。 1つは、特定のセンサーの定期的な「問い合わせ」をシミュレートすることです。 2番目のワークフローでは、センサーから「取得」された情報がファイルに「書き込まれ」ます。



当然、センサーとデータファイルの実際の作業は行われず、代わりに、作業スレッドをしばらくブロックするプログラムで遅延が編成されます。 これにより、外部デバイスおよびファイルとの同期操作がシミュレートされます。



読者のこのような粗雑な模倣を混同しないようにしてください。 この記事の目的は、CSPチャネル( SObjectizerではmchainsと呼ばれます)を介したワークフロー間の相互作用を示すことであり、関連するコンテンツでワークスレッドを埋めることではありません。



「指の上」の例の動作原理



したがって、この例では、メインスレッドに加えて、さらに2つの作業スレッドがあります。



meter_reader_threadと呼ぶ最初の作業スレッドは、センサーを「ポーリング」するためのものです。 このスレッドには2つのmchainが必要です。 最初のmchainは、meter_reader_threadスレッド自体にコマンドを送信するために使用されます。 特に、タイプmeter_reader_threadが「ポーリング」を実行するのを受信すると、acquisition_turn型のメッセージがこのタイマーチャネルに配置されます。



2番目のmchainは、センサーから「取得」された2番目の作業スレッドの情報を送信するために、meter_reader_threadに必要です。 file_writer_threadと呼ばれる2番目の作業スレッドは、ファイルに情報を「書き込む」役割を果たします。 2番目の作業スレッドは、mchainからコマンドを読み取って情報を書き込み、それらを「実行」します。 mchainにはコマンドはありませんが、file_writer_threadスレッドは新しいコマンドを待ってスリープしています。



ここにそのような単純なスキームがあります:







メインワークフローでmchainsが閉じられるとすぐに、両方のスレッドの作業は終了します。



簡単なサンプルテキストの解析



簡単な例の完全なソースコードは、作成のために作成されたリポジトリで表示できます。 単純なものから複雑なものへと進みます。 file_writer_threadおよびmeter_reader_threadスレッドの作業を実行する関数から分析を開始します。その後、メイン()関数の実装を確認します。この関数では、マルチスレッドに関連するいくつかのトリックを考慮する必要があります。



関数file_writer_thread()



この例では、file_writer_thread()関数が最も単純です。 全文は次のとおりです。



// ,    . void file_writer_thread( //        . so_5::mchain_t file_write_ch) { //       ,    . //        receive. receive(from(file_write_ch), //         //   write_data. [&](so_5::mhood_t<write_data> cmd) { //    . std::cout << cmd->file_name_ << ": write started" << std::endl; std::this_thread::sleep_for(350ms); std::cout << cmd->file_name_ << ": write finished" << std::endl; }); }
      
      





file_writer_thread()が行うことは、receive()呼び出し内でハングすることだけです。 receive()関数は、メッセージがチャネルに到着するのを待機し、メッセージがチャネルに到着すると、receive()に渡されたハンドラーからこのメッセージのハンドラーを検索します。



この場合、write_dataタイプのメッセージの場合、1つのハンドラーのみが渡されます。 このタイプのメッセージがチャネルに到着すると、このハンドラーが呼び出されます。 実際、このハンドラー内では、すべての「ビジネスロジック」が収集されます。 読み取りデータをファイルに書き込むシミュレーション。



SObjectizerのreceive()関数には2つのバージョンがあります。 この例では使用しなかった最初のバージョンは、チャネルから1つのメッセージのみを待機して取得します。 上記の2番目のバージョンは、チャネルからすべてのメッセージを抽出し、チャネルが閉じられたときにのみ制御を返します。 つまり この場合、file_writer_thread()からの出口は、receive()呼び出しが作業を完了したときにのみ発生します。 これは、誰かがfile_write_chチャネルを閉じたときに起こります。



関数meter_reader_thread()



meter_reader_thread関数はもう少し複雑です:



 //     . void meter_reader_thread( // ,    . so_5::mchain_t timer_ch, // ,        . so_5::mchain_t file_write_ch) { //      . struct acquisition_turn : public so_5::signal_t {}; //   .      . int ordinal = 0; //  . auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 750ms); //       ,    . //        receive. receive(from(timer_ch), //         //   acquire_turn. [&](so_5::mhood_t<acquisition_turn>) { //   . std::cout << "meter read started" << std::endl; std::this_thread::sleep_for(50ms); std::cout << "meter read finished" << std::endl; //      . so_5::send<write_data>(file_write_ch, "data_" + std::to_string(ordinal) + ".dat"); ++ordinal; }); }
      
      





ここでは、まず、センサーの「ポーリング」を模倣するために、時々取得されるacquisition_turn信号のタイプを決定します。



次に、send_periodic()を呼び出して、この同じ定期的なAcquisition_turn信号をトリガーします。 これにより、SObjectizerは750msごとに1回acquisition_turnをtimer_chに送信します。



その後、timer_chチャネルが閉じられたときにのみ終了するreceive()呼び出しを既に知っています。 receive()の内部で、acquisition_turnシグナルハンドラを実装しました。 このハンドラーでは、センサーの「ポーリング」をシミュレートしてから、file_write_chチャネルにwrite_dataメッセージを送信して、file_writer_threadスレッドの「収集」データを書き込むコマンドを与えます。



そのため、meter_reader_threadはreceive()内で常にスリープ状態になり、acquisition_turnを受信すると定期的に起動し、次にwrite_dataメッセージをfile_write_ch(つまりfile_writer_threadスレッド)に送信し、次のAcquisition_turnまで再びスリープします。 または、timer_chが閉じられるまで。



メイン関数()



メイン()コードを調べる前に、このコードのどの部分が明確ではないかを説明せずに、いくつかの小さな微妙な点を説明する必要があります。



スレッドとCSPチャネルで作業するときに解決しなければならない主な問題は、作業スレッドが正しくタイムリーに完了することです。 つまり std :: threadのインスタンスを作成し、それを使用して作業スレッドを開始する場合、std :: thread :: join()を呼び出して作業スレッドの終了を待機する必要があります(ここでは分離されたスレッドは使用されません)。 最も簡単な方法は、main()関数の最後でstd :: thread :: join()を手動で呼び出すことです。 次のようなもの:



 int main() { ... std::thread file_writer{file_writer_thread}; ... file_writer.join(); }
      
      





しかし、悪いことは、そのような素朴なアプローチでは、例外やスコープからの早すぎる終了(通常の戻りなど)から保護されないことです。



ここで、ある種のヘルパークラスを使用すると、デストラクタでstd :: thread :: join()を呼び出すことができます。 たとえば、次のようなことができます。



 class auto_joiner { std::thread & t_; ... //   /. public: auto_joiner(std::thread & t) : t_{t} {} ~auto_joiner() { t_.join(); } }; int main() { ... std::thread file_writer{file_writer_thread}; auto_joiner file_writer_joiner{file_writer}; ... }
      
      





SObjectizerを使用する場合、SObjectizerにはすでに同様のツールがあるため、このようなauto_joinerを自分で記述する必要はありません。 main()コードで使用するだけです。 上に示したものとは異なり、1つのstd :: threadオブジェクトではなく、複数のjoin()を呼び出すことができます。



ただし、この例では、std :: thread :: join()を呼び出して作業スレッドを正しく停止することに加えて、もう1つのニュアンスを考慮する必要があります:receive()が呼び出されるスレッド内で作業を完了するには、mchainを閉じます。 これが行われない場合、receive()からの戻りは発生せず、std :: thread :: join()への呼び出しで永遠にスリープ状態になります。



これは、main()を終了するときにmchainsを自動的に閉じることに注意する必要があることを意味します。 そしてここで、std :: thread :: join()の呼び出しと同じアプローチを適用します:ヘルパーオブジェクトを使用します。このオブジェクトは、デストラクタでmchainのclose()を呼び出します。 つまり 私たちは次のようなことをします:



 int main() { ... auto ch = so_5::create_mchain(...); auto_closer ch_closer{ch}; ... }
      
      





繰り返しますが、この補助クラスauto_closerの実装を行う必要はありません。 SObjectizerはすでに準備ができています。



作業スレッドのjoin()を呼び出してmchainsを自動的に閉じるという心配を取り除く方法をすでに理解しています。 しかし、もう1つの非常に重要なポイントがありました。これらの操作をどの順序で実行する必要があるかです。 というのも、この非常にシンプルで理解しやすいシーケンスを記述すると、



 int main() { ... auto ch = so_5::create_mchain(...); auto_closer ch_closer{ch}; ... std::thread work_thread{[ch]{ receive(from(ch), ...); }}; auto_joiner work_thread_joiner{work_thread}; ... }
      
      





次に、古典的なデッドロックとauto_joinerデストラクタのハングが発生します。



問題は、auto_closerデストラクタの前にauto_joinerデストラクタが呼び出されることです。 つまり まだ閉じられていないmchainからreceive()でハングする作業スレッドに参加しようとします。



したがって、作業スレッドに対してjoin()が呼び出される前にmchainsが自動的に閉じられるようにするには、プログラムでエンティティを作成する順序を変更する必要があります。



 int main() { ... //    .       . std::thread work_thread; auto_joiner work_thread_joiner{work_thread}; ... //       . auto ch = so_5::create_mchain(...); auto_closer ch_closer{ch}; ... //       . work_thread = std::thread{[ch]{ receive(from(ch), ...); }}; ... }
      
      





そして今、主なニュアンスを説明した後、メイン()関数自体のコードを見ることができます:



 int main() { //  SObjectizer. so_5::wrapped_env_t sobj; // -     ... std::thread meter_reader, file_writer; // ...      joiner. //     join()      // main.    ,      main : // -    - /. auto joiner = so_5::auto_join(meter_reader, file_writer); //  ,     . auto timer_ch = so_5::create_mchain(sobj); auto writer_ch = so_5::create_mchain(sobj); //         main. //    ,       // receive()  join()    . auto closer = so_5::auto_close_drop_content(timer_ch, writer_ch); //      . meter_reader = std::thread(meter_reader_thread, timer_ch, writer_ch); file_writer = std::thread(file_writer_thread, writer_ch); //        exit  //      . std::cout << "Type 'exit' to quit:" << std::endl; std::string cmd; while(std::getline(std::cin, cmd)) { if("exit" == cmd) break; else std::cout << "Type 'exit' to quit" << std::endl; } //   main.      // (  closer),      // (  joiner). return 0; }
      
      





このコードがほとんど明確であることを願っています。 また、2つの小さな点がない限り、明確化が必要になる場合があります。



最初は、メインの先頭にso_5 :: wrapped_env_tのインスタンスを作成することです。 このインスタンスはSObjectizer Environmentを隠します。 また、mchainsの作成とタイマーのサービスの両方にSObjectizer環境が必要です(meter_reader_threadでsend_periodic()を呼び出すと、SObjectizerタイマーへの呼び出し自体が非表示になります)。



次に、auto_close_drop_contentの呼び出しです。 一方で、彼にとっては明らかです。この関数は、デストラクタ内のmchainsを自動的に閉じるauto_closerオブジェクトを返します。 しかし、この場合も、この関数の名前でdrop_contentは何を意味しますか?



実際、SObjectizerでは、2つのモードでmchainを閉じることができます。 最初のモードでは、mchainはreceive()によってまだ処理されていないmchain内のすべてのメッセージを破棄することにより閉じます。 たとえば、close()を呼び出した時点で、mchainには100500のメッセージが含まれています。 これらのメッセージはすべて破棄され、受信者には届きません。 このモードはdrop_contentと呼ばれ、auto_close_drop_content関数はauto_closerを作成するだけで、drop_contentモードでmchainを閉じます。



対照的に、2番目のmchainクローズモードは、すべてのメッセージをmchainに保存します。 これにより、receive()関数がmchainコンテンツの処理を完了することができます。 ただし、mchainの新しいメッセージを追加することはできなくなります。 mchainは既に閉じています(書き込み用)。 このモードは、それぞれretain_contentと呼ばれます。



mchainのクローズモード、drop_contentおよびretain_contentは両方とも、異なる状況で適切です。 この例では、drop_contentが必要です。これがauto_close_drop_contentが使用される理由です。



最初の例の結果



最初の例を実行すると、非常に期待される画像が表示されます。







ここでは、これらの「投票」の結果を連続して「投票」し、「記録」しています。



簡単な例の複雑さ:file_writer_threadの負荷を制御します



2番目の例の完全なソースコードは、 ここにあります


この例の最初のバージョンは非常に理想的なものでした。センサーから「取得した」データの記録は、常に次の「調査」までに完了すると考えています。 しかし、実際には、ほとんどの場合、外部デバイスの操作時間はかなり広い範囲で「変動」する可能性があります。 これは、ファイルへの「書き込み」に時間がかかり、write_dataメッセージでmchainにメッセージが蓄積し始める状況を処理することが理にかなっていることを意味します。



同様の状況をシミュレートするには、上記のmeter_reader_thread()およびfile_writer_thread()の関数をわずかに変更します。 meter_reader_thread()で、acquisition_turnシグナルが到着するレートを増やすだけです。



 auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 300ms);
      
      





ただし、file_writer_thread()では、「書き込み」操作時間を[295ms、1s]の範囲からランダムに選択します。 つまり 「記録」操作が「ポーリング」の間隔に収まる場合もありますが、ほとんどの場合は収まりません。 あまり合わない場合もあります。 したがって、ここでfile_writer_thread()を変更します。



 // ,    . void file_writer_thread( //        . so_5::mchain_t file_write_ch) { //      . std::mt19937 rd_gen{std::random_device{}()}; //         //  [295ms, 1s]. std::uniform_int_distribution<int> rd_dist{295, 1000}; //       ,    . //        receive. receive(from(file_write_ch), //         //   write_data. [&](so_5::mhood_t<write_data> cmd) { //     "". const auto pause = rd_dist(rd_gen); //    . std::cout << cmd->file_name_ << ": write started (pause:" << pause << "ms)" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds{pause}); std::cout << cmd->file_name_ << ": write finished" << std::endl; }); }
      
      





これで、raw_ writeメッセージがfile_write_chに蓄積できるようになりました。 狭い円で広く知られている過負荷の問題が発生します。これは、データプロバイダーがデータコンシューマーが処理できるよりも速いペースで新しいデータを生成する場合です。 問題は不快です、あなたはそれと戦う必要があります。



たとえば、背圧メカニズムを実装できます。 つまり データプロバイダーがコンシューマーに過負荷をかけ始めると、何らかの形でコンシューマーがサプライヤにそのことを知らせます。 CSPチャネルの場合、「バックプレッシャー」を実装する完全に自然な方法は、コンシューマがプロバイダから次のデータバッチを受信できるようになるまで、データプロバイダによるチャネルへの書き込みをブロックすることです。



ちなみに、この点に関して、一部のデータ処理シナリオのCSPモデルは、アクターモデルよりもはるかに便利です。 実際、アクターのモデルでは、サプライヤとコンシューマ間のデータ交換は非同期メッセージを介してのみ実行されます。 つまり 消費者に次のメッセージを送信するサプライヤは、消費者がどれだけロードされているか、次のメッセージが過負荷につながるかどうか、もしそうなら、次のメッセージを送信する前にどれだけの時間待つ必要があるかを知りません 一方、CSPモデルでは、サプライヤはチャネルへの書き込み操作を「スリープ状態」にし、消費者が負荷を整理した後にサプライヤを「起動」できます。
そのため、サプライヤー、つまり この場合、meter_reader_threadは、file_writer_threadが以前file_write_chに送信されたメッセージを解析および処理する時間がない場合、スリープ状態になりました。 SObjectizer mchainsはこれを提供できますか?



はい



これを行うには、mchainを作成するときにmchainに追加のプロパティを設定する必要があります。 この例の最初のバージョンでは、次のように最も簡単な方法でmchainを作成しました。



 auto writer_ch = so_5::create_mchain(sobj);
      
      





この場合、空きRAMのサイズが許す限り多くのメッセージをそのようなチャネルにプッシュできるため、「無次元」チャネルが作成されます。



「背圧」が必要なため、「無次元」チャネルは適していません。 そのため、処理のチャネルで待機できるメッセージの数を制限する必要があります。



また、情報提供者がいっぱいになったチャンネルに書き込もうとするとき、「眠りに落ちる」ことを望みます。 これには問題はありませんが、SObjectizerではこの期待値の上限を設定する必要があります。 たとえば、チャネルへの書き込みを試行中にスリープ状態になりますが、スリープ状態は5秒以内(またはタスクに応じて5時間)になります。



SObjectizerでは、開発者がいっぱいになったチャネルへの書き込みの最大遅延を制限する必要があります。この制限がないと、デッドロックを簡単にキャッチできるためです。 たとえば、スレッドT1がスレッドT2のオーバーフローチャネルC1にメッセージを書き込もうとしているとします。この時点では、スレッドT3のオーバーフローチャネルC2にメッセージを書き込もうとしています。 そして、この時点でスレッドT3は、スレッドT1の混雑したチャネルC0にメッセージを書き込もうとしています。 最大待機時間を制限する場合、そのようなデッドロックは時間とともに自動的に解除されます。



そのため、チャネルサイズと最大レイテンシを設定しますが、疑問は残ります。「待機してもチャネル内のスペースが解放されない場合、混雑したチャネルへの書き込み操作をどうするか」



SObjectizerでは、待機した後でもチャネルのスペースが解放されない場合の処理​​を選択できます。 たとえば、チャネルにある最も古いメッセージをスローできます。 または、チャネルに挿入しようとした新しいメッセージを無視することもできます。 または、この場合、send()関数に例外をスローさせることができます。



この例では、最も古いメッセージを破棄するなどの反応を使用します。 この場合、それは非常に論理的です、なぜなら すでにセンサーから「新しいデータ」があります;それらを記録することは、古いデータを保存することよりも重要です。 したがって、更新された例では、次のようにwrite_dataメッセージのチャネルを作成します。



 //        ,   //      mchain      // ,       . auto writer_ch = so_5::create_mchain(sobj, //      300ms. 300ms, //   mchain-    2- . 2, //   mchain  . so_5::mchain_props::memory_usage_t::preallocated, //    mchain-     , //       mchain-. so_5::mchain_props::overflow_reaction_t::remove_oldest);
      
      





追加の説明は、引数so_5 :: mchain_props :: memory_usage_t :: preallocatedでのみ指定できます。 この引数は、チャネル自体内のメッセージをキューに入れるためのメモリの割り当て方法を決定します。 なぜなら チャネルは固定されており、サイズが小さいため、メッセージキューの場所をすぐに割り当てるのが理にかなっています。 この場合に私たちがしていること。



スレッドチャネル制限meter_reader_thread



2番目の例では、write_dataメッセージのチャネルサイズを制限しました。 しかし、acquisition_turn信号用のチャネルもあります。 制限するのも理にかなっているでしょうか?



確かに、それは理にかなっています。 Acquisition_turnの場合、1つのメッセージの容量を持つチャネルがあれば十分です。 Acquisition_turn信号がすでにチャネルにある場合、新しいものを追加しても意味がありません。



したがって、このチャネルを作成するコードフラグメントを変更します。



 //        , //        mchain    //   . auto timer_ch = so_5::create_mchain(sobj, //      . 1, //   mchain  . so_5::mchain_props::memory_usage_t::preallocated, //   ,     . so_5::mchain_props::overflow_reaction_t::drop_newest);
      
      





ここには2つの重要な違いがあります。





2番目の例の結果



2番目の例を開始すると、すでに次の図が表示されています。







file_writer_threadスレッドのデバッグ出力の数字の一部が消えていることがわかります。 たとえば、data_24.datを書き込んだ後、data_26.datを書き込む必要があります。 ただし、data_25.datレコードはありません。 これは、data_25.datのwrite_dataメッセージがいっぱいになったときにチャネルからスローされたためです。



さらに、file_writer_threadスレッドが長時間レコードを「残す」と、この間にmeter_reader_threadスレッドがいくつかの「ポーリング」を実行することがわかります。



再び例を複雑にします:meter_reader_threadコントロールを追加します



3番目の例の完全なソースコードは、 ここにあります
もう一度例を複雑にする誘惑を否定することはできません。今回はmeter_reader_threadスレッドを制御する機能を追加します。確かに、なぜセンサーの「ポーリング」の期間を増減できるようにしないのですか?やってみましょう。



ユーザーとの対話中のメインスレッドに、「終了」コマンド(アプリケーションの終了)だけでなく、「inc」コマンド(ポーリング期間を1.5倍延長)および「dec」(ポーリング期間を1.5倍短縮)も理解させます。



この場合に解決しなければならない主な問題は、メインアプリケーションスレッドからmeter_reader_threadスレッドにincおよびdecコマンドを配信する問題です。しかし、実際には-これは問題ではありません。 2つの新しいシグナルを開始するだけです。



 // ,         //   . struct dec_read_period : public so_5::signal_t {}; // ,         //   . struct inc_read_period : public so_5::signal_t {};
      
      





ユーザーがコマンドを入力すると、メインスレッドはこれらの信号を対応するチャネルに送信します。



 //        exit  //      . bool stop_execution = false; while(!stop_execution) { std::cout << "Type 'exit' to quit, 'inc' or 'dec':" << std::endl; std::string cmd; if(std::getline(std::cin, cmd)) { if("exit" == cmd) stop_execution = true; else if("inc" == cmd) so_5::send<inc_read_period>(control_ch); else if("dec" == cmd) so_5::send<dec_read_period>(control_ch); } else stop_execution = true; }
      
      





しかし、これらの信号をどのチャネルに送信しますか?この質問はもっと興味深いです。



基本的に、定期的なacquisition_turnとinc_ / dec_read_periodの両方に同じチャネルを使用できます。ただし、mchainを操作するときにSObjectizerが他にできることを示すために、meter_reader_threadに2つの異なるチャネルを使用します。





3番目の例を実装しやすくするために、これらのチャネルは両方ともmain()関数で作成および閉じられ、パラメーターとしてmeter_reader_thread()関数に渡されます。



 //  ,     . //    meter_reader_thread.  - . auto control_ch = so_5::create_mchain(sobj); // ,      acquisition_turn. auto timer_ch = so_5::create_mchain(control_ch->environment(), //      . 1, //   mchain  . so_5::mchain_props::memory_usage_t::preallocated, //   ,     . so_5::mchain_props::overflow_reaction_t::drop_newest); ... //         main. //    ,       // receive()  join()    . auto closer = so_5::auto_close_drop_content(control_ch, timer_ch, writer_ch); //      . meter_reader = std::thread(meter_reader_thread, control_ch, timer_ch, writer_ch); ...
      
      







meter_reader_thread()の修正バージョン



meter_reader_thread()関数は、より多くのアクションを実行する必要があるため、サイズが大幅に増加します。テキスト自体が1つの画面に収まらない関数はあまり好きではありませんが、この場合、ビジネスロジックの断片を補助関数に塗りつぶさないように、このような膨大な関数を記述する必要がありました。



最初と2番目の例と比較して、meter_reader_threadの3番目の例では、2つの基本的に重要な変更がありました。



まず、センサーの「ポーリング」の期間を変更できます。このため、acquisition_turnを定期的なメッセージとして実行することは有益ではありません。期間が変更されたときに再起動する必要があるたび。したがって、今度は別の方法に進みます。次のAcquisition_turnを処理するとき、次の「ポーリング」とwrite_dataの送信に費やされた時間を追跡します。その後、「調査」にあまりにも多くの時間を費やした場合、すぐにacquisition_turnを遅滞なく送信します。または、遅延したAcquisition_turnを送信しますが、配信の遅延は、現在のポーリング期間と実際に費やされた時間との差分になります。



acqusition_turnを処理するために次のフラグメントを取得します。



 //         //   acquire_turn. [&](so_5::mhood_t<acquisition_turn>) { //   ,      //  .   . const auto started_at = std::chrono::steady_clock::now(); //   . std::cout << "meter read started" << std::endl; std::this_thread::sleep_for(50ms); std::cout << "meter read finished" << std::endl; //      . so_5::send<write_data>(file_write_ch, "data_" + std::to_string(ordinal) + ".dat"); ++ordinal; //         // . const auto duration = std::chrono::steady_clock::now() - started_at; //    ,    //   . if(duration >= current_period) { std::cout << "period=" << current_period.count() << "ms, no sleep" << std::endl; so_5::send<acquisition_turn>(timer_ch); } else { //        "". const auto sleep_time = to_ms(current_period - duration); std::cout << "period=" << current_period.count() << "ms, sleep=" << sleep_time.count() << "ms" << std::endl; so_5::send_delayed<acquisition_turn>(timer_ch, current_period - duration); } }
      
      





次に、1つのチャネルからではなく、同時に2つのチャネルからのメッセージを待機する必要があります。メッセージが最初にどのチャネルに来たのか、これからメッセージを取得し、取得したメッセージを処理する必要があります。



これを行うには、関数so_5 :: select()を使用します。これは、前に示したso_5 :: receive()に似ています。ただし、receive()とは異なり、select()関数は複数のチャネルからの着信メッセージを待機できます。



その結果、meter_reader_threadで次のselect()呼び出しを行います(概略的に、ハンドラー実装の詳細を省略します)。



 //       ,    . so_5::select(so_5::from_all(), //     . case_(timer_ch, //         //   acquire_turn. [&](so_5::mhood_t<acquisition_turn>) { ... //  . }), //     . case_(control_ch, //    . [&](so_5::mhood_t<inc_read_period>) { ... //  . }, //    . [&](so_5::mhood_t<dec_read_period>) { ... //  . }) );
      
      





つまりselect()は、すべてのチャネルが閉じられるまで、以下にリストされているすべてのチャネルからのメッセージを待機する必要があると言います。次に、case_セクションには、チャネルの列挙(セクションごとに1つのチャネル)と、各チャネルからのメッセージのハンドラーのリストがあります。



そのため、timer_chチャネルからはAcquisition_time信号のみを処理し、control_chチャネルからはinc_read_periodおよびdec_read_period信号を使用します。



3番目の例では、meter_read_thread()関数は、コントロールがselect()を返した後にのみコントロールを返します。また、timer_chとcontrol_chの両方が閉じられると、select()が終了します。main()で何が起こるか-アプリケーションが終了したとき。



3番目の例の結果



3番目の例を実行し、いくつかのincコマンドを発行すると、次の図が表示されます。







おわりに



SObjectizerは、マルチスレッドアプリケーションの開発を簡素化するツールとして開発されており、コンカレントコンピューティングの問題に対するいずれかのアプローチの実装としては開発されていません。したがって、SObjectizerでは、アクターのモデル、パブリッシュ/サブスクライブ、およびCSPのトレースを見つけることができます。先に、アクターとパブ/サブのモデルに関連するSObjectizerのその部分について詳しく説明しました。今日は、読者にCSPチャネルを簡単に紹介しようとしました。これは2回目の試みで、最初の試みは昨年でした



私たちのストーリーで何かが理解できないままである場合、コメントの質問に喜んでお答えします。誰かが私たちに絵/図で伝えられたことを説明してほしいなら、それを正確に教えてください-私たちは適切なイラストを作り、それをテキストに追加しようとします。



誰かに示されている例が面白くなく、人生と離婚しているように見える可能性があります。それでも、私たちの目標はそのわかりやすさでした。しかし、CSPチャネルを使用してソリューションを説明するために、読者の1人がより重要な別の例を提供できる場合は、読者が提案した例に対するソリューションを作成し、このソリューションを後続の記事で説明します。



さて、記事の最後に、SObjectizerを実際に試して、印象を共有することを皆に提供しますフィードバックは私たちにとって非常に重要です。SObjectizerを開発し、より強力で便利なものにできるのは、あなたの希望/コメントです。



All Articles