「Boost.Asio C ++ネットワークプログラミング。」 第2章:Boost.Asioの基本 パート2

みなさんこんにちは!

John Torjoの本Boost.Asio C ++ Network Programmingの翻訳を続けています。 第2章のこの部分では、非同期プログラミングについて説明します。



内容:





このセクションでは、非同期プログラミングで作業するときに発生する問題のいくつかについて詳しく説明します。 一度読んだ後、これらの概念の理解を深めるために、この本を読み進めながら、それに戻ることをお勧めします。





非同期で作業する必要性



前述したように、原則として、同期プログラミングは非同期プログラミングよりもはるかに簡単です。 線形に考えるのがはるかに簡単なので(関数Aを呼び出し、終了後にハンドラーを呼び出し、関数Bを呼び出し、終了後にハンドラーを呼び出す、など、イベントハンドラーのように考えることができます)。 後者の場合、たとえば、5つのイベントを実行できますが、実行される順序を見つけることはできません。また、それらがすべて満たされるかどうかもわかりません。

しかし、非同期プログラミングはより複雑ですが、たとえば、多数のクライアントを同時に処理する必要があるサーバーを作成する場合は、おそらくそれを好むでしょう。 クライアントが多いほど、非同期プログラミングは同期プログラミングに比べて簡単になります。

1000のクライアントを同時に処理するアプリケーションがあり、クライアントからサーバーへの、およびサーバーからクライアントへの各メッセージが文字「\ n」で終わるとします。

同期コード、1スレッド:



using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // each msg is at maximum this size int already_read; // how much have we already read? }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock. available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // analyze message, and write back if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... }
      
      





サーバー(および基本的にはすべてのネットワークアプリケーション)を作成するときに避けたいことの1つは、コードが応答を停止することです。 この場合、 handle_clients()



関数をできるだけブロックしないようにします。 機能がいずれかの時点でブロックされている場合、クライアントからのすべての着信メッセージは、機能がロック解除されるまで待機し、処理を開始します。

応答性を維持するために、データがあるif ( clients[i].sock.available() ) on_read(clients[i])



if ( clients[i].sock.available() ) on_read(clients[i])



場合にのみソケットから読み取りif ( clients[i].sock.available() ) on_read(clients[i])



。 on_readでは、利用可能な範囲でのみ読み取ります。 read_until(c.sock, buffer(...),'\n')



を呼び出すことは、特定のクライアントから最後までメッセージを読み取るまでブロックされるため、良いアイデアではありません(これがいつ起こるかわかりません) 。

ここでのボトルネックはon_read_msg()



関数です。 この関数が実行されるまで、すべての着信メッセージは一時停止されます。 うまく書かれた関数on_read_msg()



はこれが起こらないことを保証しますが、起こる可能性があります(例えば、バッファがいっぱいの場合、ソケットへの書き込みがブロックされることがあります)。

