前回の記事では、リアクティブアーキテクチャの理論的基礎について検討しました。 データストリーム、リアクティブなErlang / Elixirシステムの実装方法、およびそれらのメッセージングパターンについて説明します。
SOA、MSA、およびメッセージング
SOA、MSA-システム構築のルールを定義するシステムアーキテクチャ。メッセージングはその実装のプリミティブを提供します。
建築システムのこれまたはそのアーキテクチャを宣伝したくありません。 私は、特定のプロジェクトとビジネスに最も効果的で有用なプラクティスを使用しています。 どのパラダイムを選んだとしても、Unixウェイに注目してシステムブロックを作成することをお勧めします。個々のエンティティを担当する最小限の接続性を持つコンポーネントです。 APIメソッドは、エンティティを使用して最も単純なアクションを実行します。
メッセージング-名前が示すように、メッセージブローカーです。 その主な目的は、メッセージを受信および提供することです。 彼は、情報を送信するためのインターフェイス、システム内で情報を送信するための論理チャネルの形成、ルーティングとバランス、およびシステムレベルでの障害の処理を担当しています。
開発中のメッセージングは、rabbitmqと競合したり置き換えたりしようとはしていません。 主な機能:
- 配布
交換ポイントは、それらを使用するコードに可能な限り近いクラスターのすべてのノードで作成できます。 - シンプル。
定型コードとユーザビリティの最小化に焦点を当てます。 - 最高のパフォーマンス。
rabbitmqの機能を繰り返そうとはしていませんが、OTPで可能な限りシンプルでコストを最小限に抑えるアーキテクチャ層とトランスポート層のみを強調しています。 - 柔軟性。
各サービスは、多くの交換テンプレートを組み合わせることができます。 - 設計に固有のフォールトトレランス。
- スケーラビリティ。
メッセージングはアプリケーションとともに成長します。 負荷が増加すると、交換ポイントを個々のマシンに移動できます。
発言。 コード編成の観点から見ると、メタプロジェクトはErlang / Elixirの複雑なシステムに適しています。 すべてのプロジェクトコードは1つのリポジトリ、つまりアンブレラプロジェクトにあります。 同時に、マイクロサービスは可能な限り分離され、別のエンティティを担当する単純な操作を実行します。 このアプローチを使用すると、システム全体のAPIを簡単にサポートでき、変更を加えるだけで、ユニットと統合テストを作成するのに便利です。
システムコンポーネントは、直接またはブローカーを介して対話します。 メッセージングの観点から、各サービスにはいくつかのライフフェーズがあります。
- サービスの初期化。
この段階で、プロセスと依存関係を実行するサービスの構成と起動が行われます。 - 交換ポイントを作成します。
サービスは、ノードの構成で指定された静的交換ポイントを使用するか、交換ポイントを動的に作成できます。 - サービス登録。
サービスがリクエストを処理するには、交換ポイントで登録する必要があります。 - 通常の機能。
サービスは有用な仕事を生み出します。 - シャットダウン。
シャットダウンには、通常と緊急の2種類があります。 通常のサービスでは、交換ポイントから切断され、停止します。 緊急の場合、メッセージングはフェールオーバーシナリオの1つを実行します。
かなり複雑に見えますが、コードのすべてがそれほど怖いわけではありません。 コメント付きのコードの例は、テンプレートの分析で少し後で説明します。
取引所
交換ポイントは、メッセージングテンプレート内のコンポーネントとの対話のロジックを実装するメッセージングプロセスです。 以下のすべての例で、コンポーネントは交換ポイントを介して相互作用し、その組み合わせがメッセージングを形成します。
メッセージ交換パターン(MEP)
グローバルに、共有パターンは双方向と一方向に分けることができます。 前者は受信したメッセージへの応答を意味し、後者はそうではありません。 クライアント/サーバーアーキテクチャの双方向パターンの典型的な例は、要求/応答パターンです。 テンプレートとその変更を検討してください。
要求-応答またはRPC
RPCは、別のプロセスから応答を取得する必要がある場合に使用されます。 このプロセスは、同じサイトで起動することも、異なる大陸に配置することもできます。 以下は、メッセージングを介したクライアントとサーバーの相互作用の図です。
メッセージングは完全に非同期であるため、クライアントの交換は2つのフェーズに分けられます。
提出依頼
messaging:request(Exchange, ResponseMatchingTag, RequestDefinition, HandlerProcess).
Exchange-交換ポイントの一意の名前
ResponseMatchingTag-応答を処理するローカルラベル。 たとえば、異なるユーザーに属する複数の同一のリクエストを送信する場合。
RequestDefinition-リクエスト本文
HandlerProcess -PIDハンドラー。 このプロセスは、サーバーから応答を受け取ります。
応答処理
handle_info(#'$msg'{exchange = EXCHANGE, tag = ResponseMatchingTag,message = ResponsePayload}, State)
ResponsePayload-サーバーの応答。
サーバーの場合、プロセスは2つのフェーズで構成されます。
- 交換ポイントの初期化
- 着信リクエストの処理
このテンプレートをコードで説明しましょう。 正確な時間メソッドのみを提供する単純なサービスを実装する必要があるとします。
サーバーコード
api.hrlでサービスAPIの定義を取り出します。
%% ===================================================== %% entities %% ===================================================== -record(time, { unixtime :: non_neg_integer(), datetime :: binary() }). -record(time_error, { code :: non_neg_integer(), error :: term() }). %% ===================================================== %% methods %% ===================================================== -record(time_req, { opts :: term() }). -record(time_resp, { result :: #time{} | #time_error{} }).
time_controller.erlでサービスコントローラーを定義する
%% . gen_server . %% gen_server init(Args) -> %% messaging:monitor_exchange(req_resp, ?EXCHANGE, default, self()) {ok, #{}}. %% . , . handle_info(#exchange_die{exchange = ?EXCHANGE}, State) -> erlang:send(self(), monitor_exchange), {noreply, State}; %% API handle_info(#time_req{opts = _Opts}, State) -> messaging:response_once(Client, #time_resp{ result = #time{ unixtime = time_utils:unixtime(now()), datetime = time_utils:iso8601_fmt(now())} }); {noreply, State}; %% gen_server terminate(_Reason, _State) -> messaging:demonitor_exchange(req_resp, ?EXCHANGE, default, self()), ok.
顧客コード
サービスにリクエストを送信するには、クライアントの任意の場所でメッセージングリクエストAPIを呼び出すことができます。
case messaging:request(?EXCHANGE, tag, #time_req{opts = #{}}, self()) of ok -> ok; _ -> %% repeat or fail logic end
分散システムでは、コンポーネントの構成が非常に異なる場合があり、要求の時点でメッセージングがまだ開始されていないか、サービスコントローラーが要求を処理する準備ができていません。 したがって、メッセージングの応答を確認し、失敗のケースを処理する必要があります。
送信に成功すると、クライアントはサービスから応答またはエラーを受け取ります。
handle_infoで両方のケースを処理します。
handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time{unixtime = Utime}}}, State) -> ?debugVal(Utime), {noreply, State}; handle_info(#'$msg'{exchange = ?EXCHANGE, tag = tag, message = #time_resp{result = #time_error{code = ErrorCode}}}, State) -> ?debugVal({error, ErrorCode}), {noreply, State};
リクエストチャンク応答
巨大なメッセージの送信を許可しない方が良い。 システム全体の応答性と安定した動作はこれに依存します。 要求への応答が大量のメモリを消費する場合、パーツへの分解は必須です。
そのような場合の例をいくつか示します。
- コンポーネントは、ファイルなどのバイナリデータを交換します。 回答を小さな部分に分割すると、メモリオーバーフローをキャッチせずに、あらゆるサイズのファイルで効率的に作業できます。
- リスト。 たとえば、データベース内の巨大なテーブルからすべてのレコードを選択し、それを別のコンポーネントに転送する必要があります。
これらの回答を機関車と呼びます。 いずれの場合も、1 MBの単一メッセージよりも、1 MBの1024メッセージの方が優れています。
Erlangクラスターでは、追加のゲインが得られます-交換ポイントをバイパスして、回答が受信者にすぐに送信されるため、交換ポイントとネットワークの負荷が軽減されます。
リクエストを伴う応答
これは、インタラクティブシステムを構築するためのRPCパターンの非常にまれな変更です。
パブリッシュ/サブスクライブ(データ配布ツリー)
イベント指向システムは、データが利用可能になると、データを消費者に配信します。 したがって、システムはプルまたはポーリングよりもモデルをプッシュする傾向があります。 この機能により、常にクエリを実行してデータを待機することでリソースを浪費することがなくなります。
この図は、特定のトピックにサブスクライブしている消費者にメッセージを配信するプロセスを示しています。
このテンプレートの使用の古典的な例は、状態の分布です:コンピューターゲームのゲーム世界、取引所の市場データ、データフィードの有用な情報。
加入者コードを考慮してください。
init(_Args) -> %% , = key messaging:subscribe(?SUBSCRIPTION, key, tag, self()), {ok, #{}}. handle_info(#exchange_die{exchange = ?SUBSCRIPTION}, State) -> %% , messaging:subscribe(?SUBSCRIPTION, key, tag, self()), {noreply, State}; %% handle_info(#'$msg'{exchange = ?SUBSCRIPTION, message = Msg}, State) -> ?debugVal(Msg), {noreply, State}; %% - terminate(_Reason, _State) -> messaging:unsubscribe(?SUBSCRIPTION, key, tag, self()), ok.
ソースは、便利な場所で公開後関数を呼び出すことができます。
messaging:publish_message(Exchange, Key, Message).
Exchange-交換ポイントの名前、
キー -ルーティングキー
メッセージ -ペイロード
反転パブリッシュ/サブスクライブ
pub-subを展開すると、ロギングに便利なパターンを取得できます。 ソースとコンシューマのセットは完全に異なる場合があります。 この図は、1つのコンシューマと多くのソースがあるケースを示しています。
タスク分布パターン
ほとんどすべてのプロジェクトで、レポートの生成、通知の配信、サードパーティのシステムからのデータの受信など、処理を遅延させるタスクが発生します。 これらのタスクを実行するシステムのスループットは、ハンドラーを追加することで簡単にスケーリングできます。 残っているのは、ハンドラーのクラスターを形成し、それらの間でタスクを均等に分散することです。
3つのハンドラーの例で発生する状況を考慮してください。 タスクの配布段階でも、配布の公平性とハンドラーのオーバーフローの問題が発生します。 ラウンドロビン配布は正義を担当し、ハンドラーのオーバーフローの状況を回避するために、 prefetch_limit制限を導入します。 一時モードでは、 prefetch_limitは1つのハンドラーがすべてのタスクを取得することを許可しません。
メッセージングは、キューと処理の優先順位を管理します。 ハンドラーは、利用可能になるとタスクを受け取ります。 タスクは成功または失敗する場合があります。
-
messaging:ack(Tack)
-メッセージ処理が成功した場合に呼び出されます -
messaging:nack(Tack)
-すべての緊急事態で呼び出されます。 タスクが戻った後、メッセージングはそれを別のハンドラーに転送します。
3つのタスクを処理するときに、複雑な障害が発生したとします。タスクを受け取った後のハンドラー1が落ち、交換ポイントに何かを報告する時間がありません。 この場合、ackタイムアウトの期限が切れた後の交換ポイントは、ジョブを別のハンドラーに転送します。 ハンドラー3は何らかの理由でタスクを拒否し、nackを送信しました。その結果、タスクは正常に完了した別のハンドラーにも渡されました。
予備結果
分散システムの基本的な構成要素を分解し、Erlang / Elixirでの使用の基本的な理解を得ました。
基本的なパターンを組み合わせることで、新たな問題を解決するための複雑なパラダイムを構築できます。
サイクルの最後の部分では、サービスの構成、ルーティング、およびバランシングの一般的な問題を検討し、システムのスケーラビリティとフォールトトレランスの実際的な側面についても説明します。
2番目の部分の終わり。
写真マリウス・クリステンセン
websequencediagrams.comが作成したイラスト