C ++ 11の非同期タスク

こんにちは、私の小さな図書館をコミュニティと共有したいと思います。

私はC / C ++でプログラミングしていますが、残念ながら、作業中のプロジェクトではC ++ 11標準を使用できません。 しかし、5月の休暇が来て、自由時間が現れたので、私はこの禁断の果実を実験して研究することにしました。 学ぶのに最適なことは練習です。 プログラミング言語の記事を読むと、よりよく読むことができるので、関数の非同期実行用の小さなライブラリを作成することにしました。

std :: future、std :: asyncなどがあることがわかっていることをすぐに予約してください。 私は自分に似た何かを実現し、ラムダ関数、スレッド、ミューテックスの世界に頭を突っ込むことに興味がありました。 休日はサイクリングに最適な時期です。





それでは始めましょう


私のライブラリは次のように機能することにしました。

スレッドの数が固定されたプールがあります。

タスクは、ラムダ関数の構文を使用して追加されます。

タスク自体から、実装の結果を抽出するか、単に作業の終了を待つことができます。

先を見ると、次のようになります。



... act::control control(N_THREADS); auto some_task = act::make_task([](std::vector<double>::const_iterator begin, std::vector<double>::const_iterator end) { double sum = 0; for (auto i = begin; i != end; ++i) { sum+=(*i); } return sum; } , data.begin(), data.end()); control << some_task; cout << some_task->get() << endl; ...
      
      







タスククラス


最初に、タスクを記述するクラスを作成する必要があります。



 template <typename T> class task : public task<decltype(&T::operator())> { }; template <typename ClassType, typename ReturnType, typename ... Args> class task<ReturnType(ClassType::*)(Args...) const> { protected: const ClassType &m_func; std::tuple<Args...> m_vars; ReturnType m_return; public: task(const ClassType &v, Args... args): m_func(v), m_vars(args ...) {} virtual ~task() {} private: };
      
      







ご存じのように、lamba関数はoperator()演算子を使用してファンクタークラスに拡張されます。

タスククラスはテンプレートであり、その型はファンクター演算子&T ::演算子()の型から抽出されます。

このクラスは、ファンクターへのポインター、std :: tupleの形式の関数引数、および戻り値を格納します。



したがって、ラムダ関数をパラメーター付きでオブジェクトに保存できるようになりました。次に、その呼び出し方法を学習する必要があります。

これを行うには、m_varsに格納されているパラメーターを使用してm_funcでopertator()を呼び出します。

最初からこれを行う方法はわかりませんでしたが、Googleの使用の増加と2番目のリンクへの移行により結果が得られました。



  template<int ...> struct seq { }; template<int N, int ...S> struct gens : gens<N-1, N-1, S...> { }; template<int ...S> struct gens<0, S...> { typedef seq<S...> type; };
      
      







この構成を使用して、次の関数をクラスに追加できます。



  ... public: void invoke() { ReturnType r = caller(typename gens<sizeof...(Args)>::type()); } private: template<int ...S> ReturnType caller(seq<S...>) const { return m_func(std::get<S>(m_vars) ...); } ...
      
      







タスクの基本クラス


次に、タスクの基本クラスを実装します。



  class abstract_task { protected: mutable std::mutex m_mutex; mutable std::condition_variable m_cond_var; mutable bool m_complete; public: abstract_task(): m_complete(false) {} virtual ~abstract_task() {} virtual void invoke() = 0; virtual void wait() const { std::unique_lock<std::mutex> lock(m_mutex); while (!m_complete) { m_cond_var.wait(lock); } } };
      
      





このクラスには、ミューテックスと、タスクの完了を通知する状態変数が含まれています。 したがって、ソースコードはgithubで利用できるため、このクラスの問題にはいくつかの変更がありますが、これは省略します。



タスク作成


タスクを作成するためのラッパー関数を作成しましょう。



  template <typename T, typename ... Args> std::shared_ptr<task<decltype(&T::operator())>> make_task(T func, Args ... args ) { return std::shared_ptr<task<decltype(&T::operator())>>(new task<decltype(&T::operator())>(func, args ...)); }
      
      





