「まれな」アーラン言語に関する記事カウンターをもう1つ増やすために、アーランクラスターをひざの上で組み立てて、並列コンピューティングを実行する方法を説明します。
何が起こっているのかを理解するには、読者はErlangの基本(OTPなし)を知る必要があるかもしれません。 ちなみに、これは入手可能 レジを離れることなく 居心地の良いHabrを離れることなく、ここに:
しかし、事前にスマートになってはいけません...
叙情的な余談
現在理解しているように、IT業界の発展につれて、まったく異なるプログラミング言語がアプリケーションを見つけています。 Erlang言語は例外ではなく、もともとはフォールトトレラントシステム用の言語として開発されましたが、分散マルチプロセッサシステムの世界でニッチを獲得した理由は次のとおりです。
- マルチスレッドプログラミングの便利なモデル( 同時実行性 );
- 異なるデバイス上のノードで構成される分散システムへのこのモデルの正常な拡張( 分散コンピューティング );
- このモデルの効果的な実装。 高可用性システムを開発するためのわずかな努力を可能にします。
- 「ホットスワップコード」の組み込みメカニズム( ホットスワップ );
- 多くの開発ツールを使用します。
そして、残りながら:
- 汎用関数型プログラミング言語、
- フォールトトレラントシステムの開発( フォールトトレラント )
- 独自の方法論を使用して、複雑なシステムを効率的に開発します。
本番環境でErlangを使用している企業とプロジェクトをリストします。
- WhatsApp-メッセンジャー、
- RabbitMQ-メッセージブローカー、
- Bet365は、2,000万人以上の顧客を抱えるオンラインブックメーカーです。
- Riak-分散NoSQLデータベース、
- Couchbase-分散NoSQLデータベース、
- Ejabberd-XMPPサーバー、
- ゴールドマン・サックスはアメリカの大手銀行です。
私たちの意見では、言語には弱点があることを読者から隠しません。弱点は、他の著者によってよく説明されています。たとえば、 Erlang in Wargaming
それでは、プログラムを作成して、それを行う関数を記述しましょう...
関数値の並列計算(同時に実行されるプロセスの数と計算タイムアウトの制限付き)
マップ機能
この関数の署名をlists:map
似たものにしlists:map
:
map(Fn, Items, WorkersN, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
map(Fn, Items, WorkersN, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
、ここで:
-
Fn
は、値を計算する必要がある関数です。 -
Items
-この関数の引数値のリスト、 -
WorkersN
同時に実行されるプロセスの最大数、 -
Timeout
-待機する準備ができている関数の値を計算するためのタイムアウト、 -
FnResult
関数Fn
計算値、 -
Reason
-ワーカープロセスの終了理由。
関数の実装:
-module(dmap_pmap). ... map(Fn, Items, WorkersN, Timeout) -> Total = length(Items), Context = #{fn => Fn, items => Items, results => #{}, counter => 1, total => Total, workers => 0, workers_max => min(WorkersN, Total), pids => #{}, timeout => Timeout}, Self = self(), spawn(fun() -> Self ! map_loop(Context) end), Result = receive Any -> Any end, case get_error(Result) of undefined -> {ok, Result}; {'EXIT', Reason} -> throw({'EXIT', Reason}); {error, timeout} -> {error, timeout} end.
コードのハイライトは次のとおりです。
- Context
Context
値を初期化します。これをループでさらにスローします(map_loop
)。 - ワーカープロセスを実行し、それらからの結果を待つスーパーバイザープロセスを作成します。
-
map_loop
ループmap_loop
を実行して、このスーパーバイザーを実行します。 - スーパーバイザーからの結果を待って、関数の結果を返します。
- プライベート関数
get_error
計算エラーを返すか、エラーがなかった場合はundefined
返します。
map_loop関数
この関数のシグネチャ: map_loop(Context) -> [FnResult]
実装:
map_loop(#{counter := Counter, total := Total, workers := Workers, workers_max := WorkersMax, fn := Fn, items := Items, pids := PIDs} = Context) when Workers < WorkersMax, Counter =< Total -> Self = self(), Index = Counter, WorkerIndex = Workers + 1, PID = spawn(fun() -> WorkerPID = self(), io:fwrite("{Index, PID, {W, WMax}}: ~p~n", [{Index, WorkerPID, {Workers + 1, WorkersMax}}]), Item = lists:nth(Index, Items), Self ! {Index, WorkerPID, catch Fn(Item, WorkerIndex)} end), Context2 = Context#{counter => Counter + 1, workers => Workers + 1, pids => PIDs#{PID => Index}}, map_loop(Context2); map_loop(#{workers := Workers, timeout := Timeout, pids := _PIDs} = Context) when Workers > 0 -> receive {Index, PID, {'EXIT', _Reason} = Result} when is_integer(Index) ->%% error case io:fwrite("got error: ~p~n", [{Index, PID, Result}]), Context2 = set_worker_result(PID, {Index, Result}, Context), Context3 = kill_workers(Context2, error), create_result(Context3); {Index, PID, Result} when is_integer(Index) -> %% ok case io:fwrite("got result: ~p~n", [{Index, PID, Result}]), Context2 = set_worker_result(PID, {Index, Result}, Context), map_loop(Context2) after Timeout -> %% timeout case io:fwrite("timeout: ~p~n", [#{context => Context}]), Context3 = kill_workers(Context, {error, timeout}), create_result(Context3) end; map_loop(#{workers := Workers, pids := PIDs} = Context) when Workers == 0, PIDs == #{} -> create_result(Context).
実装を見ていきましょう。
-
map_loop
関数の最初の実装は、WorkersMax
ワーカー以下でWorkersMax
連続して実行されません。 ワーカーはspawn
から開始し、そのロジックは匿名関数内に記述されFn
関数の値を計算し、結果をスーパーバイザーに送信します。 - 関数の2番目の実装(覚えているように、最初の実装を呼び出すための条件が満たされない場合に呼び出されます):ワーカーからの結果を待ちます。 結果を受け取った後、彼女は次に何をすべきかを決定します。サイクルを継続するか、作業を完了するか。
- 関数の3番目の実装では、アクティブなワーカーがなくなったときに到着し(これはすべての計算が完了したことを意味します)、単に計算の結果を返します。
ここで使用したプライベート関数について見ていきましょう。
-
set_worker_result
計算の結果をスーパーバイザーのContext
保存します。 -
kill_workers
作業者がいるすべてのプロセスを強制終了します(作業が中断した場合)。 -
create_result
ワーカーから受け取った計算create_result
結果をcreate_result
。
関数の完全なリストはGitHubで見ることができます: ここ
テスト中
ここで、Erlang REPLを使用して関数を少しテストします。
1)2番目のワーカーの結果が最初のワーカーよりも早くなるように、2人のワーカーに対して計算を実行します。
>catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), 1; (2, _) -> timer:sleep(1000), 2 end, [1, 2], 2, 5000).
最後の行は計算の結果です。
{Index, PID, {W, WMax}}: {1,<0.1010.0>,{1,2}} {Index, PID, {W, WMax}}: {2,<0.1011.0>,{2,2}} got result: {2,<0.1011.0>,2} got result: {1,<0.1010.0>,1} {ok,[1,2]}
2)2人のワーカーの計算を実行して、最初のワーカーでクラッシュが発生するようにします。
>catch dmap_pmap:map(fun(1, _) -> timer:sleep(100), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 5000).
{Index, PID, {W, WMax}}: {1,<0.2149.0>,{1,2}} {Index, PID, {W, WMax}}: {2,<0.2150.0>,{2,2}} got error: {1,<0.2149.0>,{'EXIT',terrible_error}} kill: <0.2150.0> {'EXIT',terrible_error}
3)関数の計算時間が許容タイムアウトを超えるように、2人のワーカーに対して計算を実行します。
> catch dmap_pmap:map(fun(1, _) -> timer:sleep(2000), erlang:exit(terrible_error); (2, _) -> timer:sleep(100), 2 end, [1, 2], 2, 1000).
{Index, PID, {W, WMax}}: {1,<0.3184.0>,{1,2}} {Index, PID, {W, WMax}}: {2,<0.3185.0>,{2,2}} got result: {2,<0.3185.0>,2} timeout: #{context => #{counter => 3,fn => #Fun<erl_eval.12.99386804>, items => [1,2], pids => #{<0.3184.0> => 1}, results => #{2 => 2}, timeout => 1000,total => 2,workers => 1,workers_max => 2}} kill: <0.3184.0> {error,timeout}
そして最後に...
クラスターコンピューティング
テスト結果は妥当に見えますが、クラスターがそれとどう関係しているのか、熱心な読者が尋ねるかもしれません。
実際、クラスターでのコンピューティングを開始するために必要なほぼすべてのものがすでにあることがわかります。クラスターとは、関連するErlangノードのセットを意味します。
別のdmap_dmap
モジュールにdmap_dmap
、次のシグネチャdmap_dmap
持つ別の関数があります。
map({M, F}, Items, WorkersNodes, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
map({M, F}, Items, WorkersNodes, Timeout) -> {ok, [FnResult]} | {error, timeout}, throws {'EXIT', Reason}
、ここで:
-
{M, F}
-引数を適用するモジュールと関数の名前(alaF:M(Item)
)、 -
Items
-この関数の引数値のリスト、 -
WorkersNodes
計算を実行するノードの名前のリスト、 -
Timeout
-待機する準備ができている関数の値を計算するためのタイムアウト。
実装:
-module(dmap_dmap). ... map({M, F}, Items, WorkersNodes, Timeout) -> Fn = fun(Item, WorkerIndex) -> Node = lists:nth(WorkerIndex, WorkersNodes), rpc:call(Node, M, F, [Item]) end, dmap_pmap:map(Fn, Items, length(WorkersNodes), Timeout).
この関数のロジックは非常に単純です。前のセクションのdmap_pmap:map
関数を使用します。この関数に匿名関数を代入し、単純に目的のノードで計算を実行します。
テストのために、別のモジュールで、ノードの名前を返す関数を開始します。
-module(dmap_test). test(X) -> {ok, {node(), X}}.
テスト中
テストのために、2つのターミナルでノードを実行する必要があります。たとえば、次のように(プロジェクトの作業ディレクトリから):
make run NODE_NAME=n1@127.0.0.1
make run NODE_NAME=n2@127.0.0.1
最初のノードで計算を実行します。
(n1@127.0.0.1)1> dmap_dmap:map({dmap_test, test}, [1, 2], ['n1@127.0.0.1', 'n2@127.0.0.1'], 5000).
そして、結果が得られます。
{Index, PID, {W, WMax}}: {1,<0.1400.0>,{1,2}} {Index, PID, {W, WMax}}: {2,<0.1401.0>,{2,2}} got result: {1,<0.1400.0>,{ok,{'n1@127.0.0.1',1}}} got result: {2,<0.1401.0>,{ok,{'n2@127.0.0.1',2}}} {ok,[{ok,{'n1@127.0.0.1',1}},{ok,{'n2@127.0.0.1',2}}]}
置き換えることができるように、注文したとおり、結果は2つのノードから届きました。
結論の代わりに
私たちの簡単な例は、アプリケーションの分野で、Erlangが有用な問題を解決することを比較的簡単にしていることを示しています(他のプログラミング言語を使用して解決するのはそれほど簡単ではありません)。
記事の形式が短いため、ライブラリのコードとアセンブリに関する質問があるかもしれませんが、それらは舞台裏に残されています。
GitHubの詳細については、 こちらをご覧ください 。
次の記事で残りの詳細をカバーすることをお約束します。