同期コード、10スレッド:



 using namespace boost::asio; struct client { // ... same as before bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // already reading else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // same as before } void on_read_msg(client & c, const std::string & msg) { // same as before }
      
      





複数のスレッドを使用するには、それらを同期する必要があります。これはset_reading



()およびset_unreading()



関数が行うことです。 set_reading()



関数は非常に重要です。 「読むことができ、読むことができるかどうかを確認する」ことは、1つのステップで行われます。 2つのステップ(「読み取り可能かどうかを確認する」と「読み取りを開始する」)で行う場合、2つのスレッドを開始できます。1つはクライアントの読み取りを確認し、もう1つは同じクライアントのon_read



関数を呼び出します。長い目で見れば、これはデータの破損につながり、場合によってはシステムのクラッシュにさえつながります。

コードがより複雑になっていることに気付くでしょう。

3番目のオプションは、同期コード、つまりクライアントごとに1つのスレッドを持つことも可能です。 しかし、同時顧客の数が増えると、これは主に許容できない操作になります。

次に、非同期オプションについて検討します。 私たちは常に非同期の読み取り操作を行いました。 クライアントが要求を行うと、 on_read



操作がon_read



、応答し、次の要求が到着するまで待機します(別の非同期読み取り操作を開始します)。

非同期コード、10スレッド:



 using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // reads the answer from the client } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // now, wait for the next read from the same client async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); }
      
      





コードがどれほど簡単になったかに注目してください。 client



構造には2つのメンバーしかありませんasync_read_until



handle_clients()



async_read_until



呼び出すasync_read_until



、10個のスレッドを作成し、それぞれがservice.run()



を呼び出します。 これらのスレッドは、クライアントへのすべての非同期読み取りまたは書き込み操作を処理します。 注意すべきもう1つの点は、 on_read()



関数が常に次の非同期読み取り操作の準備をすることです(最後の行を参照)。



非同期関数run()、run_one()、poll()、poll_one()



リスニングループを実装するために、 io_service



クラスはrun(), run_one(), poll()



およびpoll_one()



などの4つの関数を提供します。 ほとんどの場合、 service.run()



作業します。 ここでは、他の機能を使用して達成できることを学習します。



絶えず働く


繰り返しrun()



、保留中の操作が完了するか、ユーザーがio_service::stop()



呼び出すまで、 run()



は機能します。 io_service



インスタンスを機能させ続けるには、通常、1つ以上の非同期操作を追加し、それらが終了したら、次のコードに示すように追加を続けます。



 using namespace boost::asio; io_service service; ip::tcp::socket sock(service); char buff_read[1024], buff_write[1024] = "ok"; void on_read(const boost::system::error_code &err, std::size_t bytes) ; void on_write(const boost::system::error_code &err, std::size_t bytes) { sock.async_read_some(buffer(buff_read), on_read); } void on_read(const boost::system::error_code &err, std::size_t bytes) { // ... process the read ... sock.async_write_some(buffer(buff_write,3), on_write); } void on_connect(const boost::system::error_code &err) { sock.async_read_some(buffer(buff_read), on_read); } int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001); sock.async_connect(ep, on_connect); service.run(); }
      
      





service.run()



が呼び出されると、少なくとも1つの非同期操作が待機しています。 ソケットがサーバーに接続すると、 on_connect



on_connect



、別の非同期操作が追加されます。 on_connect



の終了後、まだスケジュールされた操作が1つあります( read



)。 on_read



操作がon_read



と、応答を書き込み、もう1つのスケジュールされた操作( write



)が追加されます。 on_write



関数がon_write



、サーバーから次のメッセージが読み取らon_write



、別のスケジュールされた操作が追加されます。 on_write



関数がon_write



、1つのスケジュールされた操作( read



)があります。 そのため、アプリケーションを閉じることを決定するまで、このサイクルが続きます。



関数run_one()、poll()、poll_one()


非同期関数ハンドラーは、 io_service::run



が呼び出されたスレッドと同じスレッドで呼び出されることに注意してio_service::run



。 使用する機能は少なくとも90〜95%であるため、簡単にするためにこれが注目されています。 ストリーム内のrun_one(), poll()



、またはpoll_one()



呼び出しについても同様です。

run_one()



関数は、複数の非同期操作を実行して送信します。



次の同等のコードを検討できます。



 io_service service; service.run(); // OR while ( !service.stopped()) service.run_once();
      
      





run_once()



を使用して非同期操作を開始し、それが完了するのを待つことができます。



 io_service service; bool write_complete = false; void on_write(const boost::system::error_code & err, size_t bytes) { write_complete = true; } ... std::string data = "login ok"; write_complete = false; async_write(sock, buffer(data), on_write); do service.run_once() while (!write_complete);
      
      





また、 blocking_tcp_client.cpp



blocking_udp_client.cpp



など、Boost.Asioにバンドルされているrun_one()



を使用する例もあります。 poll_one



関数は、ブロックせずに実行する準備ができている保留中の操作を1つだけ起動します。



ブロックせずに開始する準備ができている保留中の操作は、通常次のいずれかです。



poll_one



を使用して、完了したI / O操作のすべてのハンドラーが実行されていることを確認し、次のタスクに進むことができます。



 io_service service; while ( true) { // run all handlers of completed IO operations while ( service.poll_one()) ; // ... do other work here ... }
      
      





poll()



関数は、保留中のすべての操作を実行し、ブロックせずに実行できます。 次のコードは同等です。



 io_service service; service.poll(); // OR while ( service.poll_one()) ;
      
      





以前のすべての関数は、失敗した場合にboost::system::system_error



をスローします。 しかし、これは決して起こらないはずです。 ここでスローされたエラーは通常クラッシュにつながります。リソースエラーか、ハンドラーの1つが例外をスローした可能性があります。 いずれの場合でも、各関数には例外をスローしないオーバーロードがありますが、 boost::system::error_code



を引数としてboost::system::error_code



、それを戻り値として設定します。



 io_service service; boost::system::error_code err = 0; service.run(err); if ( err) std::cout << "Error " << err << std::endl;
      
      







非同期操作



非同期操作とは、サーバーに接続しているクライアントの非同期処理、ソケットからの非同期の読み取りおよび書き込みだけではありません。 これは、非同期で実行される可能性のあるすべての操作を対象としています。

デフォルトでは、すべての非同期関数のハンドラーが呼び出される順序はわかりません。 さらに、通常、次の呼び出しは非同期です(非同期の読み取り/書き込み/受信ソケットから発信)。 service.post()



を使用して、非同期に呼び出されるカスタム関数を追加できます。次に例を示します。



 #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <iostream> using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { for ( int i = 0; i < 10; ++i) service.post(boost::bind(func, i)); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); }
      
      





前の例では、 service.post(some_function)



は非同期関数呼び出しを追加します。 この関数は、 io_service



インスタンスにservice.run()



を呼び出すスレッドの1つでこのsome_function



を呼び出すように要求するとすぐに終了します。 この例では、3つのスレッドのいずれかを事前に作成しました。 非同期関数がどの順序で呼び出されるかを確認することはできません。 追加された順に呼び出されることを期待しないでください( post()



)。 前の例の結果は次のとおりです。



 func called, i= 0 func called, i= 2 func called, i= 1 func called, i= 4 func called, i= 3 func called, i= 6 func called, i= 7 func called, i= 8 func called, i= 5 func called, i= 9
      
      





非同期関数にハンドラーを割り当てたい場合があります。 たとえば、レストラン( go_to_restaurant



)に行き、 order



、食べなければならない( go_to_restaurant



)としましょう。 最初にレストランに来て、注文してから食べたいだけです。 これを行うには、 io_service::strand



を使用して、呼び出す非同期ハンドラーを割り当てます。 次の例を考えてみましょう。



 using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { io_service::strand strand_one(service), strand_two(service); for ( int i = 0; i < 5; ++i) service.post( strand_one.wrap( boost::bind(func, i))); for ( int i = 5; i < 10; ++i) service.post( strand_two.wrap( boost::bind(func, i))); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); }
      
      





上記のコードでは、最初の5つのストリームIDと最後の5つのストリームIDが順番に表示されます。つまり、 func called, i = 0



れる前func called, i = 0



が表示さfunc called, i = 1



func called, i = 2



れます。 。 同じことがfunc called, i = 5



れたfunc called, i = 5



func called, i = 6



およびfunc called, i = 6



func called, i = 7



なります。 関数がシーケンシャルに呼び出されたとしても、これはそれらがすべて同じスレッドで呼び出されることを意味しないことに注意してください。 このプログラムの可能な実装は次のとおりです。



 func called, i= 0/002A60C8 func called, i= 5/002A6138 func called, i= 6/002A6530 func called, i= 1/002A6138 func called, i= 7/002A6530 func called, i= 2/002A6138 func called, i= 8/002A6530 func called, i= 3/002A6138 func called, i= 9/002A6530 func called, i= 4/002A6138
      
      







非同期ポスト()vsディスパッチ()vsラップ()


Boost.Asioは、非同期呼び出しに関数ハンドラーを追加する3つの方法を提供します。



前のセクションでservice.post()



を使用する例と、プログラム実行の可能な結果を​​見ました。 変更して、 service.dispatch



が結果にどのように影響するかを確認します。



 using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void run_dispatch_and_post() { for ( int i = 0; i < 10; i += 2) { service.dispatch(boost::bind(func, i)); service.post(boost::bind(func, i + 1)); } } int main(int argc, char* argv[]) { service.post(run_dispatch_and_post); service.run(); }
      
      





ここで何が起こっているかを説明する前に、プログラムを実行して結果を見てみましょう。



 func called, i= 0 func called, i= 2 func called, i= 4 func called, i= 6 func called, i= 8 func called, i= 1 func called, i= 3 func called, i= 5 func called, i= 7 func called, i= 9
      
      





最初に偶数が書き込まれ、次に奇数が書き込まれます。 これは、 dispatch()



を使用して偶数を書き込み、 post()



を使用して奇数を書き込むためです。 現在のスレッドがservice.run()



dispatch()



を呼び出しているため、 dispatch()



は終了する前にハンドラーを呼び出しますが、post()はすぐに終了します。

さて、 service.wrap(handler)



について話しましょう。 wrap()は、別の関数の引数として使用できるファンクターを返します。



 using namespace boost::asio; io_service service; void dispatched_func_1() { std::cout << "dispatched 1" << std::endl; } void dispatched_func_2() { std::cout << "dispatched 2" << std::endl; } void test(boost::function<void()> func) { std::cout << "test" << std::endl; service.dispatch(dispatched_func_1); func(); } void service_run() { service.run(); } int main(int argc, char* argv[]) { test( service.wrap(dispatched_func_2)); boost::thread th(service_run); boost::this_thread::sleep( boost::posix_time::millisec(500)); th.join(); }
      
      





文字列test(service.wrap(dispatched_func_2));



dispatched_func_2



をラップし、引数としてtest



に渡されるファンクターを作成します。 test()



呼び出されると、呼び出しをdispatched_func_1()



にリダイレクトし、 func()



を呼び出します。 この時点で、 func()



呼び出しfunc()



service.dispatch(dispatched_func_2)



func()



同等であることがわかります。これらは順番に呼び出されるためです。 プログラム出力はこれを確認します:



 test dispatched 1 dispatched 2
      
      





io_service::strand



クラス(非同期アクションのシリアル化に使用)には、関数poll(), dispatch()



wrap()



も含まれています。 それらの意味は、 io_service



poll(), dispatch()



およびwrap()



io_service



の意味と同じio_service



。 ただし、ほとんどの場合、 io_service::strand::wrap()



関数はio_service::poll()



またはio_service::dispatch()



引数としてのみ使用します。



生き続けるために



次の操作を実行して言います。



 io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff));
      
      





