C ++ 11を使用して同期バリアを作成する

はじめに



POSIXストリームとC ++ 11ストリームの2つの異なる並列プログラミングテクノロジーを比較すると、後者にはpthreadライブラリのbarrier_t型の類似物がないことがわかります。



奇妙なことに、このような重要な同期プリミティブは標準ライブラリにありません。 この記事では、C ++ 11標準セットに含まれるライブラリのみを使用してバリアを作成する方法について説明します。



定義

バリアは、同期のプリミティブの1つです。 多数のスレッドで作成されます。 最初のスレッドが作業を完了すると、バリアで待機し、残りのスレッドが作業を完了するまで待機します。

バリアが作成されたときと同じくらい多くのストリームがバリアに蓄積されるとすぐに、バリアで待機するすべてのフローが機能し続けます。



ブラックジャックを使用してバリアを作成してみましょう...



まず、C ++ 11標準に含まれる次のライブラリを接続する必要があります。

#include <thread> #include <atomic> #include <condition_variable> #include <mutex>
      
      







おそらく、これらのライブラリがすべて必要なのはなぜでしょうか? さて、障壁には最初のものは必要ありませんが、このライブラリを接続せずにコードをチェックできるとは思いません。



しかし、まず最初に!



バリアにとって最も重要な分野は何ですか? 明らかにスレッドの数。

障壁は他に何を知る必要がありますか? 現在待機中のスレッドの数。



手と届く

 class barrier { const unsigned int threadCount; unsigned int threadsWaiting; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; } };
      
      







ただし、少し考えてみましょう。 同期には時間がかかるため、障壁はすでにアプリケーションの速度を低下させています。 したがって、バリア自体の作成と処理のコストを削減したいと考えています。

したがって、アトミック操作は、バリアで予期されるフローの数を変更するのにより適しています。



したがって、クラスは次のようになります。



 class barrier { const unsigned int threadCount; std::atomic<unsigned int> threadsWaiting; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; } };
      
      







さて、クラススケルトンを作成しました。 このクラスのオブジェクトを作成できます。コンストラクターがあり、コピーコンストラクターがあります...

すみません、私は何と言いましたか? 一般に、オブジェクト指向プログラミングと並列プログラミングの組み合わせでは、不快な結果から身を守るために、コピーコンストラクターを削除することをお勧めします。



C ++ 11では、このコンストラクターを明示的に無効にすることができます。



 class barrier { const unsigned int threadCount; std::atomic<unsigned int> threadsWaiting; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; } barrier(const barrier &) = delete; };
      
      







だから、私たちはそれを理解しました。 私たち全員がこれを始めたメソッドを書くだけです。 障壁を待っています。



次のアイデアが思い浮かびます:バリアを待機するか、それを通過することを担当する論理変数を作成し、このまさに条件によって条件変数を使用して動作を実装します。



そこで、新しいフィールドでクラスを修正します。

 class barrier { const unsigned int threadCount; std::atomic<unsigned int> threadsWaiting; bool isNotWaiting; std::condition_variable waitVariable; std::mutex mutex; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; } barrier(const barrier &) = delete; };
      
      







それではメソッドを理解しましょう。 すべてのスレッドがまだ渡されていない場合、バリアに到達するスレッドはこの条件変数でスリープする必要があります。つまり、次のコードを実行する必要があります。



 std::unique_lock<std::mutex> lock(mutex); waitVariable.wait(lock, [&]{ return noWait; });
      
      







すべてのフローが合格した場合、バリアで待機する必要がなくなったことを他のフローに通知する必要があります。 これにより、次のコードが実行されます。

 isNotWaiting = true; waitVariable.notify_all(); threadsWaiting.store(0);
      
      







最後のメソッドは、数値0をthreadsWaiting変数にアトミックに書き込みます。



今、1つの簡単な質問を解決するために残っています:これら2つのケースを結合する方法。 バリアで待機しているスレッドの数はどのようにしてわかりますか?



ここで、バリアの配置方法を思い出します。 バリア上のフローを待機するには、 すべてのフローがバリア機能を呼び出す必要があります。 したがって、waitメソッドが呼び出されるとすぐに、すぐにthreadsWaiting変数を1増やす必要があります。

