Comet – 1,000,000ユーザーの負荷を持つMochiwebのアプリケーション。 パート3/3

このシリーズの記事のパート1パート2では、mochiwebを使用してアプリケーションを作成する方法と、接続ユーザーにメッセージを送信する方法を示しました。 接続ごとにメモリ消費を8 KBに削減しました。 c10kテストを繰り返しました。 スケジュールを作りました。 楽しかったですが、今は100万の接続ですべてを繰り返す時間です。



この記事の内容は次のとおりです。

•Mnesiaデータベースの使用。

•100万人のユーザーに対する「友人」という形式のもっともらしいデータセットの生成。

•Mnesiaを構成し、データを入力します。

•1台のマシンから100万の接続を発見。

•100万人のユーザーとの比較テスト。

•接続処理のためのLibevent +C。

•最終的な結論。



このテストの一部は、1台のテストマシンから1,000,000の接続を開く機能です。 1,000,000接続を受け入れることができるサーバーを作成する方が、実際に1,000,000接続を作成するよりも簡単です。 したがって、この記事のかなりの部分は、単一のマシンから1,000,000の接続を開くために使用される方法に関するものです。



Pubsubの発売。



パート2では、ルーターを使用して特定のユーザーにメッセージを送信しました。 これはチャット/ IMシステムには最適ですが、代わりにもっと便利なことができます。 大規模なテストを開始する前に、サブスクライバーデータベースという別のモジュールを追加しましょう。 友人に関するデータを含むリポジトリを作成し、友人リストの人々が生成したすべてのイベントを提供できるようにします。



私の意図は、これをLast.fmに使用することです。 これにより、友達が現在聴いている曲のリアルタイムチャンネルを取得できます。 これは、ソーシャルネットワークで生成された他のイベントにも同様に適用できます。 Flickrにアップロードされた写真、Facebook、Twitterなどのニュースフィードの要素 FriendFeedにはリアルタイムのベータ版APIもあるため、これは間違いなく関連しています。



サブスクリプションマネージャーの実装



シンプルなサブスクリプションマネージャーを実装しますが、自動的に人々をすべての友達にサブスクライブします。



API:

•add_subscriptions([{購読者、購読者}、...])

•remove_subscriptions([{購読者、購読者}、...])

•get_subscribers(ユーザー)



