EventMachineプロキシデーモン

EventMachineは、高性能でスケーラブルなネットワークアプリケーションを作成するための非常に便利なフレームワークであるという事実にもかかわらず、インターネットはその使用とテストの例が豊富にないことに満足していません。 また、たとえばハブに存在するこれらの例は、データ転送の機能を考慮していないため、正しく機能しません(何らかの理由で、データが一般に部分的に送信されることを考慮していないため)。 実際、この記事は、たとえばRubyとEventMachineの記事でEMの基本原則に精通しており、その基礎に基づいてより複雑なものを書く方法と、結果として得られたコードをテストする方法を学びたい人を対象としています。



最近、彼らは私にテストタスクを送信しました。その本質は、EMでプロキシデーモンを書くことであり、Unixドメインソケットを介してクライアントからの接続を非同期に受け入れ、キューに入れ、これらのコマンドを抽象的なシステムに接続されたソケットにリダイレクトし、受信しますこのシステムからの応答は、それらをクライアントに送り返します。 クライアントメッセージの形式は{id:1、text: "req1"}です。これはサーバーによって応答に変換される必要があります-{id:1、text: "answ1"}。



タスク、クライアント、および抽象システムを簡素化するために、EMを使用してエミュレートしました。これにより、組み込みのデータ転送プロトコルEM :: P :: ObjectProtocolの使用が許可されました。これは、リアクターの1ティックではなくデータを取得できることを考慮し、送信用にデータをシリアル化します。



クライアントコードを書くことから始めましょう。 クライアントがプロキシデーモンとの接続を確立すると、 post_init関数が呼び出されます。この関数では、クライアントはsend_objectを使用してハッシュメッセージを送信します。

次に、デーモンからの応答を待ち、コンソールに表示します。 複数のクライアントの接続を一度にエミュレートする必要があるため、接続の数と既に応答を受信して​​デーモンから切断されたクライアントの数に関するデータを保存する変数が導入されました。 接続の合計数は定数TOTAL_CONNECTIONSに格納されます。これは、クライアントの起動時に設定されます。 クライアントがサーバーから切断されると、 バインド解除呼び出しが行われます。 すべてのクライアントが応答を受信すると、リアクターは停止します。



module EMClient<br/> include EM::P::ObjectProtocol<br/> <br/> @@connection_number = 0 <br/> @@dissconnected = 0 <br/> <br/> #send request as the connection has been established <br/> def post_init <br/> @@connection_number += 1 <br/> send_object({ 'id' => rand ( 10 ), 'text' => "req#{@@connection_number}" })<br/> end <br/> <br/> <br/> def receive_object (obj)<br/> #display response from server <br/> p obj.inspect<br/> end <br/> <br/> def unbind <br/> @@dissconnected += 1 <br/> #stop reactor after all requests have been processed <br/> EM.stop if @@dissconnected == TOTAL_CONNECTIONS<br/> end <br/> end <br/>