これを行うには、fetch_addなどの関数を使用します。 これは、いわゆるRMW操作(読み取り-変更-書き込み)の1つです。 彼女はアトミック変数の値を読み取り、アトミックに引数で追加し、古い値を返しながら新しい値をそれに書き込みます。



したがって、上記の2つのケースは条件演算子によって結合され、クラスは次のようになります。



 class barrier { const unsigned int threadCount; std::atomic<unsigned int> threadsWaiting; bool isNotWaiting; std::condition_variable waitVariable; std::mutex mutex; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; } barrier(const barrier &) = delete; void wait() { if (threadsWaiting.fetch_add(1) >= threadCount - 1) { isNotWaiting = true; waitVariable.notify_all(); threadCount.store(0); } else { std::unique_lock<std::mutex> lock(mutex); waitVariable.wait(lock,[&]{ return isNoWaiting;}); } };
      
      







現在は、変数isNotWaitingの初期値を設定するだけです。これは明らかにfalseです。



 class barrier { const unsigned int threadCount; std::atomic<unsigned int> threadsWaiting; bool isNotWaiting; std::condition_variable waitVariable; std::mutex mutex; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; isNotWaiting = false; } barrier(const barrier &) = delete; void wait() { if (threadsWaiting.fetch_add(1) >= threadCount - 1) { isNotWaiting = true; waitVariable.notify_all(); threadCount.store(0); } else { std::unique_lock<std::mutex> lock(mutex); waitVariable.wait(lock,[&]{ return isNotWaiting;}); } };
      
      







そこで、サードパーティのライブラリを接続せずに、C ++ 11標準を使用してバリアのクラスを作成しました。



さて、あなたは私に反対することができます:さて、私はいくつかのコードを書きましたか? そして、それが機能するという証拠はどこにありますか?



したがって、最も重要な部分:障壁を実証する



 #include <iostream> #include <thread> #include <atomic> #include <condition_variable> #include <mutex> class barrier { const unsigned int threadCount; std::atomic<unsigned int>threadsWaiting; bool isNotWaiting; std::condition_variable waitVariable; std::mutex mutex; public: barrier(unsigned int n) : threadCount(n) { threadsWaiting = 0; isNotWaiting = false; } barrier(const barrier &) = delete; void wait() { if (threadsWaiting.fetch_add(1) >= threadCount - 1) { isNotWaiting = true; waitVariable.notify_all(); threadsWaiting.store(0); } else { std::unique_lock<std::mutex> lock(mutex); waitVariable.wait(lock,[&]{ return isNotWaiting;}); } } }; barrier *myBarrier; class Thread { private: std::thread* cppthread; static void threadFunction(Thread* arg) { arg->run(); } public: Thread() {} Thread(const Thread&) = delete; virtual ~Thread() {delete cppthread;} virtual void run() = 0; void start() { cppthread = new std::thread(Thread::threadFunction, this); } void wait() { cppthread->join(); } }; class BarrierDemo: public Thread { int id; public: BarrierDemo(int i) { id = i; } void run() { std::cout << "Thread " << id << "runs before barrier" << std::endl; myBarrier->wait(); std::cout << "Thread " << id << "runs after barrier" << std::endl; } }; int main() { // your code goes here int threads; std::cin >> threads; myBarrier = new barrier(threads); BarrierDemo* bardemos = static_cast<BarrierDemo*>(::operator new(sizeof(BarrierDemo)*threads)); for (int i = 0; i < threads; i++) { new (&bardemos[i])BarrierDemo(i); bardemos[i].start(); } for (int i = 0; i < threads; i++) { bardemos[i].wait(); } ::operator delete(bardemos); delete myBarrier; return 0; }
      
      







上記のコードをC ++ 11サポートのあるコンパイラにコピーして、その機能をテストできます。 この記事はここで終わります。



PS与えられたコードから、これが「1回限りの」バリアであると推測するのは簡単です。すべてのフローが通過するとすぐに、クラスの同じインスタンスをバリアとして再利用することはできません。



All Articles