Erlangの高速TCPソケット

TCP接続の処理は、速度が1秒あたり1万リクエストに近づくと簡単にボトルネックになります。効率的な読み取りと書き込みは別の問題になり、処理コアのほとんどはアイドル状態になります。



この記事では、TCPを操作する3つのコンポーネント、接続の受信、メッセージの受信、およびそれらへの応答を改善する最適化を提案します。



この記事は、Erlangプログラマーと、単にErlangに興味がある人の両方を対象としています。 言語の深い知識は必要ありません。





「TCPの操作」を3つの部分に分けます。

  1. 接続受け入れ
  2. メッセージを受信する
  3. メッセージへの返信


タスクによっては、これらの部分のいずれかがボトルネックになる場合があります。



TCPサービスを作成するには、 gen_tcpを直接使用する方法と、最も人気のあるErlang接続プールライブラリであるranchを使用する方法の2つを検討します。 提案された最適化の一部は、いずれかの場合にのみ適用されます。



パフォーマンスの変化を評価するために、 tcp_workerでMZBenchを使用します。tcp_workerは、接続および要求機能と同期機能を実装します。 2つのスクリプト「fast_connect」と「fast_receive」が使用されます。 1つ目は速度を上げて接続を開き、2つ目は既に開いている接続でできるだけ多くのパケットを送信しようとします。 各スクリプトは、c4.2xlarge Amazonノードで実行されました。 Erlangバージョンは18です。



MZBenchのスクリプトと関数コードはGitHubで入手できます



接続受け入れ



常に再接続するクライアントが多数ある場合、たとえば、クライアントプロセスの時間が非常に制限されている場合、または永続的な接続をサポートしていない場合は、接続をすばやく受け入れることが重要です。



牧場の最適化



牧場を使用するTCPサービスは非常に簡単です。 ランチに付属するサンプルエコーサービスのコードを変更して、着信パケットに対して「ok」と応答するようにします。違いは以下のとおりです。



--- a/examples/tcp_echo/src/echo_protocol.erl +++ b/examples/tcp_echo/src/echo_protocol.erl @@ -16,8 +16,8 @@ init(Ref, Socket, Transport, _Opts = []) -> loop(Socket, Transport) -> case Transport:recv(Socket, 0, 5000) of - {ok, Data} -> - Transport:send(Socket, Data), + {ok, _Data} -> + Transport:send(Socket, <<"ok">>), loop(Socket, Transport); _ -> ok = Transport:close(Socket) --- a/examples/tcp_echo/src/tcp_echo_app.erl +++ b/examples/tcp_echo/src/tcp_echo_app.erl @@ -11,8 +11,8 @@ %% API. start(_Type, _Args) -> - {ok, _} = ranch:start_listener(tcp_echo, 1, - ranch_tcp, [{port, 5555}], echo_protocol, []), + {ok, _} = ranch:start_listener(tcp_echo, 100, + ranch_tcp, [{port, 5555}, {max_connections, infinity}], echo_protocol, []), tcp_echo_sup:start_link().
      
      







「fast_connect」スクリプトを実行して開始します(接続を開く速度を上げます)。





左側のグラフは、サイズが214msの異常値を示しています。残りの行は、5秒間隔に分割された時間遅延のパーセンタイルに対応しています。 右側のグラフは、化合物の開封速度です。たとえば、放電領域では、毎秒約3.5千化合物でした。 このシナリオでは、毎回1つのメッセージが送信されるため、メッセージの数は開いている接続の数に対応します。



速度をさらに上げると、次の結果が得られます。







1000ミリ秒の放出はタイムアウトに対応します。 化合物を開く速度を上げ続けると、放出がより頻繁になります。 最初のスパイクは5k rpsで現れ、11k rpsで常に存在します。



タイマー付きのパケット受信時のタイムアウトを置き換える:sleep()