次は、抽象システムの動作をエミュレートするサーバーコードです。 そのタスクは、オブジェクトを受け取り、変換して送り返すことです。 EM.add_time()を使用すると、2番目の引数によって関数に渡されるコードブロック(この場合はデーモンからの要求への応答の実行を遅延させることができます。



module SocketServer<br/> include EM::P::ObjectProtocol<br/> <br/> def receive_object (obj)<br/> #emulation of job on server <br/> EM.add_timer( 1 + rand ( 5 )) do <br/> #validation of obj goes here)) <br/> obj[ 'text' ]. sub !(/req/, 'answ' )<br/> send_object(obj)<br/> end <br/> p "Server received object: #{obj.inspect}" <br/> end <br/> end <br/>







次に、さらに興味深い部分に移り、メッセージ用のメッセージキューと抽象サーバーを備えた接続プールを作成します。 このために、 EM :: Queueクラスが使用されました。このクラスには、 pop(* a、&b)push(* items)の 2つのメソッドがあり、キューにアイテムを追加してキューから取り出すことができます。 popメソッドは、要素がキューに表示されるときに実行されるコードのブロックとして最後の引数を取ります。



サーバーとの接続を確立するために、 EMConnectionモジュールが使用されました。このモジュールではsend_request(obj、&block)メソッドが定義され、その本質はサーバーメッセージを送信し、サーバーから応答を受信したときに実行されるブロックを送信することでした。



ConnectionPoolクラスは、接続プールの作成を担当します。 初期化されると、プールサイズが決定され、キューが初期化されます。 次に、指定された接続数がstart_queueメソッドに設定され、ワー​​カー( queue_worker_loop )が各接続に対して起動されます。これは、接続を引数としてとるprocです。 彼の仕事の本質は、キューから、サーバーに送信する必要があるオブジェクトを表す要素と、オブジェクトの受信後に実行する必要があるコードブロックを取得することです。 さらに、このコードブロックを実行した後、procはそれ自体を呼び出すため、一種の無限ループが取得されます。



module EMConnection<br/> include EM::P::ObjectProtocol<br/> <br/> def receive_object (obj)<br/> #calling callback on object receiving <br/> @callback. call (obj)<br/> end <br/> <br/> def send_request obj, &block<br/> #sending data to server and setting callback <br/> send_object obj<br/> @callback = block<br/> end <br/> <br/> end <br/> <br/> #simple connection pool using EM queue, default size 10 <br/> class ConnectionPool <br/> <br/> def initialize (conf)<br/> @pool_size = conf[:size] || 10 <br/> @connections = []<br/> @query_queue = EM:: Queue .new<br/> start_queue conf<br/> end <br/> <br/> def queue_worker_loop <br/> proc { |connection|<br/> @query_queue.pop do | request |<br/> connection. send_request ( request [:obj]) do |response|<br/> request [:callback].call response #if request[:callback] <br/> queue_worker_loop .call connection<br/> end <br/> end <br/> }<br/> end <br/> <br/> def start_queue (conf)<br/> @pool_size.times do <br/> connection = EM.connect( '0.0.0.0' , 8080 , EMConnection)<br/> @connections << connection<br/> queue_worker_loop .call connection<br/> end <br/> end <br/> <br/> def request (obj, &block)<br/> @query_queue.push :obj => obj, :callback => block<br/> end <br/> end <br/>







次に、プロキシデーモンを担当するコードに移りましょう。 その役割は、接続プールを初期化することです。もちろん、これはデーモンコードではなく行うことができますが、この方法では、プールは必要なときにのみ初期化されます。 オブジェクトをクライアントから受信すると、オブジェクトと接続プールのキューコードブロックを渡し、抽象サーバーから応答を受信すると、メッセージをクライアントに送り返し、 close_connection_after_writingメソッドを使用して接続を閉じます。すぐに接続を閉じます)。



module DaemonServer<br/> include EM::P::ObjectProtocol<br/> <br/> def post_init <br/> @@connections_pool ||= ConnectionPool. new (:size => 5 )<br/> end <br/> <br/> def receive_object (obj)<br/> @@connections_pool.request obj do |response|<br/> send_object(response)<br/> close_connection_after_writing<br/> end <br/> end <br/> end <br/>







次に、サーバー、クライアント、プロキシデーモンの起動を担当するスクリプトに移りましょう。

TCPSocketでサーバーを起動します。 ここではすべてが非常に簡単です。



EventMachine.run {<br/> EventMachine.start_server "127.0.0.1" , 8080 , SocketServer<br/>} <br/>









エミュレートされたクライアントの数を設定する機会を与える必要があるため、クライアントではもう少し複雑です。これは、スクリプトの実行時にパラメーターを渡すことで実装されます。 Unixドメインソケットでのクライアントの起動は、TCPソケットの場合とは異なります。 接続の代わりにconnect_unix_domainを呼び出し、関数の最初の引数としてIPアドレスとポート、ファイル名の代わりに渡します。



tc = ARGV[ 0 ].to_i<br/>TOTAL_CONNECTIONS = tc > 0 ? tc : 25 <br/> <br/>file = File .expand_path( '../tmp/daemon.sock' , __FILE__ )<br/> p "Starting #{TOTAL_CONNECTIONS} client(s)" <br/>EventMachine::run {<br/> TOTAL_CONNECTIONS.times{ EM.connect_unix_domain(file, EMClient) }<br/>} <br/>







プロキシが悪魔になるには、もちろん、悪魔化(キャプテン)する必要があります。 このために、 デーモン gemを使用しました。 スクリプトが-dスイッチで始まる場合、デーモンを停止できるように、ログとプロセスpidを含むファイルを保存する場所を決定する追加オプションを使用してDaemons.daemonizeメソッドを呼び出すと、 スクリプトが悪魔になります。



