本当に便利なストリームセーフ信号

C ++で信号を実装するライブラリは世界中にたくさんあります。 残念ながら、私が遭遇したすべての実装には、これらのライブラリを使用して単純なマルチスレッドコードを書くことを妨げるいくつかの問題があります。 ここでは、これらの問題とその解決方法について説明します。



シグナルとは何ですか?



多くの人はすでにこの概念に精通していると思いますが、念のため、それを書きます。



シグナルは、任意のイベントの通知を、互いに独立して登録できる受信者に送信する方法です。 必要に応じて、多くの受信者とコールバックします。 または、.NET、マルチキャストデリゲートで作業した人向け。



boost :: signals2を使用したいくつかの例
信号の発表:



struct Button { boost::signals2::signal<void()> OnClick; };
      
      





信号への接続と信号からの切断:



 void ClickHandler() { cout << “Button clicked” << endl; } // ... boost::signals2::connection c = button->OnClick.connect(&ClickHandler); // ... c.disconnect();
      
      





シグナルコール:



 struct Button { boost::signals2::signal<void()> OnClick; private: void MouseDownHandler() { OnClick(); } };
      
      







問題について



シングルスレッドのコードでは、すべてが素晴らしく見え、かなりうまく機能しますが、マルチスレッドの場合はどうでしょうか?



残念ながら、異なる実装に共通する3つの問題があります。



  1. アトミックに信号に接続してバインドされた状態を取得する方法はありません
  2. 信号からのノンブロッキング切断
  3. 非同期ハンドラーを無効にしても、すでにスレッドキューにある呼び出しはキャンセルされません。


それぞれを詳細に検討しましょう。 これを行うために、架空のメディアセットトップボックスのファームウェア部分、つまり3つのクラスを作成します。





ここで表示されるコードは非常に単純化されており、これらの問題に集中できるように、余分なものは一切含まれていません。 タイプTypePtrのタイプも表示されます。 これは単なるstd :: shared_ptr <Type>であり 、心配しないでください。



アトミックに信号に接続してバインドされた状態を取得する方法はありません



