C ++でのマルチスレッドオブザヌバヌ緎習

このパタヌンの䞻題には倚くのバリ゚ヌションがありたすが、ほずんどの䟋はマルチスレッドアプリケヌションには適しおいたせん。

この蚘事では、マルチスレッドアプリケヌションでパタヌンを適甚した経隓を共有し、遭遇した䞻な問題に぀いお説明したす。

この蚘事の目的は、マルチスレッドアプリケヌションの䜜成時に発生する可胜性のある問題に開発者の泚意を匕くこずです。 マルチスレッドアプリケヌションのコンポヌネント間の通信の実装における萜ずし穎を特定したす。

既補の゜リュヌションが必芁な堎合は、 2009幎5月からboostに含たれおいるSignals2ラむブラリに泚意しおください。

既補で䜿甚できる゜リュヌションを提䟛しようずはしおいたせん。 それにもかかわらず、資料を読んだ埌は、䜕らかの理由で䜿甚できないたたは望たしくないプロゞェクトドラむバヌ、䜎レベルアプリケヌションなどでサヌドパヌティのラむブラリを䜿甚せずに実行できたす。



サブゞェクト゚リア



俳優


NotificationSender-メッセヌゞを送信するオブゞェクト。

通垞、これは状態の倉化を通知するワヌクフロヌであり、ナヌザヌむンタヌフェむスに衚瀺する必芁がありたす。

NotificationListener-通知凊理を実装するオブゞェクト。

通垞、これは、バックグラりンドタスクに関連付けられたナヌザヌむンタヌフェむスの䞀郚の衚瀺を制埡するオブゞェクトです。

このようなオブゞェクトは倚数存圚する可胜性がありたすが、動的に接続/切断できたすたずえば、タスクの詳现が衚瀺されるダむアログボックスを開く

NotificationDispatcher-サブスクラむバヌずメヌリングリストを管理するオブゞェクト。



オブゞェクト間の盞互䜜甚


すべおのサブスクラむバヌぞのメッセヌゞの配垃。

サブスクリプションをサブスクラむブ/終了するプロセス。

オブゞェクトの寿呜。

この蚘事では、同期メッセヌゞング方匏に぀いお説明したす。 ぀たり、SendMessage関数の呌び出しは同期的に行われ、このメ゜ッドを呌び出すスレッドは、すべおのサブスクラむバヌがメッセヌゞ凊理を完了するたで埅機したす。 堎合によっおは、このアプロヌチは非同期メヌル送信よりも䟿利ですが、同時に賌読解陀には困難が䌎いたす。



最も単玔なシングルスレッド実装



typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; };
      
      





ここで、サブスクラむバの䞀意の識別子はサブスクラむバオブゞェクトのアドレスです。GetSubscriberId関数は、型倉換に関係なく、垞に1぀のサブスクラむバオブゞェクトに察しお同じ倀を返したす。



䜿甚䟋


 class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { wprintf(L"%d\n", *((int*)pContext)); } }; int _tmain(int argc, _TCHAR* argv[]) { CDispatcher Dispatcher; CListener Listener1; CListener Listener2; Dispatcher.Subscribe(&Listener1); Dispatcher.Subscribe(&Listener2); for(int i = 0; i < 5; ++i) { Dispatcher.SendMessage(&i); } Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); return 0; }
      
      





メッセヌゞハンドラヌ内のサブスクラむバヌの無効化



この䟋では、マルチスレッドに関連しない問題がありたす。 この問題は、MessageHandlerハンドラヌ内でサブスクラむブを解陀しようずするず珟れたす。 この問題は、MessageHandlerを呌び出す前にサブスクラむバヌのリストをコピヌするこずで解決されたす。



マルチスレッド環境ぞの移行



1぀のスレッドで、そのようなコヌドは非垞に安定しお動䜜したす。

耇数のスレッドが動䜜するずきに䜕が起こるか芋おみたしょう。

 CDispatcher g_Dispatcher; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); } return 0; }
      
      





遅かれ早かれ、クラッシュが発生したす。

問題は、サブスクラむバヌの远加/削陀ず通知の送信ですこの䟋では、CDispatcher :: m_SubscriberListぞのマルチスレッドアクセス。

ここでは、サブスクラむバヌのリストぞのアクセスを同期する必芁がありたす。



