OTP原則を使用したノンブロッキングTCPサーバーの作成

エントリー



このマニュアルの読者は、 gen_serverおよびgen_fsmの動作、 gen_tcpモジュールを使用したTCPソケットの相互作用、アクティブおよびパッシブソケットモード、および「OTPスーパーバイザー」の原理に既に精通していることを前提としてます。



OTPは、堅牢なアプリケーションを構築するための便利なツールを提供します。 部分的には、これは、OTPスーパーバイザーの階層によって接続されているgen_servergen_fsmなどの動作に一般的な機能を抽象化することで実現されます。



いくつかの有名なTCPサーバーテンプレートがあります。 検討するプロセスには、1つのリッスンプロセスと、接続されたクライアントごとに新しいFSMプロセスを作成するプロセスが含まれます。 gen_tcpモジュールを介したOTPでのTCP接続はサポートされていますが、OTPの原理に基づいて非ブロッキングTCPサーバーを作成するための標準的な動作はありません。 非ブロッキングサーバーとは、リスニングプロセスとFSMプロセスがブロッキング呼び出しを行わず、タイムアウトを発生させずに着信メッセージ(構成の変更、再起動など)にすばやく応答することを意味します。 Erlangコンテキストでのロックは、オペレーティングシステムプロセスではなく、Erlangプロセスのロックを意味することに注意してください。



このガイドでは、アプリケーションの動作を制御し、OTPの原則に完全に準拠するgen_serverおよびgen_fsmを使用して、ノンブロッキングTCPサーバーを作成する方法を示します。



OTPに詳しくない読者は、OTPを使用せずにブロッキングコールgen_tcp:connect / 3およびgen_tcp:acceept / 1を使用してフォールトトレラントサーバーを構築する方法に関するJoe Armstrongのガイドに注意を払うことをお勧めします。



サーバー構造



サーバーの設計には、 tcp_server_appメインアプリケーションスーパーバイザープロセスと再起動戦略one_for_oneおよび2つの子プロセスが含まれます。 1つ目は、クライアント接続に関する非同期通知を待機するgen_serverとして実装されたリスニングプロセスです。 2番目は別のtcp_client_supアプリケーションスーパーバイザーであり、クライアント要求を処理するためのFSMプロセスを開始し、標準SASLエラーレポートを使用して異常なシャットダウンを記録します。



この資料を簡単にするために、クライアント要求ハンドラー(tcp_echo_fsm)は、クライアント要求を返す「エコー」サーバーを提供します。



アプリケーションの動作とスーパーバイザー



アプリケーションを作成するには、「Supervisor」および「Application」ビヘイビアコールバック関数を実装するモジュールを作成する必要があります。 従来、これらの関数は簡潔さを考慮して別々のモジュールに実装されていますが、それらを1つに結合します。



追加ボーナスとして、 get_app_env関数を実装します。これは、構成パラメーターの処理方法と、起動時のエミュレーターのコマンドラインパラメーターを示します。



スーパーバイザー階層の2つのレベルには、 init / 1関数の2つのインスタンスが必要です。 2つの異なる再起動戦略が使用されるため、それらを異なるレベルで実装します。



アプリケーションの起動後、 tcp_server_app:start / 2コールバック関数は、 supervisor:start_link / 2関数を呼び出します。この関数は、 tcp_server_app:init([Port、Module])を呼び出してメインアプリケーションスーパーバイザーを作成します。 このスーパーバイザーは、 tcp_listenerプロセスと、クライアント接続の処理を担当するtcp_client_sup子スーパーバイザーを作成します。 init関数のModule引数は、FSMクライアント接続ハンドラーの名前です(この場合、 tcp_echo_fsm )。



TCPサーバーアプリケーション(tcp_server_app.erl):