メッセージを受信する際のタイムアウトパラメータの単純な例外により、接続の確立速度が大幅に向上することがわかりました。 最大速度でソケットをポーリングしないように、タイマーを追加しました:sleep(20):



 --- a/examples/tcp_echo/src/echo_protocol.erl +++ b/examples/tcp_echo/src/echo_protocol.erl @@ -15,10 +15,11 @@ init(Ref, Socket, Transport, _Opts = []) -> loop(Socket, Transport). loop(Socket, Transport) -> - case Transport:recv(Socket, 0, 5000) of - {ok, Data} -> - Transport:send(Socket, Data), + case Transport:recv(Socket, 0, 0) of + {ok, _Data} -> + Transport:send(Socket, <<"ok">>), loop(Socket, Transport); + {error, timeout} -> timer:sleep(20), loop(Socket, Transport); _ -> ok = Transport:close(Socket) end.
      
      







この最適化により、牧場アプリケーションはより多くの更新を取得できます。最初の急増は11k rpsでのみ発生します。







さらに速度を上げようとすると、放出はさらに大きくなります。 したがって、最大数は24k rpsです。



おわりに

提案された最適化により、11kから24k rpsの接続受信速度の約2倍のゲインが得られました。



Gen_tcpの最適化



以下は、私が牧場で行ったことに似た、gen_tcpを使用したクリーンな実装です(テキストは、リポジトリにsimple.erlとしてサンプルが用意されています)。



 -export([service/1]). -define(Options, [ binary, {backlog, 128}, {active, false}, {buffer, 65536}, {keepalive, true}, {reuseaddr, true} ]). -define(Timeout, 5000). main([Port]) -> {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options), accept(ListenSocket). accept(ListenSocket) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> erlang:spawn(?MODULE, service, [Socket]), accept(ListenSocket); {error, closed} -> ok end. service(Socket) -> case gen_tcp:recv(Socket, 0, ?Timeout) of {ok, _Binary} -> gen_tcp:send(Socket, <<"ok">>), service(Socket); _ -> gen_tcp:close(Socket) end.
      
      







同じスクリプトを実行すると、結果が得られました。







ご覧のとおり、約18k rpsで接続の受信が不安定になります。 私たちは、18kを要することが判明したと仮定します。



タイマー付きのパケット受信時のタイムアウトを置き換える:sleep()



牧場と同じ最適化を適用します。



 service(Socket) -> case gen_tcp:recv(Socket, 0, 0) of {ok, _Binary} -> gen_tcp:send(Socket, <<"ok">>), service(Socket); {error, timeout} -> timer:sleep(20), service(Socket); _ -> gen_tcp:close(Socket) end.
      
      







この場合、23k rpsを処理します:







ホストプロセスの追加



2番目のアイデアは、接続を受け入れるプロセスの数を増やすことです。 これは、いくつかのプロセスからgen_tcp:acceptを呼び出すことで実現できます。



 main([Port]) -> {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options), erlang:spawn(?MODULE, accept, [ListenSocket]), erlang:spawn(?MODULE, accept, [ListenSocket]), accept(ListenSocket).
      
      







負荷の下でテストすると32k rpsが得られます。







負荷がさらに増加すると、遅延が増加します。



おわりに

gen_tcpのタイムアウトを最適化すると、受信速度が18kから23kに5k rps増加します。

複数のホストプロセスがある場合、gen_tcpは32k rpsを処理します。これは、最適化なしの場合の1.8倍です。



まとめ







メッセージを受信する



これは、すでに確立された接続で多数のショートメッセージを受信する方法の一部です。 新しい接続が開かれることはめったにないため、できるだけ早くメッセージを読んで返信する必要があります。 このシナリオは、Webソケットを備えたロード済みアプリケーションに実装されます。



複数のノードから25kの接続を開き、メッセージの送信速度を徐々に上げます。



牧場の最適化



以下は、牧場を使用した最適化されていないコードの結果です(左側が時間遅延、右側がメッセージ処理速度)。





最適化を行わない場合、牧場は最大時間800msで70k rpsを処理します。



Linuxバッファーを増やす



かなり一般的な最適化は、 Linuxソケットバッファーの増加です。 この最適化が結果にどのように影響するかを見てみましょう。







おわりに

この場合、バッファを増やしても大きな利点はありません。



GET_TCP最適化



以下では、前の記事のgen_tcpソリューションでパケット処理速度を確認しました。





牧場のように70k rps。



読み取りプロセスの数を減らします。