options = {<br/> :app_name => 'ProxyServer' ,<br/> :backtrace => true ,<br/> :log_output => true ,<br/> :dir_mode => :normal,<br/> :dir => File .expand_path( '../tmp' , __FILE__ )<br/>}<br/> <br/>file = File .expand_path( '../tmp/daemon.sock' , __FILE__ )<br/> File .unlink(file) if File .exists?(file)<br/> <br/>Daemons.daemonize(options) if ARGV.index( '-d' )<br/> <br/>EventMachine::run {<br/> EventMachine::start_unix_domain_server(file, DaemonServer)<br/>} <br/>







テストを行わないコードは、たとえそれが動作していてもほとんどコストがかからないと考えています。なぜなら、それを書いた人とそれを編集する必要がある人の両方に多くの問題と頭痛を引き起こす可能性があるからです。



EM- EMSpecに基づいて作成されたプログラムをテストするための既製のソリューションがあります。 しかし、私はそれを使用しなかったので、 rspecを使用してそれなしで行う方法を示します。

最初に、テストクライアントを作成する必要があります。 その中で、 send_request(obj、&block)メソッドを定義します。これにより、プロキシ要求をデーモンに送信し、クライアントが応答を受信したときに呼び出されるコードのブロックとしてコールバックを設定できます。 また、 onclose =(proc)メソッドを作成します。このメソッドは、接続が閉じられたときに呼び出されるコールバックを決定します。



module TestClient<br/> include EM::P::ObjectProtocol<br/> <br/> #on object received callback <br/> def receive_object (obj)<br/> @onresponse. call (obj)<br/> p "Client received object: #{obj.inspect}" <br/> end <br/> <br/> def send_request obj, &block<br/> @onresponse = block<br/> send_object obj<br/> end <br/> <br/> # on disconnect callback <br/> def onclose =( proc )<br/> @onclosed = proc <br/> end <br/> <br/> def unbind <br/> @onclosed.call<br/> end <br/> <br/> end <br/>







これで、記述されたコードをテストできるメソッドの作成に進むことができます。 最初のメソッドstart_servは、サーバー、テストクライアント、およびプロキシの起動を担当し、引数としてブロックを取ります。これにより、クライアント変数を自由に使用でき、クライアントを操作できます。 何かがうまくいかず、クライアントがサーバーから応答を受け取らない場合、 タイマーメソッドが必要です。rspecは、テストがフリーズしただけでなく失敗したことを示します。 テストの基本はserver_testメソッドです。このメソッドは上記のメソッドを使用してサーバー、クライアント、プロキシを起動し、接続が閉じられたときにリアクターを停止する必要があると判断し、クライアントからサーバーに送信される引数も取ります。



module HelperMethods<br/> def start_serv <br/> File .unlink(SOCK_FILE) if File .exists?(SOCK_FILE)<br/> EM.run {<br/> EventMachine.start_server "127.0.0.1" , 8080 , SocketServer<br/> EventMachine.start_unix_domain_server(SOCK_FILE, DaemonServer)<br/> client = EM.connect_unix_domain(SOCK_FILE, TestClient)<br/> yield client<br/> }<br/> end <br/> <br/> # if request takes to long it will show fail <br/> def timer start<br/> timeout = 6 <br/> EM.add_timer(timeout){<br/> ( Time .now-start).should be_within( 0 ).of(timeout)<br/> EM.stop<br/> }<br/> end <br/> <br/> #main wrapper for test starts server daemon and client <br/> def server_test request<br/> time = Time .now<br/> start_serv do |client|<br/> client.send_request request do |response|<br/> yield response<br/> end <br/> client.onclose= lambda {EM.stop}<br/> timer (time)<br/> end <br/> end <br/> end <br/>







例として、このタスクが正しく実行されることをテストで確認します。



describe "on sending test request" do <br/> include HelperMethods<br/> it "should responsend with right answer" do <br/> server_test({ 'id' => 0 , 'text' => "req1" }) do |response|<br/> response[ 'text' ].should == "answ1" <br/> response[ 'id' ].should == 0 <br/> end <br/> end <br/> end <br/>







実際、この記事が誰かに役立つことを願っています。 すべてのコードはgithubで入手できます。



PS最後まで読書をマスターしてくれた多くのテキストが見つかりました、ありがとう。



All Articles