-module(tcp_server_app). -author('saleyn@gmail.com'). -behaviour(application). %% Internal API -export([start_client/0]). %% Application and Supervisor callbacks -export([start/2, stop/1, init/1]). -define(MAX_RESTART, 5). -define(MAX_TIME, 60). -define(DEF_PORT, 2222). %% A startup function for spawning new client connection handling FSM. %% To be called by the TCP listener process. start_client() -> supervisor:start_child(tcp_client_sup, []). %%---------------------------------------------------------------------- %% Application behaviour callbacks %%---------------------------------------------------------------------- start(_Type, _Args) -> ListenPort = get_app_env(listen_port, ?DEF_PORT), supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]). stop(_S) -> ok. %%---------------------------------------------------------------------- %% Supervisor behaviour callbacks %%---------------------------------------------------------------------- init([Port, Module]) -> {ok, {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Listener { tcp_server_sup, % Id = internal id {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} permanent, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [tcp_listener] % Modules = [Module] | dynamic }, % Client instance supervisor { tcp_client_sup, {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]}, permanent, % Restart = permanent | transient | temporary infinity, % Shutdown = brutal_kill | int() >= 0 | infinity supervisor, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }; init([Module]) -> {ok, {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Client { undefined, % Id = internal id {Module,start_link,[]}, % StartFun = {M, F, A} temporary, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }. %%---------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------- get_app_env(Opt, Default) -> case application:get_env(application:get_application(), Opt) of {ok, Val} -> Val; _ -> case init:get_argument(Opt) of [[Val | _]] -> Val; error -> Default end end.
      
      







リスニングプロセス



gen_tcpモジュールの欠点の1つは、接続をブロックするためだけのインターフェイスを提供することです。



prim_inetモジュールのテストは、クライアント接続を受け入れるためのネットワークドライバーへのコマンドが非同期であるという興味深い事実を示しました。 これは文書化されていません。つまり、OTPチームはいつでもこれを変更できますが、この機能を使用してサーバーを作成します。



リスニングプロセスはgen_serverとして実装されます

TCPリスナープロセス(tcp_listener.erl):

 -module(tcp_listener). -author('saleyn@gmail.com'). -behaviour(gen_server). %% External API -export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { listener, % Listening socket acceptor, % Asynchronous acceptor's internal reference module % FSM handling module }). %%-------------------------------------------------------------------- %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} % %% @doc Called by a supervisor to start the listening process. %% @end %%---------------------------------------------------------------------- start_link(Port, Module) when is_integer(Port), is_atom(Module) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []). %%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%---------------------------------------------------------------------- %% @spec (Port::integer()) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% %% @doc Called by gen_server framework at process startup. %% Create listening socket. %% @end %%---------------------------------------------------------------------- init([Port, Module]) -> process_flag(trap_exit, true), Opts = [binary, {packet, 2}, {reuseaddr, true}, {keepalive, true}, {backlog, 30}, {active, false}], case gen_tcp:listen(Port, Opts) of {ok, Listen_socket} -> %%Create first accepting process {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), {ok, #state{listener = Listen_socket, acceptor = Ref, module = Module}}; {error, Reason} -> {stop, Reason} end. %%------------------------------------------------------------------------- %% @spec (Request, From, State) -> {reply, Reply, State} | %% {reply, Reply, State, Timeout} | %% {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, Reply, State} | %% {stop, Reason, State} %% @doc Callback for synchronous server calls. If `{stop, ...}' tuple %% is returned, the server is stopped and `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_call(Request, _From, State) -> {stop, {unknown_call, Request}, State}. %%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for asyncrous server calls. If `{stop, ...}' tuple %% is returned, the server is stopped and `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_cast(_Msg, State) -> {noreply, State}. %%------------------------------------------------------------------------- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for messages sent directly to server's mailbox. %% If `{stop, ...}' tuple is returned, the server is stopped and %% `terminate/2' is called. %% @end %% @private %%------------------------------------------------------------------------- handle_info({inet_async, ListSock, Ref, {ok, CliSocket}}, #state{listener=ListSock, acceptor=Ref, module=Module} = State) -> try case set_sockopt(ListSock, CliSocket) of ok -> ok; {error, Reason} -> exit({set_sockopt, Reason}) end, %% New client connected - spawn a new process using the simple_one_for_one %% supervisor. {ok, Pid} = tcp_server_app:start_client(), gen_tcp:controlling_process(CliSocket, Pid), %% Instruct the new FSM that it owns the socket. Module:set_socket(Pid, CliSocket), %% Signal the network driver that we are ready to accept another connection case prim_inet:async_accept(ListSock, -1) of {ok, NewRef} -> ok; {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)}) end, {noreply, State#state{acceptor=NewRef}} catch exit:Why -> error_logger:error_msg("Error in async accept: ~p.\n", [Why]), {stop, Why, State} end; handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) -> error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]), {stop, Error, State}; handle_info(_Info, State) -> {noreply, State}. %%------------------------------------------------------------------------- %% @spec (Reason, State) -> any %% @doc Callback executed on server shutdown. It is only invoked if %% `process_flag(trap_exit, true)' is set by the server process. %% The return value is ignored. %% @end %% @private %%------------------------------------------------------------------------- terminate(_Reason, State) -> gen_tcp:close(State#state.listener), ok. %%------------------------------------------------------------------------- %% @spec (OldVsn, State, Extra) -> {ok, NewState} %% @doc Convert process state when code is changed. %% @end %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}. %%%------------------------------------------------------------------------ %%% Internal functions %%%------------------------------------------------------------------------ %% Taken from prim_inet. We are merely copying some socket options from the %% listening socket to the new client socket. set_sockopt(ListSock, CliSocket) -> true = inet_db:register_socket(CliSocket, inet_tcp), case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of {ok, Opts} -> case prim_inet:setopts(CliSocket, Opts) of ok -> ok; Error -> gen_tcp:close(CliSocket), Error end; Error -> gen_tcp:close(CliSocket), Error end.
      
      







このモジュールでは、 init / 1は2つのパラメーターを取ります-待機プロセスが開くポート番号とクライアント接続ハンドラーの名前。 初期化関数は、パッシブモードでソケットを開きます。 これは、クライアントから受信したデータの流れを制御できるようにするために行われます。



このコードの最も興味深い部分は、 prim_inet:async_accept / 2の呼び出しです。 これを機能させるには、ソケットの登録とクライアントソケットのオプションのコピーを処理するset_sockopt / 2関数からOTP内部コードの一部をコピーする必要があります。



クライアントソケットが接続されるとすぐに、ネットワークドライバーはメッセージ{inet_async、ListSock、Ref、{OK、CliSocket}}を使用してこれをリッスンプロセスに通知します。 現時点では、クライアントリクエストを処理してCliSocketに転送するプロセスのインスタンスを起動しています。



クライアントメッセージを処理するプロセス



tcp_listenerは一般的な実装ですが、 tcp_echo_fsmはTCPサーバーの作成方法を記述するためのFSMスタブにすぎません。 2つの関数をこのモジュールからエクスポートする必要があります。tcp_client_supスーパーバイザーの場合はstart_link / 0 、リスニングプロセスの場合はset_socket / 2です {active、true}オプション。



間違った(リスニング)プロセスへの転送によるメッセージ損失を回避するために、リスニングプロセスとクライアントプロセスの間で使用される同期パターンを強調したいと思います。 ソケットを所有するプロセスは、パッシブモードでソケットを開いたままにします。 次に、クライアントプロセスは、リスニングプロセスからオプション(パッシブモードを含む)を継承するソケットを受け取ります。 ソケットの所有権は、 gen_tcpを呼び出すことによってクライアントプロセスに転送されます。controling_process/ 2およびset_socket / 2は、ソケットからのメッセージの受信を開始できることをクライアントプロセスに通知します。 ソケットがアクティブモードに設定されるまで、受信したすべてのデータはソケットバッファに保存されます。



ソケットの所有権がWAIT_FOR_SOCKET状態のクライアントFSMプロセスに転送されると、 {active、once}モードが設定され、ネットワークドライバーが一度に1つのメッセージを送信できるようになります。 これは、データフローの制御を維持し、プロセスキューでのメッセージとTCPトラフィックの混合を回避するために使用されるOTP原則です。



FSM状態は、命名規則を使用するtcp_echo_fsmモジュールの特別な関数を使用して実装されます。 FSMは2つの状態で構成されています。 WAIT_FOR_SOCKETは、FSMがソケットの所有権を待機している初期状態です。WAIT_FOR_DATAは、クライアントからのTCPメッセージを待機している状態です。 この状態では、FSMは特別な「タイムアウト」メッセージも処理します。これは、クライアントからのアクティビティがないことを意味し、プロセスはクライアントとの接続を閉じます。



 -module(tcp_echo_fsm). -author('saleyn@gmail.com'). -behaviour(gen_fsm). -export([start_link/0, set_socket/2]). %% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). %% FSM States -export([ 'WAIT_FOR_SOCKET'/2, 'WAIT_FOR_DATA'/2 ]). -record(state, { socket, % client socket addr % client address }). -define(TIMEOUT, 120000). %%%------------------------------------------------------------------------ %%% API %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} %% @doc To be called by the supervisor in order to start the server. %% If init/1 fails with Reason, the function returns {error,Reason}. %% If init/1 returns {stop,Reason} or ignore, the process is %% terminated and the function returns {error,Reason} or ignore, %% respectively. %% @end %%------------------------------------------------------------------------- start_link() -> gen_fsm:start_link(?MODULE, [], []). set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> gen_fsm:send_event(Pid, {socket_ready, Socket}). %%%------------------------------------------------------------------------ %%% Callback functions from gen_server %%%------------------------------------------------------------------------ %%------------------------------------------------------------------------- %% Func: init/1 %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | %% {stop, StopReason} %% @private %%------------------------------------------------------------------------- init([]) -> process_flag(trap_exit, true), {ok, 'WAIT_FOR_SOCKET', #state{}}. %%------------------------------------------------------------------------- %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) -> % Now we own the socket inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), {ok, {IP, _Port}} = inet:peername(Socket), {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT}; 'WAIT_FOR_SOCKET'(Other, State) -> error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]), %% Allow to receive async messages {next_state, 'WAIT_FOR_SOCKET', State}. %% Notification event coming from client 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) -> ok = gen_tcp:send(S, Data), {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}; 'WAIT_FOR_DATA'(timeout, State) -> error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]), {stop, normal, State}; 'WAIT_FOR_DATA'(Data, State) -> io:format("~p Ignoring data: ~p\n", [self(), Data]), {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}. %%------------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_event(Event, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}. %%------------------------------------------------------------------------- %% Func: handle_sync_event/4 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %% @private %%------------------------------------------------------------------------- handle_sync_event(Event, _From, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}. %%------------------------------------------------------------------------- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%------------------------------------------------------------------------- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) -> % Flow control: enable forwarding of next TCP message inet:setopts(Socket, [{active, once}]), ?MODULE:StateName({data, Bin}, StateData); handle_info({tcp_closed, Socket}, _StateName, #state{socket=Socket, addr=Addr} = StateData) -> error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]), {stop, normal, StateData}; handle_info(_Info, StateName, StateData) -> {noreply, StateName, StateData}. %%------------------------------------------------------------------------- %% Func: terminate/3 %% Purpose: Shutdown the fsm %% Returns: any %% @private %%------------------------------------------------------------------------- terminate(_Reason, _StateName, #state{socket=Socket}) -> (catch gen_tcp:close(Socket)), ok. %%------------------------------------------------------------------------- %% Func: code_change/4 %% Purpose: Convert process state when code is changed %% Returns: {ok, NewState, NewStateData} %% @private %%------------------------------------------------------------------------- code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}.
      
      







アプリケーションの説明



OTPアプリケーションの作成に必要なもう1つの部分は、アプリケーション名、バージョン、開始モジュール、および環境変数を含む構成ファイルを作成することです。



アプリケーションファイル(tcp_server.app):

 {application, tcp_server, [ {description, "Demo TCP server"}, {vsn, "1.0"}, {id, "tcp_server"}, {modules, [tcp_listener, tcp_echo_fsm]}, {registered, [tcp_server_sup, tcp_listener]}, {applications, [kernel, stdlib]}, %% %% mod: Specify the module name to start the application, plus args %% {mod, {tcp_server_app, []}}, {env, []} ] }.
      
      







編集



このアプリケーション用に次のディレクトリ構造を作成します。



 ./tcp_server ./tcp_server/ebin/ ./tcp_server/ebin/tcp_server.app ./tcp_server/src/tcp_server_app.erl ./tcp_server/src/tcp_listener.erl ./tcp_server/src/tcp_echo_fsm.erl
      
      





 $ cd tcp_server/src $ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f
      
      





打ち上げ



SASLをサポートするErlangシェルを起動して、アプリケーションのプロセスとエラーレポートのステータスを確認できるようにします。 さらに、スーパーバイザーの階層を視覚的に調べるために、appmonアプリケーションを起動します。



 $ cd ../ebin $ erl -boot start_sasl ... 1> appmon:start(). {ok,<0.44.0>} 2> application:start(tcp_server). ok
      
      







appmonアプリケーションのtcp_serverボタンをクリックして、アプリケーションスーパーバイザーの階層を表示します。



 3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]). {ok,#Port<0.150>}
      
      







新しいEchoサーバークライアント接続を開始しました。



 4> gen_tcp:send(S,<<"hello">>). ok 5> f(M), receive M -> M end. {tcp,#Port<0.150>,"hello"}
      
      







エコーサーバーが期待どおりに動作していることを確認しました。 次に、サーバーにクライアント接続を「配置」して、エラーメッセージの生成を見てみましょう。



 6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup). [{undefined,<0.64.0>,worker,[]}] 7> exit(Pid,kill). true =SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 === Supervisor: {local,tcp_client_sup} Context: child_terminated Reason: killed Offender: [{pid,<0.77.0>}, {name,undefined}, {mfa,{tcp_echo_fsm,start_link,[]}}, {restart_type,temporary}, {shutdown,2000}, {child_type,worker}]
      
      







サーバーが多数の接続で負荷がかかっている場合、リスニングプロセスはオペレーティングシステムで設定された特定の制限を超えると新しい接続を受け入れない場合があることに注意してください。 この場合、エラーメッセージが表示されます。



 "too many open files"
      
      







おわりに



OTPは、ノンブロッキングTCPサーバーを構築するためのビルディングブロックを提供します。 このガイドでは、標準のOTP動作を使用して簡単なサーバーを作成する方法を示します。



All Articles