この場合、 sock



buff



は両方ともread()



呼び出しで生き残るはずです。 つまり、 read()



呼び出しが完了した後、有効でなければなりません。 これはまさにあなたが期待するものです。関数に渡す引数はすべて、その中で有効でなければなりません。 非同期に進むと、事態はさらに複雑になります。



 io_service service; ip::tcp::socket sock(service); char buff[512]; void on_read(const boost::system::error_code &, size_t) {} ... async_read(sock, buffer(buff), on_read);
      
      





この場合、 sock



buff



read



操作自体に耐える必要がありread



が、非同期であるため、これがいつ発生するかはわかりません。

ソケットバッファーを使用する場合、非同期呼び出しに耐えたbuffer



インスタンスを使用できます( boost::shared_array<>



)。 ここでは、ソケットと読み取り/書き込みバッファを内部に含むクラスを作成することにより、同じ原則を使用できます。 次に、すべての非同期呼び出しに対して、 boost::bind



functorを共有ポインターで渡します:



 using namespace boost::asio; io_service service; struct connection : boost::enable_shared_from_this<connection> { typedef boost::system::error_code error_code; typedef boost::shared_ptr<connection> ptr; connection() : sock_(service), started_(true) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1)); } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { // here you decide what to do with the connection: read or write if ( !err) do_read(); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg == "can_login") do_write("access_data"); else if ( msg.find("data ") == 0) process_data(msg); else if ( msg == "login_fail") stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2)); } void do_write(const std::string & msg) { if ( !started() ) return; // note: in case you want to send several messages before // doing another async_read, you'll need several write buffers! std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); } void process_data(const std::string & msg) { // process what comes from server, and then perform another write } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); connection::ptr(new connection)->start(ep); }
      
      





すべての非同期呼び出しでは、 boost::bind



ファンクターが引数として送信されます。 このファンクターは、 connection



インスタンスへの共有ポインターを内部的に保存します。 非同期操作が保留されている間、Boost.Asioはboost::bind



functorのコピーを保存し、これはconnection



への共有ポインターを保存しconnection



。 問題は解決しました!

もちろん、 connection



クラスは単なるskeleton



クラスです。 ニーズに合わせて調整する必要があります(サーバーの場合は、外観が完全に異なります)。 新しい接続connection::ptr(new connection)->start(ep)



簡単に作成できることに注意してください。 これにより、サーバーへの(非同期)接続が開始されます。 接続を閉じたい場合は、 stop()



を呼び出します。

インスタンスが開始されると( start()



)、接続を待機します。 接続が発生すると、 on_connect()



呼び出されます。 エラーがない場合、読み取り操作( do_read()



)が呼び出されます。 読み取り操作が完了すると、メッセージを解釈できます。 ほとんどの場合、アプリケーションのon_read()



は異なって見えます。 メッセージを送信するときは、 do_write()



で行われるように、メッセージをバッファにコピーしてから送信する必要があります。これは、バッファが非同期書き込み操作に耐えなければならないためです。 最後のメモ-記録するときは、書き込む量を指定する必要があることを忘れないでください。そうしないと、バッファ全体が送信されます。



まとめ



ネットワークAPIは非常に広範囲です。 この章は、独自のネットワークアプリケーションを実装するときに戻るリンクとして実装されました。

Boost.Asioはエンドポイントの概念を実装しており、IPアドレスとポートと考えることができます。 正確なIPアドレスがわからない場合は、 resolver



オブジェクトを使用して、1つ以上のIPアドレスの代わりにwww.yahoo.comなどのホスト名を含めることができます。

また、コアAPIにあるソケットクラスも調べました。 Boost.AsioはTCP、UDP、およびICMPの実装を提供しますが、自分のプロトコル用に拡張できますが、これは気弱な人向けではありません。

非同期プログラミングは必要な悪です。 特にサーバーを作成するときに、なぜこれが必要なのかを見ました。通常、service.run()



非同期ループを作成するのに十分な呼び出しがありますが、さらに進む必要がある場合はrun_one(), poll()



、またはを使用できますpoll_one()





非同期アプローチを使用する場合は、service.post()



またはを使用するだけで、独自の非同期関数を使用できservice.dispatch()



ます。

最後に、非同期操作の全期間(完了まで)の間、ソケットとバッファー(読み取りまたは書き込み用)の両方を有効に保つために、特別な予防措置を講じる必要があります。クラスconnection



はから派生しenabled_shared_from_this



、その中に必要なすべてのバッファを含み、各非同期呼び出しでこの操作に共有ポインタを渡す必要があります。

次の章は多くの実際的な作業になります。クライアント/サーバーエコーなどのアプリケーションを実装する際の多くのアプリケーションコーディング。



みなさん、ありがとうございました!



All Articles