前のケースでは、ソケットから25,000プロセスを読み取りました(接続ごとに1プロセス)。 次に、この数を減らして結果を確認します。



100個のプロセスを作成し、それらの間に新しいソケットを配布します。



 main([Port]) -> {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options), Readers = [erlang:spawn(?MODULE, reader, []) || _X <- lists:seq(1, ?Readers)], accept(ListenSocket, Readers, []). accept(ListenSocket, [], Reversed) -> accept(ListenSocket, lists:reverse(Reversed), []); accept(ListenSocket, [Reader | Rest], Reversed) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> Reader ! Socket, accept(ListenSocket, Rest, [Reader | Reversed]); {error, closed} -> ok end. reader() -> reader([]). read_socket(S) -> case gen_tcp:recv(S, 0, 0) of {ok, _Binary} -> gen_tcp:send(S, <<"ok">>), true; {error, timeout} -> true; _ -> gen_tcp:close(S), false end. reader(Sockets) -> Sockets2 = lists:filter(fun read_socket/1, Sockets), receive S -> reader([S | Sockets2]) after ?SmallTimeout -> reader(Sockets) end.
      
      







この最適化により、パフォーマンスが大幅に向上します。







速度の向上に加えて、時間遅延ははるかに良く見え、処理されるパケットの数は約100kであり、さらに、120kのメッセージでも処理できますが、大きな時間遅延があります。 最適化がなければ、これはできませんでした。



おわりに

1つのプロセスから複数の接続を処理すると、純粋なgen_tcpサーバーのパフォーマンスが少なくとも50%向上します。



Linuxバッファーを増やす



vanilla gen_tcpスクリプトを使用して、システムに同じ最適化を適用します。





牧場の場合と同様に、重要な結果は表示されず、追加の外れ値のみが大きな時間遅延の形で現れました。



最適化を既に最適化されたgen_tcpに適用すると、多くの時間遅延外れ値が得られます。







おわりに

純粋なgen_tcpソリューションも、Linuxバッファーの増加の恩恵を受けません。 ソケットから読み取るプロセスの数を減らすと、処理速度が50%向上します。



まとめ







メッセージへの返信



正式には、前の章では、メッセージ処理サイクルはそれに対する答えを想定していましたが、この部分を最適化するためのことはしませんでした。 同じアイデアをメッセージ送信機能に適用してみます。 ここでは、前の章のスクリプトを使用します。このスクリプトでは、パケットは既に確立された接続を通過します。



タイムアウトとプロセスの最適化



前の章で使用したのと同じアイデアを送信機能に適用できます。タイムアウトを削除し、より少ないプロセスから応答します。 send関数にはタイムアウトなどのパラメーターはありません。接続を開くときに{send_timeout、0}オプションを設定する必要があります。



残念ながら、この最適化は実際には何も変更せず、コードを変更するだけでオプションを追加することになります。そのため、読者にdiffとグラフを煩わせないことにしました。



プロセス数がどのように影響するかを確認するために、次のスクリプトを使用しました。



 -export([responder/0, service/2]). -define(Options, [ binary, {backlog, 128}, {active, false}, {buffer, 65536}, {keepalive, true}, {send_timeout, 0}, {reuseaddr, true} ]). -define(SmallTimeout, 50). -define(Timeout, 5000). -define(Responders, 200). main([Port]) -> {ok, ListenSocket} = gen_tcp:listen(list_to_integer(Port), ?Options), Responders = [erlang:spawn(?MODULE, responder, []) || _X <- lists:seq(1, ?Responders)], accept(ListenSocket, Responders, []). accept(ListenSocket, [], Reversed) -> accept(ListenSocket, lists:reverse(Reversed), []); accept(ListenSocket, [Responder | Rest], Reversed) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> erlang:spawn(?MODULE, service, [Socket, Responder]), accept(ListenSocket, Rest, [Responder | Reversed]); {error, closed} -> ok end. responder() -> receive S -> gen_tcp:send(S, <<"ok">>), responder() after ?SmallTimeout -> responder() end. service(Socket, Responder) -> case gen_tcp:recv(Socket, 0, ?Timeout) of {ok, _Binary} -> Responder ! Socket, service(Socket, Responder); _ -> gen_tcp:close(Socket) end.
      
      