だから、 StorageManager 。 すでにコンソールに挿入されているメディアのゲッターと、新しいメディアを通知する信号が必要です。



 class StorageManager { public: std::vector<StoragePtr> GetStorages() const; boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded; // ... };
      
      





残念ながら、このようなインターフェイスは競合状態が発生しない限り使用できません。



その順序では機能しません...



 storageManager->OnStorageAdded.connect(&StorageHandler); //      ,     for (auto&& storage : storageManager->GetStorages()) StorageHandler(storage);
      
      





...そして、この順序では機能しません。



 for (auto&& storage : storageManager->GetStorages()) StorageHandler(storage); //        ,      storageManager->OnStorageAdded.connect(&StorageHandler);
      
      





共通の解決策



明らかに、競合状態になったため、ミューテックスが必要です。



 class StorageManager { mutable std::recursive_mutex _mutex; std::vector<StoragePtr> _storages; public: StorageManager() { /* ... */ } boost::signals2::signal<void(const StoragePtr&)> OnStorageAdded; std::recursive_mutex& GetMutex() const { return _mutex; } std::vector<StoragePtr> GetStorages() const { std::lock_guard<std::recursive_mutex> l(_mutex); return _storages; } private: void ReportNewStorage(const StoragePtr& storage) { std::lock_guard<std::recursive_mutex> l(_mutex); _storages.push_back(storage); OnStorageAdded(storage); } }; // ... { std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex()); storageManager->OnStorageAdded.connect(&StorageHandler); for (auto&& storage : storageManager->GetStorages()) StorageHandler(storage); }
      
      





このコードは機能しますが、いくつかの欠点があります。





より良い方法は?



connect呼び出し(mutexを取得してコレクションを走査する)の周りで行うすべてを内部に転送しましょう。



現在の状態を取得するアルゴリズムは、この状態自体の性質に依存することを理解することが重要です。 これがコレクションである場合、各要素のハンドラーを呼び出す必要があります。たとえば、enumの場合は、ハンドラーを1回だけ呼び出す必要があります。 したがって、抽象化が必要です。



シグナルにポピュレーターを追加します。これは、現在接続されているハンドラーを受け入れる関数で、シグナルの所有者(この場合はStorageManager)に現在の状態をこのハンドラーに送信する方法を決定させます。



 template < typename Signature > class signal { using populator_type = std::function<void(const std::function<Signature>&)>; mutable std::mutex _mutex; std::list<std::function<Signature> > _handlers; populator_type _populator; public: signal(populator_type populator) : _populator(std::move(populator)) { } std::mutex& get_mutex() const { return _mutex; } signal_connection connect(std::function<Signature> handler) { std::lock_guard<std::mutex> l(_mutex); _populator(handler); //        _handlers.push_back(std::move(handler)); return signal_connection([&]() { /*    _handlers */ } ); } // ... };
      
      





signal_connectionクラスは現在 、シグナル内のリストからハンドラーを削除するラムダ関数を受け入れます。 後でもう少し完全なコードを提供します。



この新しい概念を使用して、 StorageManagerを書き換えます



 class StorageManager { std::vector<StoragePtr> _storages; public: StorageManager() : _storages([&](const std::function<void(const StoragePtr&)>& h) { for (auto&& s : _storages) h(s); }) { /* ... */ } signal<void(const StoragePtr&)> OnStorageAdded; private: void ReportNewStorage(const StoragePtr& storage) { //      ,     , //          _storages std::lock_guard<std::mutex> l(OnStorageAdded.get_mutex()); _storages.push_back(storage); OnStorageAdded(storage); } };
      
      





C ++ 14を使用する場合、ポピュレーターは非常に短くなります。



 StorageManager() : _storages([&](auto&& h) { for (auto&& s : _storages) h(s); }) { }
      
      





ポピュレーターが呼び出されると、ミューテックスはsignal :: connectメソッドでキャプチャされるため、ポピュレーターの本体では必要ないことに注意してください。



クライアントコードは非常に短くなります。



 storageManager->OnStorageAdded.connect(&StorageHandler);
      
      





1行で、同時に信号に接続し、オブジェクトの現在の状態を取得します。 いいね!



信号からのノンブロッキング切断



ここで、 MediaScannerを作成します。 コンストラクターで、シグナルStorageManager :: OnStorageAddedに接続し、デストラクターで切断します。



 class MediaScanner { private: boost::signals2::connection _connection; public: MediaScanner(const StorageManagerPtr& storageManager) { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); } ~MediaScanner() { _connection.disconnect(); //        ,  . //   ,        MediaScanner. } private: void StorageHandler(const StoragePtr& storage) { /*  -  */ } };
      
      





悲しいかな、このコードは時々落ちるでしょう。 その理由は、私が知っているすべての実装で、 disconnectメソッドがどのように機能するかです。 次回シグナルが呼び出されたときに、対応するハンドラーが機能しないことを保証します。 この場合、この時点でハンドラーが別のスレッドで実行されると、ハンドラーは中断されず、破棄されたMediaScannerオブジェクトで引き続き動作します。



Qtのソリューション



Qtでは、すべてのオブジェクトは特定のスレッドに属し、そのハンドラーはそのスレッドで排他的に呼び出されます。 シグナルから安全に切断するには、 QObject :: deleteLaterメソッドを呼び出して、目的のスレッドから実際の削除が行われ、削除後にハンドラーが呼び出されないようにします。



 mediaScanner->deleteLater();
      
      





これは、Qtと完全に統合する準備ができている場合に適したオプションです(プログラムのコアではstd :: threadを放棄して、QObjectやQThreadなどを優先します)。



Boostのソリューション:: Signals2



この問題を解決するために、 boostでは、スロット(つまりハンドラー)でtrack / track_foreignメソッドを使用することをお勧めします 。 これらのメソッドは任意のオブジェクトに対してweak_ptrを使用し、各オブジェクトが生きている間、ハンドラーとシグナルの接続が存在し、スロットはそれを「監視」します。



これは非常に簡単に機能します。各スロットには、監視対象オブジェクトのweak_ptrのコレクションがあり、ハンドラの期間中「ロック」(申し訳ありません)します。 したがって、これらのオブジェクトは、ハンドラーコードがアクセスできる限り、破棄されることは保証されていません。 オブジェクトのいずれかがすでに破棄されている場合、接続は切断されます。



問題は、このために、署名されるオブジェクトにweak_ptrが必要であるということです。 私の意見では、これを実現する最も適切な方法は、 MediaScannerクラスでファクトリメソッドを作成することです。このメソッドでは、作成したオブジェクトに、関心のあるすべての信号に署名します。



 class MediaScanner { public: static std::shared_ptr<MediaScanner> Create(const StorageManagerPtr& storageManager) { std::lock_guard<std::recursive_mutex> l(storageManager->GetMutex()); MediaScannerPtr result(new MediaScanner); boost::signals2::signal<void(const StoragePtr&)>::slot_type slot(bind(&MediaScanner::StorageHandler, result.get(), _1)); slot.track_foreign(result); storageManager->OnStorageAdded.connect(slot); for (auto&& storage : storageManager->GetStorages()) result->StorageHandler(storage); return result; } private: MediaScanner() //  ! { /*  ,    */ } void StorageHandler(const StoragePtr& storage); { /*  -  */ } };
      
      





したがって、欠点は次のとおりです。





より良い方法は?



disconnectメソッドブロックを作成して、コントロールを返した後、シグナルハンドラがアクセスしたすべてのものを破棄できることを保証します。 std :: thread :: joinメソッドのようなもの。



今後は、このために3つのクラスが必要だと言います。





コードクラスsignal_connection



 class signal_connection { life_token _token; std::function<void()> _eraseHandlerFunc; public: signal_connection(life_token token, std::function<void()> eraseHandlerFunc) : _token(token), _eraseHandlerFunc(eraseHandlerFunc) { } ~signal_connection(); { disconnect(); } void disconnect() { if (_token.released()) return; _token.release(); //   ,     (. . ) _eraseHandler(); //   -,      } };
      
      





ここで私はRAII接続オブジェクトのサポーターであると言わなければなりません。 これについては詳しく説明しませんが、この文脈では重要ではないとしか言​​いません。



信号クラスも少し変更されます:



 template < typename Signature > class signal { using populator_type = std::function<void(const std::function<Signature>&)>; struct handler { std::function<Signature> handler_func; life_token::checker life_checker; }; mutable std::mutex _mutex; std::list<handler> _handlers; populator_type _populator; public: // ... signal_connection connect(std::function<Signature> handler) { std::lock_guard<std::mutex> l(_mutex); life_token token; _populator(handler); _handlers.push_back(Handler{std::move(handler), life_token::checker(token)}); return signal_connection(token, [&]() { /*    _handlers */ } ); } template < typename... Args > void operator() (Args&&... args) const { for (auto&& handler : _handlers) { life_token::checker::execution_guard g(handler.life_checker); if (g.is_alive()) handler.handler_func(forward<Args>(args)...); } } };
      
      





さて、各ハンドラの横に、 signal_connectionにあるlife_tokenを参照するlife_token :: checkerオブジェクトがあります。 オブジェクトlife_token :: checker :: execution_guardを使用して、ハンドラーの期間中にキャプチャします



これらのオブジェクトの実装をネタバレの下に隠します。 疲れている場合は、スキップできます。
life_tokenは、次のものが必要です。



  • life_token :: releaseで待機するある種のプリミティブオペレーティングシステム(ここでは、簡単にするためにmutexを使用します)
  • ライブ/デッドフラグ
  • execution_guardによるロックカウンター(簡単にするためにここでは省略)


 class life_token { struct impl { std::mutex mutex; bool alive = true; }; std::shared_ptr<impl> _impl; public: life_token() : _impl(std::make_shared<impl>()) { } ~life_token() { release(); } bool released() const { return !_impl; } void release() { if (released()) return; std::lock_guard<std::mutex> l(_impl->mutex); _impl->alive = false; _impl.reset(); } class checker { shared_ptr<impl> _impl; public: checker(const life_token& t) : _impl(t._impl) { } class execution_guard { shared_ptr<Impl> _impl; public: execution_guard(const checker& c) : _impl(c._impl) { _impl->mutex.lock(); } ~execution_guard() { _impl->mutex.unlock(); } bool is_alive() const { return _impl->alive; } }; }; };
      
      





ミューテックスは、 execution_guardライフタイムの間キャプチャされます。 したがって、この時点でlife_token :: releaseメソッドが別のスレッドで呼び出された場合、同じmutexのキャプチャでブロックされ、シグナルハンドラーの実行が完了するまで待機します。 その後、 生きているフラグをクリアし、シグナルへの以降のすべての呼び出しはハンドラーの呼び出しにつながりません。



MediaScannerコードは現在どのように見えますか? まさに最初にそれを書きたかった方法:



 class MediaScanner { private: signals_connection _connection; public: MediaScanner(const StorageManagerPtr& storageManager) { _connection = storageManager->OnStorageAdded.connect([&](const StoragePtr& s) { this->StorageHandler(s); }); } ~MediaScanner() { _connection.disconnect(); } private: void StorageHandler(const StoragePtr& storage) { /*  -  */ } };
      
      





非同期ハンドラーを無効にしても、すでにスレッドキューにある呼び出しはキャンセルされません。



見つかったメディアファイルに応答し、それらを表示する行を追加するMediaUiModelを作成します。



これを行うには、次の信号をMediaScannerに追加します。



 signal<void(const MediaPtr&)> OnMediaFound;
      
      





ここには2つの重要なことがあります。





 class MediaUiModel : public UiModel<MediaUiModelRow> { private: boost::io_service& _uiThread; boost::signals2::connection _connection; public: MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner) : _uiThread(uiThread) { std::lock_guard<std::recursive_mutex> l(scanner.GetMutex()); scanner.OnMediaFound.connect([&](const MediaPtr& m) { this->MediaHandler(m); }); for (auto&& m : scanner.GetMedia()) AppendRow(MediaUiModelRow(m)) } ~MediaUiModel() { _connection.disconnect(); } private: //      MediaScanner',        UI. void MediaHandler(const MediaPtr& m) { _uiThread.post([&]() { this->AppendRow(MediaUiModelRow(m)); }); } };
      
      





前の問題に加えて、もう1つあります。 シグナルがトリガーされるたびに、ハンドラーをUIストリームに転送します。 ある時点でモデルを削除すると(たとえば、Galleryアプリケーションを離れた場合)、これらのハンドラーはすべて、後でデッドオブジェクトに到達します。 そして再び秋。



Qtのソリューション



同じ機能を持つすべて同じdeleteLater



Boostのソリューション:: Signals2



幸運で、UIフレームワークでdeleteLaterモデルを指定できる場合は、保存されます。 最初にモデルをシグナルから切断し、次にdeleteLaterを呼び出すパブリックメソッドを作成するだけで、Qtと同じ動作が得られます。 確かに、前の問題を解決する必要があります。 これを行うには、信号をサブスクライブするモデル内にshared_ptrモデルを作成する可能性があります。 コードはそれほど小さくありませんが、これは技術の問題です。



が悪く、UIフレームワークでモデルを削除する必要がある場合は、 life_tokenを作成します。



たとえば、次のようなものです(疲れている場合は読まないことをお勧めします)。
 template < typename Signature_ > class AsyncToUiHandlerWrapper { private: boost::io_service& _uiThread; std::function<Signature_> _realHandler; bool _released; mutable std::mutex _mutex; public: AsyncToUiHandlerWrapper(boost::io_service& uiThread, std::function<Signature_> realHandler) : _uiThread(uiThread), _realHandler(realHandler), _released(false) { } void Release() { std::lock_guard<std::mutex> l(_mutex); _released = true; } template < typename... Args_ > static void AsyncHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args) { auto self = selfWeak.lock(); std::lock_guard<std::mutex> l(self->_mutex); if (!self->_released) // AsyncToUiHandlerWrapper   ,  _uiThread       self->_uiThread.post(std::bind(&AsyncToUiHandlerWrapper::UiThreadHandler<Args_&...>, selfWeak, std::forward<Args_>(args)...))); } private: template < typename... Args_ > static void UiThreadHandler(const std::weak_ptr<AsyncToUiHandlerWrapper>& selfWeak, Args_&&... args) { auto self = selfWeak.lock(); if (!self) return; if (!self->_released) // AsyncToUiHandlerWrapper   , , ,  _realHandler,   self->_realHandler(std::forward<Args_>(args)...); } }; class MediaUiModel : public UiModel<MediaUiModelRow> { private: using AsyncMediaHandler = AsyncToUiHandlerWrapper<void(const MediaPtr&)>; private: std::shared_ptr<AsyncMediaHandler> _asyncHandler; public: MediaUiModel(boost::io_service& uiThread, const MediaScanner& scanner) { try { _asyncHandler = std::make_shared<AsyncMediaHandler>(std::ref(uiThread), [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }); std::lock_guard<std::recursive_mutex> l(scanner.GetMutex()); boost::signals2::signal<void(const MediaPtr&)>::slot_type slot(std::bind(&AsyncMediaHandler::AsyncHandler<const MediaPtr&>, std::weak_ptr<AsyncMediaHandler>(_asyncHandler), std::placeholders::_1)); slot.track_foreign(_asyncHandler); scanner.OnMediaFound.connect(slot); for (auto&& m : scanner.GetMedia()) AppendRow(MediaUiModelRow(m)); } catch (...) { Destroy(); throw; } } ~MediaUiModel() { Destroy(); } private: void Destroy() { if (_asyncHandler) _asyncHandler->Release(); //      MediaUiModel   ,       _asyncHandler.reset(); } };
      
      





このコードについてはコメントしませんが、少し悲しましょう。



より良い方法は?



とても簡単です。 まず、タスクキューとしてスレッドのインターフェイスを作成します。



 struct task_executor { virtual ~task_executor() { } virtual void add_task(const std::function<void()>& task) = 0; };
      
      





次に、シグナルにオーバーロードされた接続メソッドを作成し、ストリームを受け入れます。



 signal_connection connect(const std::shared_ptr<task_executor>& worker, std::function<Signature> handler);
      
      





このメソッドでは、 _handlersコレクション内のハンドラーにラッパーを配置します。これは、呼び出されると、ハンドラーと対応するlife_token ::チェッカーのペアを目的のストリームに転送します。 最終スレッドで実際のハンドラーを呼び出すには、前と同じようにexecution_guardを使用します。



したがって、 disconnectメソッドは、とりわけ、シグナルから切断した後、非同期ハンドラーも呼び出されないことを保証します。



ここでは、ラッパーとオーバーロードされた接続メソッドのコードは提供しません。 私はその考えが明確だと思います。



モデルコードは非常に単純になります。



 class MediaUiModel : public UiModel<MediaUiModelRow> { private: signal_connection _connection; public: MediaUiModel(const std::shared_ptr<task_executor>& uiThread, const MediaScanner& scanner) { _connection = scanner.OnMediaFound.connect(uiThread, [&](const MediaPtr& m) { this->AppendRow(MediaUiModelRow(m)); }); } ~MediaUiModel() { _connection.reset(); } };
      
      





ここでは、 AppendRowメソッドはUIスレッドで厳密に呼び出され、切断するまでのみ呼び出されます。



まとめると



そのため、信号を使用してより簡単なコードを作成できるようにする3つの重要なことがあります。



  1. ポピュレーターにより、信号に接続しながら現在の状態を簡単に取得できます
  2. 切断ブロックメソッドを使用すると、オブジェクトを独自のデストラクタでサブスクライブ解除できます。
  3. 前の項目が非同期ハンドラーに当てはまるように、 切断は、ストリームキューに既に存在する呼び出しを「無関係」としてマークする必要もあります。


もちろん、私がここに持ってきた信号コードは非常にシンプルで原始的であり、非常に速く動作しません。 私の目標は、今日の支配的なアプローチよりも魅力的な代替アプローチについて話すことでした。 実際には、これらすべてのことをはるかに効率的に記述できます。



私たちはプロジェクトでこのアプローチを約5年間使用しており、非常に満足しています。



すぐに使える



私は、C ++ 11をゼロから書き直して、私たちが持っていたシグナルを改善し、実装の長い間改善する価値のあった部分を改善しました。

健康に使用: https : //github.com/koplyarov/wigwag



ミニFAQ



redditとTwitterでの人々の反応から判断すると、基本的に3つの質問が全員に関係しています。



Q:すぐに、各ハンドラーの呼び出しでlife_tokenをブロックする必要があります。 遅いでしょうか?

A:奇妙なことに、いいえ。 ミューテックスの代わりにアトミック変数を使用できます。ハンドラーが実行された時点でまだ切断呼び出しがあった場合は、 std :: condition_variableで待機しますその結果、まったく逆になります:track / track_foreignweak_ptrコレクションを操作する必要があります)の形式のオーバーヘッドが欠落しているため、この実装はメモリ:: speed2をメモリと速度ではるかに残し、Qtよりも優れています。

ベンチマークはここにあります



Q:ブロックの切断方法によるデッドロックはありますか?

A:はい。デッドロックは、ブーストやQtよりも簡単に取得できます。私の意見では、これは信号を使用するためのよりシンプルなコードと彼らの仕事のより速い速度で報われます。さらに、誰が誰をフォローしているかを注意深く監視する場合、そのような状況は例外である可能性が高くなります。



もちろん、デッドロックをキャッチして修復する必要があります。 Linuxでは、これにHelgrindをお勧めします。 Windowsの場合、Intel InspectorCHESSによって2分間のGoogle検索が提供されます。



何らかの理由で上記のいずれも購入できない場合(たとえば、プラットフォームにhelgrindやある種の限界オペレーティングシステムを実行するための十分なメモリがない場合)、この(再び、単純化された)mutexクラスの形式の松葉杖があります:



 class mutex { private: std::timed_mutex _m; public: void lock() { if (_m.try_lock()) return; while (!_m.try_lock_for(std::chrono::seconds(10))) Logger::Warning() << "Could not lock mutex " << (void*)this << " for a long time:\n" << get_backtrace_string(); } // ... };
      
      





Visual StudioとGCCの両方に、コードでバックトレースを取得する機能があります。さらに、優れたlibunwindがあります。

このアプローチを使用すると、デッドロックのほとんどがQAによってキャッチされ、ログを一目で見ると、すべてがブロックされた場所がわかります。修理が必要なだけです。



Q: 1つのミューテックスを複数の信号に使用できますか?私が望む方法で例外を処理することは可能ですか?同期を使用せず、高速のシングルスレッド信号を取得することは可能ですか?

A:できる、できる、できる。これにはテンプレート戦略があります。詳細はドキュメントをご覧ください。



All Articles