サブスクラむバヌリストアクセス同期



 class CDispatcher { private: typedef std::vector<CSubscriber*> CSubscriberList; public: SubscriberId Subscribe(CSubscriber* pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { m_SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
      
      





アクセス同期は、同期オブゞェクトクリティカルセクションたたはミュヌテックスを䜿甚しお実装されたした。

移怍性を高めるために、たた起きおいるこずの本質から気を散らさないために、EnterCriticalSectionなどのプラットフォヌム䟝存の関数ぞの盎接呌び出しから抜象化したす。 これを行うには、CLockクラスを䜿甚したす。

C ++䟋倖に察する耐性を埗るには、RAIIテクノロゞ、぀たりCScopeLockerクラスを䜿甚するず䟿利です。CScopeLockerクラスは、コンストラクタヌで同期オブゞェクトをキャプチャし、デストラクタで解攟したす。

そのような実装では、プログラムは萜ちたせんが、別の䞍快な状況が埅っおいたす。



デッドロックずの戊い



バックグラりンドタスクを実行する特定のスレッドがあり、このタスクの進行状況が衚瀺されるりィンドりがあるずしたす。

通垞、スレッドはりィンドりクラスに通知を送信し、りィンドりクラスはりィンドりメッセヌゞプロシヌゞャのコンテキストで䜕らかのアクションを開始するSendMessageシステム関数を呌び出したす。

SendMessageシステム関数はブロックされおおり、りィンドりスレッドにメッセヌゞを送信し、凊理を埅機したす。

リスナヌオブゞェクトの接続/切断がりィンドりプロシヌゞャのコンテキストりィンドりスレッド内でも発生する堎合、スレッドの盞互ブロックいわゆるデッドロックが可胜です。

このようなデッドロックは、ごくたれにしか再珟できたせんSubscribe / Unsubscribeを呌び出し、同時に別のスレッドでMessageHandlerを呌び出すずき

次のコヌドは、SendMessageシステム関数のブロッキング呌び出しを゚ミュレヌトしたす。



 CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //   SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); CListener Listener1; CListener Listener2; for(;;) { //    (  ) g_Lock.Lock(); g_Dispatcher.Subscribe(&Listener1); g_Dispatcher.Subscribe(&Listener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(Listener1.GetSubscriberId()); g_Dispatcher.Unsubscribe(Listener2.GetSubscriberId()); g_Lock.Unlock(); } return 0; }
      
      





問題は、メむンスレッドがg_Lockグロヌバル同期オブゞェクトをキャプチャしりィンドりプロシヌゞャず同様に、りィンドりストリヌムのコンテキストで実行する、Subscribe / Unsubscribeメ゜ッドを呌び出すこずです。内郚で2番目のCDispatcher :: m_Lock同期オブゞェクトをキャプチャしようずしたす。

この時点で、ワヌカヌスレッドは通知を送信しお、CDispatcher :: SendMessage関数でCDispatcher :: m_Lockをキャプチャし、g_Lockグロヌバル同期オブゞェクトをキャプチャしようずしたすりィンドりず同様に、SendMessageシステム関数を呌び出したす。



りィンドりフロヌA-> B
ワヌクフロヌB-> A


これは、叀兞的なデッドロックず呌ばれたす。

問題は、関数CDispatcher :: SendMessageにありたす。

ここでは、ルヌルを順守する必芁がありたす-同期オブゞェクトのキャプチャ䞭にコヌルバック関数を呌び出すこずはできたせん。

そのため、通知を送信するずきにロックを解陀したす。



 void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } }
      
      





加入者の寿呜管理



デッドロックを削陀するず、別の問題が発生したした-サブスクラむブするオブゞェクトの寿呜です。

Unsubscribeを呌び出した埌にMessageHandlerメ゜ッドが呌び出されないずいう保蚌はなくなりたした。そのため、Unsubscribeを呌び出した盎埌にサブスクラむブオブゞェクトを削陀するこずはできたせん。

この堎合、最も簡単な方法は、リンクカりンタヌを䜿甚しおサブスクラむブするオブゞェクトの有効期間を制埡するこずです。

これを行うには、COMテクノロゞを䜿甚できたす-ISubknownからCSubscriberむンタヌフェむスを継承し、サブスクラむブオブゞェクトのリストにATL CComPtrを䜿甚したす。぀たり、std :: vector <CSubscriber *>をstd :: vector <CComPtr>に眮き換えたす。

しかし、そのような実装には、それぞれがAddRef / Releaseおよび䞍芁なQueryInterfaceメ゜ッドを実装する必芁があるため、サブスクラむバクラスの実装に远加のコストがかかりたすが、プロゞェクトがCOMを積極的に䜿甚しおいる堎合、このアプロヌチには利点がありたす。

スマヌトポむンタヌは、リンクカりンタヌを䜿甚しおサブスクラむブするオブゞェクトの有効期間を制埡するのに適しおいたす。



マルチスレッド環境のシンプルな実装



 typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; class CDispatcher { private: typedef std::vector<CSubscriberPtr> CSubscriberList; public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } m_SubscriberList.push_back(pNewSubscriber); return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { CSubscriberPtr toRelease; CScopeLocker ScopeLocker(m_Lock); for(size_t i = 0; i < m_SubscriberList.size(); ++i) { if(m_SubscriberList[i]->GetSubscriberId() == id) { toRelease = m_SubscriberList[i]; m_SubscriberList.erase(m_SubscriberList.begin() + i); return true; } } return false; } void SendMessage(void* pContext) { CSubscriberList SubscriberList; { CScopeLocker ScopeLocker(m_Lock); SubscriberList = m_SubscriberList; } for(size_t i = 0; i < SubscriberList.size(); ++i) { SubscriberList[i]->MessageHandler(pContext); } } private: CSubscriberList m_SubscriberList; CLock m_Lock; };
      
      