仮想クラスがあるため、ポインターを使用するのは論理的であり、ポインターだけでなくスマートポインターも使用します。



管理クラス


次に、バックグラウンドスレッドでタスクを実行するためのエンティティを実装します。

コードの一部のみを提供します。



 ... class control { std::deque<std::shared_ptr<abstract_task>> m_tasks; std::vector<std::thread> m_pool; std::mutex m_mutex; std::condition_variable m_cond_var; std::condition_variable m_empty_cond; std::atomic<bool> m_run; std::vector<bool> m_active; public: control(std::size_t pool_size = 2) { m_run.store(true, std::memory_order_relaxed); auto func = [this](int n) { while (m_run.load(std::memory_order_relaxed)) { std::unique_lock<std::mutex> lock(m_mutex); m_active[n] = true; if (m_tasks.empty()) { m_empty_cond.notify_all(); m_active[n] = false; m_cond_var.wait(lock); } else { std::shared_ptr<abstract_task> t = m_tasks.front(); m_tasks.pop_front(); lock.unlock(); t->invoke(); lock.lock(); m_active[n] = false; } } }; pool_size = pool_size > 0 ? pool_size : 1; m_active.resize(pool_size, false); for(std::size_t i = 0; i < pool_size; ++i) { m_pool.emplace_back(func, i); } } ...
      
      







楽しみのために、私は新しい標準のすべての機能を使用しました。

このクラスは、スレッドの配列とアクティビティ状態変数の配列を作成して、子スレッドによるタスクの実行を監視します。

子スレッドのメインループは、アトミック変数によって制御されます(原則として、それはvolatile宣言するのに十分でした。競合状態はなく、メインスレッドはそれに書き込みのみを行い、子は読み取りのみを行うためです)



性能


std :: asyncと比較したこのソリューションのパフォーマンステスト用でない場合、この記事を書くことはほとんどありません。

構成:

Intel Core(TM)i7-2600 CPU @ 3.40GHz

$ gcc --version

gcc(Debian 4.8.2-21)4.8.2





テストは、配列の並列加算と、すべての加算の結果の非同期加算で構成されます。 操作の結果は次のようになります。

res = sum(配列)* N_P




数値はミリ秒単位です。



テスト1


最適化はオフになり、配列内の要素の数は100,000,000、生成されたタスクの数は73、プール内のスレッドの数は6

結果:

test_act 16775 OK

test_async 16028 OK


パフォーマンスは同等です。



テスト2


最適化が有効になっており、配列内の要素の数は100,000,000、生成されたタスクの数は73、プール内のスレッドの数は6です

結果:

test_act 1597.6 OK

test_async 2530.5 OK


私の実装は1.5倍高速です。



テスト3


最適化が有効になっており、配列内の要素の数は100,000,000、生成されたタスクの数は73、プール内のスレッドの数は7

結果:

test_act 1313.1 OK

test_async 2503.7 OK




テスト4


最適化が有効になり、配列内の要素の数は100,000,000、生成されたタスクの数は73、プール内のスレッドの数は8

結果:

test_act 1402 OK

test_async 2492.2 OK




テスト5


最適化が有効になっており、配列内の要素の数は100,000,000、生成されたタスクの数は173、プール内のスレッドの数は8

結果:

test_act 4435.7 OK

test_async 5789.4 OK




結論とバグ


これらの結果は、おそらく非同期が各タスクに対して独自のスレッドを生成するという事実によるものです。私の実装では、スレッドの数は固定されており、作成のオーバーヘッドはありません。

バグ-ラムダ関数でスコープ変数を([]経由で)キャプチャすると、SIGSEGVが呼び出されます。 パラメータを介してそれらを転送しても問題ありませんが。



この記事とライブラリ自体の有用性はわかりませんが、少なくとも私の標準では新しい標準の機能のいくつかを適用しました。

ソースコード



All Articles