-module(subsmanager). -behaviour(gen_server). -include("/usr/local/lib/erlang/lib/stdlib-1.15.4/include/qlc.hrl"). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([add_subscriptions/1, remove_subscriptions/1, get_subscribers/1, first_run/0, stop/0, start_link/0]). -record(subscription, {subscriber, subscribee}). -record(state, {}). % state is all in mnesia -define(SERVER, global:whereis_name(?MODULE)). start_link() -> gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). stop() -> gen_server:call(?SERVER, {stop}). add_subscriptions(SubsList) -> gen_server:call(?SERVER, {add_subscriptions, SubsList}, infinity). remove_subscriptions(SubsList) -> gen_server:call(?SERVER, {remove_subscriptions, SubsList}, infinity). get_subscribers(User) -> gen_server:call(?SERVER, {get_subscribers, User}). %% init([]) -> ok = mnesia:start(), io:format("Waiting on mnesia tables..\n",[]), mnesia:wait_for_tables([subscription], 30000), Info = mnesia:table_info(subscription, all), io:format("OK. Subscription table info: \n~w\n\n",[Info]), {ok, #state{}}. handle_call({stop}, _From, State) -> {stop, stop, State}; handle_call({add_subscriptions, SubsList}, _From, State) -> % Transactionally is slower: % F = fun() -> % [ ok = mnesia:write(S) || S <- SubsList ] % end, % mnesia:transaction(F), [ mnesia:dirty_write(S) || S <- SubsList ], {reply, ok, State}; handle_call({remove_subscriptions, SubsList}, _From, State) -> F = fun() -> [ ok = mnesia:delete_object(S) || S <- SubsList ] end, mnesia:transaction(F), {reply, ok, State}; handle_call({get_subscribers, User}, From, State) -> F = fun() -> Subs = mnesia:dirty_match_object(#subscription{subscriber='_', subscribee=User}), Users = [Dude || #subscription{subscriber=Dude, subscribee=_} <- Subs], gen_server:reply(From, Users) end, spawn(F), {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Msg, State) -> {noreply, State}. terminate(_Reason, _State) -> mnesia:stop(), ok. code_change(_OldVersion, State, _Extra) -> io:format("Reloading code for ?MODULE\n",[]), {ok, State}. %% first_run() -> mnesia:create_schema([node()]), ok = mnesia:start(), Ret = mnesia:create_table(subscription, [ {disc_copies, [node()]}, {attributes, record_info(fields, subscription)}, {index, [subscribee]}, %index subscribee too {type, bag} ]), Ret.
      
      







注目すべき:

•絶対パスを使用してMnesiaに必要なqlc.hrlを含めました。 これは良くありませんが、別の方法で私は成功しませんでした。

•get_subscribersは別のプロセスを生成し、gen_server:replyを使用して同じプロセスに応答の作成を委任します。 これは、ルックアップを頻繁に呼び出す場合、gen_serverループがこの呼び出しでブロックしないことを意味します。

•rr(「subsmanager.erl」)。 以下の例では、erlシェルでレコード定義を使用できます。 records.hrlファイルに定義を入れてモジュールに含めるのが最良のスタイルです。 私は短くそうしました。



確認してください。 first_run()はMnesiaスキーマを作成するため、最初に呼び出すことが重要です。 mnesiaのもう1つの潜在的な不具合は、(デフォルトで)作成したノードのみがデータベースにアクセスできるため、シェルにerl名を付けることです。

 $ mkdir /var/mnesia $ erl -boot start_sasl -mnesia dir '"/var/mnesia_data"' -sname subsman (subsman@localhost)1> c(subsmanager). {ok,subsmanager} (subsman@localhost)2> subsmanager:first_run(). ... {atomic,ok} (subsman@localhost)3> subsmanager:start_link(). Waiting on mnesia tables.. OK. Subscription table info: [{access_mode,read_write},{active_replicas,[subsman@localhost]},{arity,3},{attributes,[subscriber,subscribee]},{checkpoints,[]},{commit_work,[{index,bag,[{3,{ram,57378}}]}]},{cookie,{{1224,800064,900003},subsman@localhost}},{cstruct,{cstruct,subscription,bag,[],[subsman@localhost],[],0,read_write,[3],[],false,subscription,[subscriber,subscribee],[],[],{{1224,863164,904753},subsman@localhost},{{2,0},[]}}},{disc_copies,[subsman@localhost]},{disc_only_copies,[]},{frag_properties,[]},{index,[3]},{load_by_force,false},{load_node,subsman@localhost},{load_order,0},{load_reason,{dumper,create_table}},{local_content,false},{master_nodes,[]},{memory,288},{ram_copies,[]},{record_name,subscription},{record_validation,{subscription,3,bag}},{type,bag},{size,0},{snmp,[]},{storage_type,disc_copies},{subscribers,[]},{user_properties,[]},{version,{{2,0},[]}},{where_to_commit,[{subsman@localhost,disc_copies}]},{where_to_read,subsman@localhost},{where_to_write,[subsman@localhost]},{wild_pattern,{subscription,'_','_'}},{{index,3},57378}] {ok,<0.105.0>} (subsman@localhost)4> rr("subsmanager.erl"). [state,subscription] (subsman@localhost)5> subsmanager:add_subscriptions([ #subscription{subscriber=alice, subscribee=rj} ]). ok (subsman@localhost)6> subsmanager:add_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)7> subsmanager:get_subscribers(rj). [bob,alice] (subsman@localhost)8> subsmanager:remove_subscriptions([ #subscription{subscriber=bob, subscribee=rj} ]). ok (subsman@localhost)8> subsmanager:get_subscribers(rj). [alice] (subsman@localhost)10> subsmanager:get_subscribers(charlie). []
      
      





整数IDを使用してユーザーを区別しますが、このテストではアトム(rj、alice、bob)を使用し、aliceとbobはrjのフレンドであると想定しました。 mnesia(およびets / dets)が使用したタイプを気にしないのは素晴らしいことです。Erlangの用語はすべて有効です。 これは、さまざまなタイプをサポートするための更新が難しくないことを意味します。



ルーターを変更する



特定のユーザー、つまりrouter:send(123、“ Hello user 123”)にメッセージをアドレス指定する代わりに、メッセージ(メッセージを生成した人)に「マーク」を付け、サインアップした各ユーザーにメッセージを送信するルーターがあります。 つまり、APIは次のように機能します。router:send(123、「こんにちは、ユーザー123にサブスクライブしている全員」)

 -module(router). -behaviour(gen_server). -export([start_link/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([send/2, login/2, logout/1]). -define(SERVER, global:whereis_name(?MODULE)). % will hold bidirectional mapping between id <–> pid -record(state, {pid2id, id2pid}). start_link() -> gen_server:start_link({global, ?MODULE}, ?MODULE, [], []). % sends Msg to anyone subscribed to Id send(Id, Msg) -> gen_server:call(?SERVER, {send, Id, Msg}). login(Id, Pid) when is_pid(Pid) -> gen_server:call(?SERVER, {login, Id, Pid}). logout(Pid) when is_pid(Pid) -> gen_server:call(?SERVER, {logout, Pid}). %% init([]) -> % set this so we can catch death of logged in pids: process_flag(trap_exit, true), % use ets for routing tables {ok, #state{ pid2id = ets:new(?MODULE, [bag]), id2pid = ets:new(?MODULE, [bag]) } }. handle_call({login, Id, Pid}, _From, State) when is_pid(Pid) -> ets:insert(State#state.pid2id, {Pid, Id}), ets:insert(State#state.id2pid, {Id, Pid}), link(Pid), % tell us if they exit, so we can log them out %io:format("~w logged in as ~w\n",[Pid, Id]), {reply, ok, State}; handle_call({logout, Pid}, _From, State) when is_pid(Pid) -> unlink(Pid), PidRows = ets:lookup(State#state.pid2id, Pid), case PidRows of [] -> ok; _ -> IdRows = [ {I,P} || {P,I} <- PidRows ], % invert tuples ets:delete(State#state.pid2id, Pid), % delete all pid->id entries [ ets:delete_object(State#state.id2pid, Obj) || Obj <- IdRows ] % and all id->pid end, %io:format("pid ~w logged out\n",[Pid]), {reply, ok, State}; handle_call({send, Id, Msg}, From, State) -> F = fun() -> % get users who are subscribed to Id: Users = subsmanager:get_subscribers(Id), io:format("Subscribers of ~w = ~w\n",[Id, Users]), % get pids of anyone logged in from Users list: Pids0 = lists:map( fun(U)-> [ P || { _I, P } <- ets:lookup(State#state.id2pid, U) ] end, [ Id | Users ] % we are always subscribed to ourselves ), Pids = lists:flatten(Pids0), io:format("Pids: ~w\n", [Pids]), % send Msg to them all M = {router_msg, Msg}, [ Pid ! M || Pid <- Pids ], % respond with how many users saw the message gen_server:reply(From, {ok, length(Pids)}) end, spawn(F), {noreply, State}. % handle death and cleanup of logged in processes handle_info(Info, State) -> case Info of {'EXIT', Pid, _Why} -> handle_call({logout, Pid}, blah, State); Wtf -> io:format("Caught unhandled message: ~w\n", [Wtf]) end, {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}.
      
      







ちょっとしたテスト-idではなくatomを使用しました。

 (subsman@localhost)1> c(subsmanager), c(router), rr("subsmanager.erl"). (subsman@localhost)2> subsmanager:start_link(). (subsman@localhost)3> router:start_link(). (subsman@localhost)4> Subs = [#subscription{subscriber=alice, subscribee=rj}, #subscription{subscriber=bob, subscribee=rj}]. [#subscription{subscriber = alice,subscribee = rj}, #subscription{subscriber = bob,subscribee = rj}] (subsman@localhost)5> subsmanager:add_subscriptions(Subs). ok (subsman@localhost)6> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [] {ok,0} (subsman@localhost)7> router:login(alice, self()). ok (subsman@localhost)8> router:send(rj, "RJ did something"). Subscribers of rj = [bob,alice] Pids: [<0.46.0>] {ok,1} (subsman@localhost)9> receive {router_msg, M} -> io:format("~s\n",[M]) end. RJ did something ok
      
      





アリスが直接アリスに送信されていなくても、サブスクライブしている相手がメッセージを送信すると、アリスはメッセージを受信できることがわかります。 結論は、ルータが可能なターゲットを[alice、bob]として識別したが、bobが許可されなかったため、メッセージを1人のaliceに渡したことを示しています。



簡単なデータセットの生成



多くの関係をランダムに生成できますが、これは特に現実的ではありません。 ソーシャルネットワークには通常、いくつかの超人気のユーザー(一部のTwitterユーザーは100,000人以上のフォロワーがいます)と、少数の友人しかいない多くの人々がいます。

データセットを生成するために、優れたigraphライブラリのPythonモジュールを使用しました。

 import igraph g = igraph.Graph.Barabasi(1000000, 15, directed=False) print "Edges: " + str(g.ecount()) + " Verticies: " + str(g.vcount()) g.write_edgelist("fakefriends.txt")
      
      







Mnesiaへのデータのアップロード



この小さなモジュールはfakefriends.txtファイルを読み取り、サブスクリプションのリストを作成します。

 -module(readfriends). -export([load/1]). -record(subscription, {subscriber, subscribee}). load(Filename) -> for_each_line_in_file(Filename, fun(Line, Acc) -> [As, Bs] = string:tokens(string:strip(Line, right, $\n), " "), {A, _} = string:to_integer(As), {B, _} = string:to_integer(Bs), [ #subscription{subscriber=A, subscribee=B} | Acc ] end, [read], []). % via: http://www.trapexit.org/Reading_Lines_from_a_File for_each_line_in_file(Name, Proc, Mode, Accum0) -> {ok, Device} = file:open(Name, Mode), for_each_line(Device, Proc, Accum0). for_each_line(Device, Proc, Accum) -> case io:get_line(Device, "") of eof -> file:close(Device), Accum; Line -> NewAccum = Proc(Line, Accum), for_each_line(Device, Proc, NewAccum) end.
      
      





これで、subsmanagerシェルでテキストファイルから読み取り、サブスクリプションを追加できます。

 $ erl -name router@minifeeds4.gs2 +K true +A 128 -setcookie secretcookie -mnesia dump_log_write_threshold 50000 -mnesia dc_dump_limit 40 erl> c(readfriends), c(subsmanager). erl> subsmanager:first_run(). erl> subsmanager:start_link(). erl> subsmanager:add_subscriptions( readfriends:load("fakefriends.txt") ).
      
      





追加のパラメーターに注意してください-「**警告** Mnesia is overloaded」メッセージを回避するのに役立ちます。 Mnesiaのドキュメントには、他にも多くの注目すべき設定が含まれています。



1,000,000



1つのホストから100万tcp接続を作成するのは簡単ではありません。 多数の化合物をモデル化するために専用の小さなクラスターで定期的にこれを行う人は、おそらくTsungのような実際のツールを使用していると感じています。 パート1の構成を使用しても、ハードポートの制限が発生します。 TCP接続を作成するとき、クライアントポートは/ proc / sys / net / ipv4 / ip_local_port_rangeの範囲から割り当てられます。 手動で設定するか、自動ポートを使用するかは関係ありません。 パート1では、範囲を「1024 65535」に設定しました。 65535-1024 = 64511の非特権の利用可能なポートがあります。 それらの一部は他のプロセスで使用されますが、ポートが不足するため、64511クライアントを超えることはありません。

ローカルポート範囲はIPに接続されているため、さまざまなローカルIPアドレスから発信接続を行うと、64511を超える発信接続を開くことができます。



17個の新しいIPアドレスを作成して、それぞれから62,000の接続を作成しましょう。これにより、合計1,054,000の接続が得られます。

 $ for i in `seq 1 17`; do echo sudo ifconfig eth0:$i 10.0.0.$i up ; done
      
      





ここでifconfigを確認すると、仮想インターフェイスが表示されます:eth0:1、eth0:2 ... eth0:17、それぞれ異なるIPアドレス。



今残っているのは、パート1のフラッドテストを変更してローカルIPを選択することだけです。 残念ながら、erlang httpクライアントでは、ソースIPを特定できません。



この時点で、別の可能性を検討していました。17のIPペア(サーバー上とクライアント上)を使用することです。各ペアは独自の隔離された/ 30サブネットにあります。 その後、クライアントに任意のIPサーバーへの接続を強制すると、ローカルIPの1つだけが実際にこのサブネット上のIPサーバーに到達できるため、ローカルアドレスが2番目のペアになります。 理論的には、これはクライアントマシン上のローカルソースIPは不要であることを宣言することを意味します(ただし、サーバーのIPアドレス範囲を定義する必要があります)。 これが機能するかどうかはわかりません-当時はそうだったようです。 結局、私はこれがあまりにもひどすぎると決めた。



gen_tcpでは送信元アドレスを指定できるため、最終的にはrawクライアントを使用しました。

 -module(floodtest2). -compile(export_all). -define(SERVERADDR, "10.1.2.3"). % where mochiweb is running -define(SERVERPORT, 8000). % Generate the config in bash like so (chose some available address space): % EACH=62000; for i in `seq 1 17`; do echo "{{10,0,0,$i}, $((($i-1)*$EACH+1)), $(($i*$EACH))}, "; done run(Interval) -> Config = [ {{10,0,0,1}, 1, 62000}, {{10,0,0,2}, 62001, 124000}, {{10,0,0,3}, 124001, 186000}, {{10,0,0,4}, 186001, 248000}, {{10,0,0,5}, 248001, 310000}, {{10,0,0,6}, 310001, 372000}, {{10,0,0,7}, 372001, 434000}, {{10,0,0,8}, 434001, 496000}, {{10,0,0,9}, 496001, 558000}, {{10,0,0,10}, 558001, 620000}, {{10,0,0,11}, 620001, 682000}, {{10,0,0,12}, 682001, 744000}, {{10,0,0,13}, 744001, 806000}, {{10,0,0,14}, 806001, 868000}, {{10,0,0,15}, 868001, 930000}, {{10,0,0,16}, 930001, 992000}, {{10,0,0,17}, 992001, 1054000}], start(Config, Interval). start(Config, Interval) -> Monitor = monitor(), AdjustedInterval = Interval / length(Config), [ spawn(fun start/5, [Lower, Upper, Ip, AdjustedInterval, Monitor]) || {Ip, Lower, Upper} <- Config ], ok. start(LowerID, UpperID, _, _, _) when LowerID == UpperID -> done; start(LowerID, UpperID, LocalIP, Interval, Monitor) -> spawn(fun connect/5, [?SERVERADDR, ?SERVERPORT, LocalIP, "/test/"++LowerID, Monitor]), receive after Interval -> start(LowerID + 1, UpperID, LocalIP, Interval, Monitor) end. connect(ServerAddr, ServerPort, ClientIP, Path, Monitor) -> Opts = [binary, {packet, 0}, {ip, ClientIP}, {reuseaddr, true}, {active, false}], {ok, Sock} = gen_tcp:connect(ServerAddr, ServerPort, Opts), Monitor ! open, ReqL = io_lib:format("GET ~s\r\nHost: ~s\r\n\r\n", [Path, ServerAddr]), Req = list_to_binary(ReqL), ok = gen_tcp:send(Sock, [Req]), do_recv(Sock, Monitor), (catch gen_tcp:close(Sock)), ok. do_recv(Sock, Monitor)-> case gen_tcp:recv(Sock, 0) of {ok, B} -> Monitor ! {bytes, size(B)}, io:format("Recvd ~s\n", [ binary_to_list(B)]), io:format("Recvd ~w bytes\n", [size(B)]), do_recv(Sock, Monitor); {error, closed} -> Monitor ! closed, closed; Other -> Monitor ! closed, io:format("Other:~w\n",[Other]) end. % Monitor process receives stats and reports how much data we received etc: monitor() -> Pid = spawn(?MODULE, monitor0, [{0,0,0,0}]), timer:send_interval(10000, Pid, report), Pid. monitor0({Open, Closed, Chunks, Bytes}=S) -> receive report -> io:format("{Open, Closed, Chunks, Bytes} = ~w\n",[S]); open -> monitor0({Open + 1, Closed, Chunks, Bytes}); closed -> monitor0({Open, Closed + 1, Chunks, Bytes}); chunk -> monitor0({Open, Closed, Chunks + 1, Bytes}); {bytes, B} -> monitor0({Open, Closed, Chunks, Bytes + B}) end.
      
      





最初に、パート1からmochiwebアプリケーションに接続しました-10秒ごとに1つのメッセージを各クライアントに送信するだけです。

 erl> c(floodtest2), floodtest2:run(20).
      
      







それはすぐに私のすべての記憶を食べました



gen_tcpで多数の接続を開くと、大量のメモリが削除されることがわかりました。 これを機能させるには〜36GBかかると思います。 私は自分のerlang httpクライアントを最適化することに興味がありませんでした。32GB以上のメモリを搭載できるマシンはデータベースの1つだけでした。



この時点で、HTTP APIを備えた試行済みのlibeventを思い出すことにしました。 新しいバージョンには、http APIのevhttp_connection_set_local_address関数もあります。



libeventを使用するC httpクライアントは次のとおりです。

 #include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <unistd.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <time.h> #include <pthread.h> #define BUFSIZE 4096 #define NUMCONNS 62000 #define SERVERADDR "10.103.1.43" #define SERVERPORT 8000 #define SLEEP_MS 10 char buf[BUFSIZE]; int bytes_recvd = 0; int chunks_recvd = 0; int closed = 0; int connected = 0; // called per chunk received void chunkcb(struct evhttp_request * req, void * arg) { int s = evbuffer_remove( req->input_buffer, &buf, BUFSIZE ); //printf("Read %d bytes: %s\n", s, &buf); bytes_recvd += s; chunks_recvd++; if(connected >= NUMCONNS && chunks_recvd%10000==0) printf(">Chunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); } // gets called when request completes void reqcb(struct evhttp_request * req, void * arg) { closed++; } int main(int argc, char **argv) { event_init(); struct evhttp *evhttp_connection; struct evhttp_request *evhttp_request; char addr[16]; char path[32]; // eg: "/test/123" int i,octet; for(octet=1; octet<=17; octet++){ sprintf(&addr, "10.224.0.%d", octet); for(i=1;i<=NUMCONNS;i++) { evhttp_connection = evhttp_connection_new(SERVERADDR, SERVERPORT); evhttp_connection_set_local_address(evhttp_connection, &addr); evhttp_set_timeout(evhttp_connection, 864000); // 10 day timeout evhttp_request = evhttp_request_new(reqcb, NULL); evhttp_request->chunk_cb = chunkcb; sprintf(&path, "/test/%d", ++connected); if(i%100==0) printf("Req: %s\t->\t%s\n", addr, &path); evhttp_make_request( evhttp_connection, evhttp_request, EVHTTP_REQ_GET, path ); evhttp_connection_set_timeout(evhttp_request->evcon, 864000); event_loop( EVLOOP_NONBLOCK ); if( connected % 200 == 0 ) printf("\nChunks: %d\tBytes: %d\tClosed: %d\n", chunks_recvd, bytes_recvd, closed); usleep(SLEEP_MS*1000); } } event_dispatch(); return 0; }
      
      





ほとんどのパラメータは#defineとしてハードコードされているため、編集して再コンパイルできます。

 $ gcc -o httpclient httpclient.c -levent $ ./httpclient
      
      







64,500を超えるポートを開くことはできません。



64,500以上の接続を開くには、ローカルアドレスとローカルポートを自分で決定し、それに応じて管理する必要があります。



残念ながら、libevent HTTP APIにはローカルポートを指定するオプションがありません。 libeventを修正して、次のような関数を追加しました。

 void evhttp_connection_set_local_port(struct evhttp_connection *evcon, u_short port);.
      
      





それは驚くほど楽しい経験でした。libeventはよく書かれているようで、ドキュメントはまともです。



この変更されたlibeventを使用して、上記のコードに以下を追加できました。

 evhttp_connection_set_local_port(evhttp_connection, 1024+i);
      
      





異なるアドレスからの複数の接続が、ローカルアドレスに定義された同じローカルポート番号を使用できるようになりました。 クライアントを再コンパイルし、しばらく動作させて、バリアを通過するようにしました。



Netstatはこれを確認します。

 # netstat -n | awk '/^tcp/ {t[$NF]++}END{for(state in t){print state, t[state]}}' TIME_WAIT 8 ESTABLISHED 118222
      
      





これは、さまざまな状態で開いているポートの数を示しています。 最終的に2 ^ 16を超える接続を開くことができました。



これで、1台のコンピューターから100万のhttp接続を開くことができるツールができました。 各接続に約2 KBを使用し、さらにカーネルが占有しているものを使用しているようです。 mochiwebサーバーをテストします。



C1024K



このテストでは、4つの異なるサーバーを使用しました。 このテストと以前のテストの主な違いは、Cで記述されたクライアントの変更です。

サーバー1-クアッドコア2GHz CPU、16GBのRAM

•サブマネージャーの起動

•データの読み込み

•ルーターの起動

サーバー2-デュアルクアッドコア2.8GHz CPU、32GBのRAM

•Mochiwebアプリケーションを起動する

サーバー3-クアッドコア2GHz CPU、16GBのRAM

•17個の仮想IPアドレスの作成

•libeventのインストール

•クライアントの起動:./httpclient(1秒あたり100接続)

サーバー4-デュアルコア2GHz、2GB RAM

•msggenを実行して大量のメッセージを送信する



接続を開いているときとしばらくの間のメモリ使用量:



HttpClientには、接続間に10ミリ秒の遅延が組み込まれているため、100万の接続を開くのに約3時間かかりました。 約25GBのメモリが必要でした。 Gangliaから見ると、私のサーバーは次のようになります。



約38GBを要し、スワッピングが開始されることがわかります。 違いは主にコア消費にあると思います。



メッセージは1000プロセスを使用して生成され、メッセージ間の平均時間はプロセスごとに約60ミリ秒で、1秒あたり約16666メッセージを提供しました。

 erl> [ spawn( fun()->msggen:start(1000000, 10+random:uniform(100), 1000000) end) || I <- lists:seq(1,1000) ].
      
      





Gangliaのサーバー4:



1秒あたり約10 MB-16.666メッセージ。



メッセージの送信を開始したとき、最初のサーバーの負荷は低いままでした。 2番目のサーバーのCPU消費が増加しました。



当然、なぜなら プロセスはメッセージを処理するためにhibernate()を終了し、メモリ使用量がわずかに増加します。 メッセージなしですべての接続を開いておくと、メモリ使用量が最適になります。 当然のことながら、どのアクションでもより多くのメモリが必要です。



では、メモリはどこに流れますか? Mochiwebでは、1,000,000のアクティブな接続を開いたままにするために40 GBのRAMが必要です。 負荷がかかると、mochiwebは最大30GBのメモリを使用し、残りの10GBはコアを使用します。 つまり、接続ごとに約40Kbが必要です。



多数の接続があるさまざまなテストで、sysctl.confにいくつかの追加変更を加えました。 私は試行錯誤を繰り返してこの問題に取り組みましたが、どのような値を変更すべきかわかりません。 私のポリシーは、/ var / log / kern.logをチェックして、ミステリアスなエラーが私に伝えることを確認するエラーを待つことでした。 設定は次のとおりです。

 net.core.rmem_max = 33554432 net.core.wmem_max = 33554432 net.ipv4.tcp_rmem = 4096 16384 33554432 net.ipv4.tcp_wmem = 4096 16384 33554432 net.ipv4.tcp_mem = 786432 1048576 26777216 net.ipv4.tcp_max_tw_buckets = 360000 net.core.netdev_max_backlog = 2500 vm.min_free_kbytes = 65536 vm.swappiness = 0 net.ipv4.ip_local_port_range = 1024 65535
      
      





TCP Linux, . , , , , 1 000 000 .



Erlang Libevent



HTTP API libevent, libevent HTTPd, C.



erlang, C — HTTP .



Libevent HTTP API, HTTP . , Erlang. HTTP libevent, Id ( mochiweb ), Erlang -.



Erlang , {123, <<«Hello user 123»>>}, “Hello user 123″ 123, . , , .

 #include <sys/types.h> #include <sys/time.h> #include <sys/queue.h> #include <stdlib.h> #include <err.h> #include <event.h> #include <evhttp.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include "erl_interface.h" #include "ei.h" #include <pthread.h> #define BUFSIZE 1024 #define MAXUSERS (17*65536) // C1024K // List of current http requests by uid: struct evhttp_request * clients[MAXUSERS+1]; // Memory to store uids passed to the cleanup callback: int slots[MAXUSERS+1]; // called when user disconnects void cleanup(struct evhttp_connection *evcon, void *arg) { int *uidp = (int *) arg; fprintf(stderr, "disconnected uid %d\n", *uidp); clients[*uidp] = NULL; } // handles http connections, sets them up for chunked transfer, // extracts the user id and registers in the global connection table, // also sends a welcome chunk. void request_handler(struct evhttp_request *req, void *arg) { struct evbuffer *buf; buf = evbuffer_new(); if (buf == NULL){ err(1, "failed to create response buffer"); } evhttp_add_header(req->output_headers, "Content-Type", "text/html; charset=utf-8"); int uid = -1; if(strncmp(evhttp_request_uri(req), "/test/", 6) == 0){ uid = atoi( 6+evhttp_request_uri(req) ); } if(uid <= 0){ evbuffer_add_printf(buf, "User id not found, try /test/123 instead"); evhttp_send_reply(req, HTTP_NOTFOUND, "Not Found", buf); evbuffer_free(buf); return; } if(uid > MAXUSERS){ evbuffer_add_printf(buf, "Max uid allowed is %d", MAXUSERS); evhttp_send_reply(req, HTTP_SERVUNAVAIL, "We ran out of numbers", buf); evbuffer_free(buf); return; } evhttp_send_reply_start(req, HTTP_OK, "OK"); // Send welcome chunk: evbuffer_add_printf(buf, "Welcome, Url: '%s' Id: %d\n", evhttp_request_uri(req), uid); evhttp_send_reply_chunk(req, buf); evbuffer_free(buf); // put reference into global uid->connection table: clients[uid] = req; // set close callback evhttp_connection_set_closecb( req->evcon, cleanup, &slots[uid] ); } // runs in a thread – the erlang c-node stuff // expects msgs like {uid, msg} and sends aa 'msg' chunk to uid if connected void cnode_run() { int fd; /* fd to Erlang node */ int got; /* Result of receive */ unsigned char buf[BUFSIZE]; /* Buffer for incoming message */ ErlMessage emsg; /* Incoming message */ ETERM *uid, *msg; erl_init(NULL, 0); if (erl_connect_init(1, "secretcookie", 0) == -1) erl_err_quit("erl_connect_init"); if ((fd = erl_connect("httpdmaster@localhost")) < 0) erl_err_quit("erl_connect"); fprintf(stderr, "Connected to httpdmaster@localhost\n\r"); struct evbuffer *evbuf; while (1) { got = erl_receive_msg(fd, buf, BUFSIZE, &emsg); if (got == ERL_TICK) { continue; } else if (got == ERL_ERROR) { fprintf(stderr, "ERL_ERROR from erl_receive_msg.\n"); break; } else { if (emsg.type == ERL_REG_SEND) { // get uid and body data from eg: {123, <<"Hello">>} uid = erl_element(1, emsg.msg); msg = erl_element(2, emsg.msg); int userid = ERL_INT_VALUE(uid); char *body = (char *) ERL_BIN_PTR(msg); int body_len = ERL_BIN_SIZE(msg); // Is this userid connected? if(clients[userid]){ fprintf(stderr, "Sending %d bytes to uid %d\n", body_len, userid); evbuf = evbuffer_new(); evbuffer_add(evbuf, (const void*)body, (size_t) body_len); evhttp_send_reply_chunk(clients[userid], evbuf); evbuffer_free(evbuf); }else{ fprintf(stderr, "Discarding %d bytes to uid %d – user not connected\n", body_len, userid); // noop } erl_free_term(emsg.msg); erl_free_term(uid); erl_free_term(msg); } } } // if we got here, erlang connection died. // this thread is supposed to run forever // TODO – gracefully handle failure / reconnect / etc pthread_exit(0); } int main(int argc, char **argv) { // Launch the thread that runs the cnode: pthread_attr_t tattr; pthread_t helper; int status; pthread_create(&helper, NULL, cnode_run, NULL); int i; for(i=0;i<=MAXUSERS;i++) slots[i]=i; // Launch libevent httpd: struct evhttp *httpd; event_init(); httpd = evhttp_start("0.0.0.0", 8000); evhttp_set_gencb(httpd, request_handler, NULL); event_dispatch(); // Not reached, event_dispatch() shouldn't return evhttp_free(httpd); return 0; }
      
      





#define, 8000 . Erlang cookie.



, :

 $ erl -setcookie secretcookie -sname httpdmaster@localhost
      
      







-:

 $ gcc -o httpdcnode httpdcnode.c -lerl_interface -lei -levent $ ./httpdcnode
      
      





, :

 erl> nodes(hidden). [c1@localhost]
      
      





localhost :8000/test/123. .

-:

 erl> {any, c1@localhost} ! {123, <<"Hello Libevent World">>}.
      
      





, Pid — {procname, node}. «any», , C-.



Erlang, libevent , Erlang.



, 1 000 000 httpdcnode , , . , 10 .



2GB:





2 .



:

 Mem: 32968672k total, 9636488k used, 23332184k free, 180k buffers
      
      





kernel/ TCP 8 , , .



libevent-cnode . , “race conditions”, , .



, Erlang , C + libevent . C C-, Erlang API. .







, , Last.fm. 40 — , 40GB . 10GB .



All Articles