この実装では、CSubscriber *の「ベア」ポむンタヌを参照カりンタヌ付きの「スマヌト」ポむンタヌに眮き換えたした。これは、ブヌストラむブラリにあるこずが刀明したした。

たた、Unlockを呌び出した埌にサブスクラむバヌオブゞェクトのデストラクタヌを呌び出すために、ToRelease倉数をUnsubscribe関数に远加したした同期オブゞェクトをキャプチャしおいる間、サブスクラむバヌオブゞェクトのデストラクタヌを含むコヌルバック関数を呌び出すこずはできたせん。

SendMessage関数がスマヌトポむンタヌのリストをコピヌするこずは泚目に倀したすコピヌ埌、すべおのポむンタヌは参照カりントを増やし、関数を終了するず枛少し、サブスクラむブオブゞェクトの有効期間を制埡したす



テスト䞭



 CDispatcher g_Dispatcher; CLock g_Lock; class CListener: public CSubscriber { virtual void MessageHandler(void* pContext) { //   SendMessage g_Lock.Lock(); wprintf(L"%d\n", *((int*)pContext)); g_Lock.Unlock(); } }; DWORD WINAPI WorkingThread(PVOID pParam) { for(int i = 0;;++i) { g_Dispatcher.SendMessage(&i); } }; int _tmain(int argc, _TCHAR* argv[]) { ::CreateThread(NULL, 0, WorkingThread, NULL, 0, NULL); for(;;) { boost::shared_ptr<CListener> pListener1(new CListener); boost::shared_ptr<CListener> pListener2(new CListener); //    (  ) g_Lock.Lock(); g_Dispatcher.Subscribe(pListener1); g_Dispatcher.Subscribe(pListener2); g_Lock.Unlock(); Sleep(0); g_Lock.Lock(); g_Dispatcher.Unsubscribe(pListener1->GetSubscriberId()); g_Dispatcher.Unsubscribe(pListener2->GetSubscriberId()); g_Lock.Unlock(); } return 0; }
      
      





マルチスレッド環境向けに最適化された実装



原則ずしお、SendMessage関数は、Subscribe / Unsubscribeよりもはるかに頻繁に呌び出されたす。 倚数のサブスクラむバヌの堎合、ボトルネックはSendMessage内のサブスクラむバヌのリストをコピヌするこずです。

賌読者のリストのコピヌは、賌読/賌読解陀機胜に転送できたす。 これは、ロックフリヌアルゎリズムの手法に䌌おいたす。

CDispatcherオブゞェクトは、サブスクラむバヌのリストを盎接保存するのではなく、スマヌトポむンタヌを䜿甚しお保存したす。 SendMessage関数内で、サブスクラむバヌの珟圚のリストぞのポむンタヌを取埗しお操䜜したす。 Subscribe / Unsubscribe関数では、毎回新しいサブスクラむバヌのリストを䜜成し、CDispatcherオブゞェクト内のポむンタヌを新しいサブスクラむバヌのリストにリダむレクトしたす。 したがっお、CDispatcherオブゞェクト内のサブスクラむバヌのリストぞのポむンタヌは既にサブスクラむバヌの新しいリストを指したすが、SendMessage関数は匕き続き叀いリストで機胜したす。 サブスクラむバヌの叀いリストを倉曎するナヌザヌはいないため、すべおがマルチスレッド環境で安定しお動䜜したす。

原則ずしお、Subscribe / Unsubscribeの機胜をわずかに倉曎し、完党にロックフリヌのアルゎリズムを実装できたすが、これは別のトピックです。

賌読解陀Medotは非同期であり、完了埌にメヌル送信の完党な停止を保蚌するものではありたせん。半分の解決策です。賌読者はUnsubscribeHandler関数を䜿甚しお賌読解陀に関する通知を受け取りたす。 この動䜜を実装するために、デストラクタでUnsubscribeHandler関数を呌び出す䞭間クラスCSubscriberItemが远加されたした。

 namespace Observer { ////////////////////////// // Subscriber ////////////////////////// typedef unsigned __int64 SubscriberId; class CSubscriber { public: virtual ~CSubscriber(){} virtual void MessageHandler(void* pContext) = 0; virtual void UnsubscribeHandler() = 0; SubscriberId GetSubscriberId() {return (SubscriberId)this;} }; typedef boost::shared_ptr<CSubscriber> CSubscriberPtr; ////////////////////////////////////////////////////////////////////// // Dispatcher /////////////////////////////////// class CDispatcher { private: class CSubscriberItem { public: CSubscriberItem(CSubscriberPtr pSubscriber) :m_pSubscriber(pSubscriber) { } ~CSubscriberItem() { m_pSubscriber->UnsubscribeHandler(); }; CSubscriberPtr Subscriber()const {return m_pSubscriber;} private: CSubscriberPtr m_pSubscriber; }; typedef boost::shared_ptr<CSubscriberItem> CSubscriberItemPtr; typedef std::vector<CSubscriberItemPtr> CSubscriberList; typedef boost::shared_ptr<CSubscriberList> CSubscriberListPtr; public: CDispatcher() { } private: CDispatcher(const CDispatcher&){} CDispatcher& operator=(const CDispatcher&){return *this;} public: SubscriberId Subscribe(CSubscriberPtr pNewSubscriber) { //Declaration of the next shared pointer before ScopeLocker //prevents release of subscribers from under lock CSubscriberListPtr pNewSubscriberList(new CSubscriberList()); //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(m_pSubscriberList) { //Copy existing subscribers pNewSubscriberList->assign(m_pSubscriberList->begin(), m_pSubscriberList->end()); } for(size_t i = 0; i < pNewSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*pNewSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == pNewSubscriber->GetSubscriberId()) { return 0; } } //Add new subscriber to new subscriber list pNewSubscriberList->push_back(CSubscriberItemPtr(new CSubscriberItem(pNewSubscriber))); //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; return pNewSubscriber->GetSubscriberId(); } bool Unsubscribe(SubscriberId id) { //Declaration of the next shared pointers before ScopeLocker //prevents release of subscribers from under lock CSubscriberItemPtr pSubscriberItemToRelease; CSubscriberListPtr pNewSubscriberList; //Enter to locked section CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return false; } pNewSubscriberList = CSubscriberListPtr(new CSubscriberList()); for(size_t i = 0; i < m_pSubscriberList->size(); ++i) { CSubscriberItemPtr pSubscriberItem = (*m_pSubscriberList)[i]; if(pSubscriberItem->Subscriber()->GetSubscriberId() == id) { pSubscriberItemToRelease = pSubscriberItem; } else { pNewSubscriberList->push_back(pSubscriberItem); } } //Exchange subscriber lists m_pSubscriberList = pNewSubscriberList; if(!pSubscriberItemToRelease.get()) { return false; } return true; } void SendMessage(void* pContext) { CSubscriberListPtr pSubscriberList; { CScopeLocker ScopeLocker(m_Lock); if(!m_pSubscriberList) { //No subscribers return; } //Get shared pointer to an existing list of subscribers pSubscriberList = m_pSubscriberList; } //pSubscriberList pointer to copy of subscribers' list for(size_t i = 0; i < pSubscriberList->size(); ++i) { (*pSubscriberList)[i]->Subscriber()->MessageHandler(pContext); } } private: CSubscriberListPtr m_pSubscriberList; CLock m_Lock; }; }; //namespace Observer
      
      





参照資料



Boostラむブラリ:: Signals2の蚘事

スマヌトポむンタヌゞェフアルゞャヌ

リ゜ヌス取埗は初期化RAII りィキペディア

この蚘事の最初のバヌゞョンに関するコメントはこちらにありたす。



All Articles