ここでは、応答するプロセスが読者と共有されます。 25,000人の読者と200人の回答者がいます。



ただし、この最適化でも、前のセクションのgen_tcpソリューションと比較して、大幅なパフォーマンスの向上は見られません。





Erlangのチューニング



1つのプロセスを使用して複数のソケットを処理する場合、1つの低速なクライアントが他のすべてのソケットの速度を低下させる可能性があります。 この状況を回避するには、ソケットを開くときに{send_timeout、0}を設定し、失敗した場合は次のループで送信を繰り返します。



残念ながら、送信機能は送信されたバイト数を返しません。 POSIXエラーのみが返されるか、アトムが「OK」です。 これにより、正常に送信された最後のバイトから送信できなくなります。 さらに、この量を知っていると、ネットワークをより効率的に使用できます。これは、顧客のチャネルが貧弱な場合に特に重要になります。



次に、これを修正する方法の例を示します。



  1. 公式WebサイトからErlangソースをダウンロードします。

     $ wget http://erlang.org/download/otp_src_18.2.1.tar.gz $ tar -xf otp_src_18.2.1.tar.gz $ cd otp_src_18.2.1
          
          





  2. inet erts / emulator / drivers / common / inet_drv.cドライバー関数を更新します。

    1. 番号で応答する機能を追加します。

       static int inet_reply_ok_int(inet_descriptor* desc, int Val) { ErlDrvTermData spec[2*LOAD_ATOM_CNT + 2*LOAD_PORT_CNT + 2*LOAD_TUPLE_CNT]; ErlDrvTermData caller = desc->caller; int i = 0; i = LOAD_ATOM(spec, i, am_inet_reply); i = LOAD_PORT(spec, i, desc->dport); i = LOAD_ATOM(spec, i, am_ok); i = LOAD_INT(spec, i, Val); i = LOAD_TUPLE(spec, i, 2); i = LOAD_TUPLE(spec, i, 3); ASSERT(i == sizeof(spec)/sizeof(*spec)); desc->caller = 0; return erl_drv_send_term(desc->dport, caller, spec, i); }
            
            





    2. tcp_inet_commandv関数から「ok」を送信するアトムを削除しましょう。



        else inet_reply_error(INETP(desc), ENOTCONN); } else if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) tcp_shutdown_error(desc, EPIPE); >> else tcp_sendv(desc, ev); DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port)); }
            
            





    3. tcp_sendv関数で0を返す代わりに、int sendを追加します。

        default: if (len == 0) >> return inet_reply_ok_int(desc, 0); h_len = 0; break; } ----------------------------------- else if (n == ev->size) { ASSERT(NO_SUBSCRIBERS(&INETP(desc)->empty_out_q_subs)); >> return inet_reply_ok_int(desc, n); } else { DEBUGF(("tcp_sendv(%ld): s=%d, only sent " LLU"/%d of "LLU"/%d bytes/items\r\n", (long)desc->inet.port, desc->inet.s, (llu_t)n, vsize, (llu_t)ev->size, ev->vsize)); } DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", (long)desc->inet.port, desc->inet.s)); driver_enqv(ix, ev, n); if (!INETP(desc)->is_ignored) sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } >> return inet_reply_ok_int(desc, n);
            
            







  3. 実行/構成&& make && make install。




これで、関数gen_tcp:sendは成功すると{ok、Number}を返します。 上記のコードフラグメントは「9」を出力します。



  {ok, Sock} = gen_tcp:connect(SomeHostInNet, 5555, [binary, {packet, 0}]), {ok, N} = gen_tcp:send(Sock, "Some Data"), io:format("~p", [N])
      
      







おわりに

1つのプロセスから複数の接続を処理する場合、ソケットの作成時に{send_timeout、0}オプションを使用する必要があります。そうしないと、1つの低速クライアントが他のすべてのクライアントへの送信を遅くする場合があります。



プロトコルが部分的なメッセージを処理できる場合、OTPにパッチを当てて、送信されたバイト数を考慮するのが最善です。



簡単に







参照資料






All Articles