状態同期用ライブラリ









そのため、あるプロジェクトでは、異なるプロセス間でデータを交換する方法を改革する必要がありました。 歴史的に確立されたスキームは見苦しいものでした。 1つのプロセスは、現在の設定をXMLファイルとして定期的に上書きします。 2番目のユーザーは、このファイルを1秒間に1回読み取り、前回から変更されていることを確認します。 ファイルの変更は、現在と過去の状態の多くの比較を通じて計算され、一連のアクションが発生しました。 読み取りプロセスは、3番目のプロセスなどによって読み取られた別のXMLファイルを作成しました。 最も悲しいことは、このスキームでは面倒な1回限りの比較コードが必要であり、新しいデータを追加するときに階層化されていたことです。



XMLファイルのこのすべての動物園を、pub / subをサポートするメッセージングシステムに置き換えるというアイデアが提案されました。 NATSRedis、およびZeroMQの 3つの候補が積極的に検討されました 。 メタデータだけでなく、大量のバイナリデータもリアルタイムで交換することが計画されていたため、最大帯域幅はコーナーの端になりました。 このため、最初の2つの候補者は、高レベルで便利なブローカーベースのAPIにもかかわらず、除外する必要がありました(テストでは、NATSがRedisに有利なスタートを切ることが示されましたが、ZeroMQは約20%を失います)。



次に、プロセス間で状態を同期する方法についての質問がありました。 次のスキームは論理的に見えました。



  1. サーバーに接続した後、クライアントは完全な状態を読み取ります。
  2. さらに、状態が変化すると、サーバーはクライアントがサブスクライブするパッチ(変更)を公開します。
  3. パッチを受信すると、クライアントはパッチの変更(イベント)に対応するハンドラーを呼び出し、サーバーの以前の状態にそれを課します。


JSONパッチの使用はこのスキームに完全に適合しているため、パッチを生成および適用するための車輪を再発明することはできません。 したがって、JSON Patchサポートが組み込まれたJSONライブラリは、状態同期のためのライブラリの理想的なベースになりました。







そのため、数週間の作業の後、次の通信プリミティブを含む小さなライブラリが作成されました。







  1. Publisherは、PUBソケットの単純なラッパーです。
  2. サブスクライバー -SUBソケットのラッパー。選択したストリームで通知を非同期的に処理できます。
  3. リクエスター-REQアドバイスのラッパー。これにより、要求を非同期的に送信し、専用スレッドで応答を処理できます。
  4. Replier -REPソケットのラッパー。専用ストリームで着信リクエストを処理できます。


これらのプリミティブに基づいて、状態を同期し、コールバックを特定の変更に割り当てるために、 クライアントサーバーが実装されました。







サンプルコードとその出力
#include <chrono> #include <map> #include <string> #include <vector> #include "syncer.h" using namespace nlohmann; using namespace std; using namespace std::chrono; using namespace syncer; struct Site { int temperature; int pressure; }; static inline void to_json(json& j, const Site& s) { j = json(); j["temperature"] = s.temperature; j["pressure"] = s.pressure; } static inline void from_json(const json& j, Site& s) { s.temperature = j.at("temperature").get<int>(); s.pressure = j.at("pressure").get<int>(); } struct State { map<string, Site> sites; string forecast; }; static inline void to_json(json& j, const State& s) { j = json(); j["sites"] = s.sites; j["forecast"] = s.forecast; } static inline void from_json(const json& j, State& s) { s.sites = j.at("sites").get<map<string, Site>>(); s.forecast = j.at("forecast").get<string>(); } PatchOpRouter<State> CreateRouter() { PatchOpRouter<State> router; router.AddCallback<int>(R"(/sites/(\w+)/temperature)", PATCH_OP_ANY, [] (const State& old, const smatch& m, PatchOp op, int t) { cout << "Temperature in " << m[1].str() << " has changed: " << old.sites.at(m[1].str()).temperature << " -> " << t << endl; }); router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_ADD, [] (const State&, const smatch& m, PatchOp op, const Site& s) { cout << "Site added: " << m[1].str() << " (temperature: " << s.temperature << ", pressure: " << s.pressure << ")" << endl; }); router.AddCallback<Site>(R"(/sites/(\w+)$)", PATCH_OP_REMOVE, [] (const State&, const smatch& m, PatchOp op, const Site&) { cout << "Site removed: " << m[1].str() << endl; }); return router; } int main() { State state; state.sites["forest"] = { 51, 29 }; state.sites["lake"] = { 49, 31 }; state.forecast = "cloudy and rainy"; Server<State> server("tcp://*:5000", "tcp://*:5001", state); Client<State> client("tcp://localhost:5000", "tcp://localhost:5001", CreateRouter()); this_thread::sleep_for(milliseconds(100)); cout << "Forecast: " << client.data().forecast << endl; state.sites.erase("lake"); state.sites["forest"] = { 50, 28 }; state.sites["desert"] = { 55, 30 }; state.forecast = "cloudy and rainy"; server.Update(state); this_thread::sleep_for(milliseconds(100)); return 0; }
      
      





このコードを実行した結果は、次の出力になります。







追加されたサイト:森林(温度:51、圧力:29)

追加されたサイト:湖(温度:49、圧力:31)

予報:曇りと雨

森林の温度が変化しました:51-> 50

削除されたサイト:湖

追加されたサイト:砂漠(温度:55、圧力:30)


もちろん、選択されたアプローチは、Epollを使用する代わりに、個々のソケットに寛大にスレッドを割り当てるため、パフォーマンスの点で最適とはほど遠いです。 したがって、多数の同時接続を必要とするシステムにはあまり適していません。 ほとんどの場合、これは重要ではないことを期待しましょう。







そのため、プロセス間通信のほとんどを大幅に簡素化することが可能になりました。 手動の変更チェックは他の機能と強く混合されているため、レガシーコードでは簡単に実行できません。したがって、「実際に」カットする必要があります。 一方、新しいコードの同期を実装することは喜びでした。








All Articles