John Torjoの本Boost.Asio C ++ Network Programmingの翻訳を続けています。 第2章のこの部分では、非同期プログラミングについて説明します。
内容:
- 第1章:Boost.Asioの使用開始
- 第2章:Boost.Asioの基本
- パート1:Boost.Asioの基本
- パート2:非同期プログラミング
- 第3章:エコーサーバー/クライアント
- 第4章:クライアントとサーバー
- 第5章:同期と非同期
- 第6章:Boost.Asio-その他の機能
- 第7章:Boost.Asio-追加トピック
このセクションでは、非同期プログラミングで作業するときに発生する問題のいくつかについて詳しく説明します。 一度読んだ後、これらの概念の理解を深めるために、この本を読み進めながら、それに戻ることをお勧めします。
非同期で作業する必要性
前述したように、原則として、同期プログラミングは非同期プログラミングよりもはるかに簡単です。 線形に考えるのがはるかに簡単なので(関数Aを呼び出し、終了後にハンドラーを呼び出し、関数Bを呼び出し、終了後にハンドラーを呼び出す、など、イベントハンドラーのように考えることができます)。 後者の場合、たとえば、5つのイベントを実行できますが、実行される順序を見つけることはできません。また、それらがすべて満たされるかどうかもわかりません。
しかし、非同期プログラミングはより複雑ですが、たとえば、多数のクライアントを同時に処理する必要があるサーバーを作成する場合は、おそらくそれを好むでしょう。 クライアントが多いほど、非同期プログラミングは同期プログラミングに比べて簡単になります。
1000のクライアントを同時に処理するアプリケーションがあり、クライアントからサーバーへの、およびサーバーからクライアントへの各メッセージが文字「\ n」で終わるとします。
同期コード、1スレッド:
using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // each msg is at maximum this size int already_read; // how much have we already read? }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock. available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // analyze message, and write back if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... }
サーバー(および基本的にはすべてのネットワークアプリケーション)を作成するときに避けたいことの1つは、コードが応答を停止することです。 この場合、
handle_clients()
関数をできるだけブロックしないようにします。 機能がいずれかの時点でブロックされている場合、クライアントからのすべての着信メッセージは、機能がロック解除されるまで待機し、処理を開始します。
応答性を維持するために、データがある
if ( clients[i].sock.available() ) on_read(clients[i])
、
if ( clients[i].sock.available() ) on_read(clients[i])
場合にのみソケットから読み取り
if ( clients[i].sock.available() ) on_read(clients[i])
。 on_readでは、利用可能な範囲でのみ読み取ります。
read_until(c.sock, buffer(...),'\n')
を呼び出すことは、特定のクライアントから最後までメッセージを読み取るまでブロックされるため、良いアイデアではありません(これがいつ起こるかわかりません) 。
ここでのボトルネックは
on_read_msg()
関数です。 この関数が実行されるまで、すべての着信メッセージは一時停止されます。 うまく書かれた関数
on_read_msg()
はこれが起こらないことを保証しますが、起こる可能性があります(例えば、バッファがいっぱいの場合、ソケットへの書き込みがブロックされることがあります)。
同期コード、10スレッド:
using namespace boost::asio; struct client { // ... same as before bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // already reading else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // same as before } void on_read_msg(client & c, const std::string & msg) { // same as before }
複数のスレッドを使用するには、それらを同期する必要があります。これは
set_reading
()および
set_unreading()
関数が行うことです。
set_reading()
関数は非常に重要です。 「読むことができ、読むことができるかどうかを確認する」ことは、1つのステップで行われます。 2つのステップ(「読み取り可能かどうかを確認する」と「読み取りを開始する」)で行う場合、2つのスレッドを開始できます。1つはクライアントの読み取りを確認し、もう1つは同じクライアントの
on_read
関数を呼び出します。長い目で見れば、これはデータの破損につながり、場合によってはシステムのクラッシュにさえつながります。
コードがより複雑になっていることに気付くでしょう。
3番目のオプションは、同期コード、つまりクライアントごとに1つのスレッドを持つことも可能です。 しかし、同時顧客の数が増えると、これは主に許容できない操作になります。
次に、非同期オプションについて検討します。 私たちは常に非同期の読み取り操作を行いました。 クライアントが要求を行うと、
on_read
操作が
on_read
、応答し、次の要求が到着するまで待機します(別の非同期読み取り操作を開始します)。
非同期コード、10スレッド:
using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // reads the answer from the client } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // now, wait for the next read from the same client async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); }
コードがどれほど簡単になったかに注目してください。
client
構造には2つのメンバーしかありません
async_read_until
handle_clients()
は
async_read_until
呼び出す
async_read_until
、10個のスレッドを作成し、それぞれが
service.run()
を呼び出します。 これらのスレッドは、クライアントへのすべての非同期読み取りまたは書き込み操作を処理します。 注意すべきもう1つの点は、
on_read()
関数が常に次の非同期読み取り操作の準備をすることです(最後の行を参照)。
非同期関数run()、run_one()、poll()、poll_one()
リスニングループを実装するために、
io_service
クラスは
run(), run_one(), poll()
および
poll_one()
などの4つの関数を提供します。 ほとんどの場合、
service.run()
作業します。 ここでは、他の機能を使用して達成できることを学習します。
絶えず働く
繰り返し
run()
、保留中の操作が完了するか、ユーザーが
io_service::stop()
呼び出すまで、
run()
は機能します。
io_service
インスタンスを機能させ続けるには、通常、1つ以上の非同期操作を追加し、それらが終了したら、次のコードに示すように追加を続けます。
using namespace boost::asio; io_service service; ip::tcp::socket sock(service); char buff_read[1024], buff_write[1024] = "ok"; void on_read(const boost::system::error_code &err, std::size_t bytes) ; void on_write(const boost::system::error_code &err, std::size_t bytes) { sock.async_read_some(buffer(buff_read), on_read); } void on_read(const boost::system::error_code &err, std::size_t bytes) { // ... process the read ... sock.async_write_some(buffer(buff_write,3), on_write); } void on_connect(const boost::system::error_code &err) { sock.async_read_some(buffer(buff_read), on_read); } int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001); sock.async_connect(ep, on_connect); service.run(); }
service.run()
が呼び出されると、少なくとも1つの非同期操作が待機しています。 ソケットがサーバーに接続すると、
on_connect
が
on_connect
、別の非同期操作が追加されます。
on_connect
の終了後、まだスケジュールされた操作が1つあります(
read
)。
on_read
操作が
on_read
と、応答を書き込み、もう1つのスケジュールされた操作(
write
)が追加されます。
on_write
関数が
on_write
、サーバーから次のメッセージが読み取ら
on_write
、別のスケジュールされた操作が追加されます。
on_write
関数が
on_write
、1つのスケジュールされた操作(
read
)があります。 そのため、アプリケーションを閉じることを決定するまで、このサイクルが続きます。
関数run_one()、poll()、poll_one()
非同期関数ハンドラーは、
io_service::run
が呼び出されたスレッドと同じスレッドで呼び出されることに注意して
io_service::run
。 使用する機能は少なくとも90〜95%であるため、簡単にするためにこれが注目されています。 ストリーム内の
run_one(), poll()
、または
poll_one()
呼び出しについても同様です。
run_one()
関数は、複数の非同期操作を実行して送信します。
- スケジュールされた操作がない場合、関数はすぐに終了し、0を返します
- 保留中の操作がある場合、最初の操作の機能ブロックが実行され、1が返されます
次の同等のコードを検討できます。
io_service service; service.run(); // OR while ( !service.stopped()) service.run_once();
run_once()
を使用して非同期操作を開始し、それが完了するのを待つことができます。
io_service service; bool write_complete = false; void on_write(const boost::system::error_code & err, size_t bytes) { write_complete = true; } ... std::string data = "login ok"; write_complete = false; async_write(sock, buffer(data), on_write); do service.run_once() while (!write_complete);
また、
blocking_tcp_client.cpp
や
blocking_udp_client.cpp
など、Boost.Asioにバンドルされている
run_one()
を使用する例もあります。
poll_one
関数は、ブロックせずに実行する準備ができている保留中の操作を1つだけ起動します。
- ブロックせずに開始する準備ができている保留中の操作が少なくとも1つある場合、
run_one()
はそれを開始して1を返します。 - それ以外の場合、関数はすぐに終了し、0を返します。
ブロックせずに開始する準備ができている保留中の操作は、通常次のいずれかです。
- 期限切れになっており、
async_wait
ハンドラーによって呼び出される必要があるタイマー - 完了したI / O操作(たとえば
async_read
)とそのハンドラーをasync_read
必要があります - 以前に
io_services
インスタンスio_services
追加されたカスタムハンドラー(これについては、次のセクションで詳しく説明します)
poll_one
を使用して、完了したI / O操作のすべてのハンドラーが実行されていることを確認し、次のタスクに進むことができます。
io_service service; while ( true) { // run all handlers of completed IO operations while ( service.poll_one()) ; // ... do other work here ... }
poll()
関数は、保留中のすべての操作を実行し、ブロックせずに実行できます。 次のコードは同等です。
io_service service; service.poll(); // OR while ( service.poll_one()) ;
以前のすべての関数は、失敗した場合に
boost::system::system_error
をスローします。 しかし、これは決して起こらないはずです。 ここでスローされたエラーは通常クラッシュにつながります。リソースエラーか、ハンドラーの1つが例外をスローした可能性があります。 いずれの場合でも、各関数には例外をスローしないオーバーロードがありますが、
boost::system::error_code
を引数として
boost::system::error_code
、それを戻り値として設定します。
io_service service; boost::system::error_code err = 0; service.run(err); if ( err) std::cout << "Error " << err << std::endl;
非同期操作
非同期操作とは、サーバーに接続しているクライアントの非同期処理、ソケットからの非同期の読み取りおよび書き込みだけではありません。 これは、非同期で実行される可能性のあるすべての操作を対象としています。
デフォルトでは、すべての非同期関数のハンドラーが呼び出される順序はわかりません。 さらに、通常、次の呼び出しは非同期です(非同期の読み取り/書き込み/受信ソケットから発信)。
service.post()
を使用して、非同期に呼び出されるカスタム関数を追加できます。次に例を示します。
#include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <iostream> using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { for ( int i = 0; i < 10; ++i) service.post(boost::bind(func, i)); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); }
前の例では、
service.post(some_function)
は非同期関数呼び出しを追加します。 この関数は、
io_service
インスタンスに
service.run()
を呼び出すスレッドの1つでこの
some_function
を呼び出すように要求するとすぐに終了します。 この例では、3つのスレッドのいずれかを事前に作成しました。 非同期関数がどの順序で呼び出されるかを確認することはできません。 追加された順に呼び出されることを期待しないでください(
post()
)。 前の例の結果は次のとおりです。
func called, i= 0 func called, i= 2 func called, i= 1 func called, i= 4 func called, i= 3 func called, i= 6 func called, i= 7 func called, i= 8 func called, i= 5 func called, i= 9
非同期関数にハンドラーを割り当てたい場合があります。 たとえば、レストラン(
go_to_restaurant
)に行き、
order
、食べなければならない(
go_to_restaurant
)としましょう。 最初にレストランに来て、注文してから食べたいだけです。 これを行うには、
io_service::strand
を使用して、呼び出す非同期ハンドラーを割り当てます。 次の例を考えてみましょう。
using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl; } void worker_thread() { service.run(); } int main(int argc, char* argv[]) { io_service::strand strand_one(service), strand_two(service); for ( int i = 0; i < 5; ++i) service.post( strand_one.wrap( boost::bind(func, i))); for ( int i = 5; i < 10; ++i) service.post( strand_two.wrap( boost::bind(func, i))); boost::thread_group threads; for ( int i = 0; i < 3; ++i) threads.create_thread(worker_thread); // wait for all threads to be created boost::this_thread::sleep( boost::posix_time::millisec(500)); threads.join_all(); }
上記のコードでは、最初の5つのストリームIDと最後の5つのストリームIDが順番に表示されます。つまり、
func called, i = 0
れる前
func called, i = 0
が表示さ
func called, i = 1
func called, i = 2
れます。 。 同じことが
func called, i = 5
れた
func called, i = 5
func called, i = 6
および
func called, i = 6
が
func called, i = 7
なります。 関数がシーケンシャルに呼び出されたとしても、これはそれらがすべて同じスレッドで呼び出されることを意味しないことに注意してください。 このプログラムの可能な実装は次のとおりです。
func called, i= 0/002A60C8 func called, i= 5/002A6138 func called, i= 6/002A6530 func called, i= 1/002A6138 func called, i= 7/002A6530 func called, i= 2/002A6138 func called, i= 8/002A6530 func called, i= 3/002A6138 func called, i= 9/002A6530 func called, i= 4/002A6138
非同期ポスト()vsディスパッチ()vsラップ()
Boost.Asioは、非同期呼び出しに関数ハンドラーを追加する3つの方法を提供します。
-
service.post(handler)
:この関数は、指定されたハンドラーを呼び出すためにio_service
インスタンスに要求を行った直後にio_service
するようにします。 ハンドラーは、後でservice.run()
を呼び出したスレッドの1つで呼び出されます。 -
service.dispatch(handler)
:これは、指定されたハンドラーを呼び出すio_service
インスタンスへの要求ですが、さらに、現在のスレッドがservice.run()
を呼び出した場合、関数内のハンドラーを呼び出すことができます。 -
service.wrap(handler)
:この関数は、service.dispatch(handler)
を呼び出すラッパー関数を作成します。 これは少しわかりにくいですが、すぐにその意味を説明します。
前のセクションで
service.post()
を使用する例と、プログラム実行の可能な結果を見ました。 変更して、
service.dispatch
が結果にどのように影響するかを確認します。
using namespace boost::asio; io_service service; void func(int i) { std::cout << "func called, i= " << i << std::endl; } void run_dispatch_and_post() { for ( int i = 0; i < 10; i += 2) { service.dispatch(boost::bind(func, i)); service.post(boost::bind(func, i + 1)); } } int main(int argc, char* argv[]) { service.post(run_dispatch_and_post); service.run(); }
ここで何が起こっているかを説明する前に、プログラムを実行して結果を見てみましょう。
func called, i= 0 func called, i= 2 func called, i= 4 func called, i= 6 func called, i= 8 func called, i= 1 func called, i= 3 func called, i= 5 func called, i= 7 func called, i= 9
最初に偶数が書き込まれ、次に奇数が書き込まれます。 これは、
dispatch()
を使用して偶数を書き込み、
post()
を使用して奇数を書き込むためです。 現在のスレッドが
service.run()
dispatch()
を呼び出しているため、
dispatch()
は終了する前にハンドラーを呼び出しますが、post()はすぐに終了します。
さて、
service.wrap(handler)
について話しましょう。 wrap()は、別の関数の引数として使用できるファンクターを返します。
using namespace boost::asio; io_service service; void dispatched_func_1() { std::cout << "dispatched 1" << std::endl; } void dispatched_func_2() { std::cout << "dispatched 2" << std::endl; } void test(boost::function<void()> func) { std::cout << "test" << std::endl; service.dispatch(dispatched_func_1); func(); } void service_run() { service.run(); } int main(int argc, char* argv[]) { test( service.wrap(dispatched_func_2)); boost::thread th(service_run); boost::this_thread::sleep( boost::posix_time::millisec(500)); th.join(); }
文字列
test(service.wrap(dispatched_func_2));
dispatched_func_2
をラップし、引数として
test
に渡されるファンクターを作成します。
test()
呼び出されると、呼び出しを
dispatched_func_1()
にリダイレクトし、
func()
を呼び出します。 この時点で、
func()
呼び出し
func()
service.dispatch(dispatched_func_2)
func()
同等であることがわかります。これらは順番に呼び出されるためです。 プログラム出力はこれを確認します:
test dispatched 1 dispatched 2
io_service::strand
クラス(非同期アクションのシリアル化に使用)には、関数
poll(), dispatch()
wrap()
も含まれています。 それらの意味は、
io_service
の
poll(), dispatch()
および
wrap()
io_service
の意味と同じ
io_service
。 ただし、ほとんどの場合、
io_service::strand::wrap()
関数は
io_service::poll()
または
io_service::dispatch()
引数としてのみ使用します。
生き続けるために
次の操作を実行して言います。
io_service service; ip::tcp::socket sock(service); char buff[512]; ... read(sock, buffer(buff));
この場合、
sock
と
buff
は両方とも
read()
呼び出しで生き残るはずです。 つまり、
read()
呼び出しが完了した後、有効でなければなりません。 これはまさにあなたが期待するものです。関数に渡す引数はすべて、その中で有効でなければなりません。 非同期に進むと、事態はさらに複雑になります。
io_service service; ip::tcp::socket sock(service); char buff[512]; void on_read(const boost::system::error_code &, size_t) {} ... async_read(sock, buffer(buff), on_read);
この場合、
sock
と
buff
は
read
操作自体に耐える必要があり
read
が、非同期であるため、これがいつ発生するかはわかりません。
ソケットバッファーを使用する場合、非同期呼び出しに耐えた
buffer
インスタンスを使用できます(
boost::shared_array<>
)。 ここでは、ソケットと読み取り/書き込みバッファを内部に含むクラスを作成することにより、同じ原則を使用できます。 次に、すべての非同期呼び出しに対して、
boost::bind
functorを共有ポインターで渡します:
using namespace boost::asio; io_service service; struct connection : boost::enable_shared_from_this<connection> { typedef boost::system::error_code error_code; typedef boost::shared_ptr<connection> ptr; connection() : sock_(service), started_(true) {} void start(ip::tcp::endpoint ep) { sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1)); } void stop() { if ( !started_) return; started_ = false; sock_.close(); } bool started() { return started_; } private: void on_connect(const error_code & err) { // here you decide what to do with the connection: read or write if ( !err) do_read(); else stop(); } void on_read(const error_code & err, size_t bytes) { if ( !started() ) return; std::string msg(read_buffer_, bytes); if ( msg == "can_login") do_write("access_data"); else if ( msg.find("data ") == 0) process_data(msg); else if ( msg == "login_fail") stop(); } void on_write(const error_code & err, size_t bytes) { do_read(); } void do_read() { sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2)); } void do_write(const std::string & msg) { if ( !started() ) return; // note: in case you want to send several messages before // doing another async_read, you'll need several write buffers! std::copy(msg.begin(), msg.end(), write_buffer_); sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2)); } void process_data(const std::string & msg) { // process what comes from server, and then perform another write } private: ip::tcp::socket sock_; enum { max_msg = 1024 }; char read_buffer_[max_msg]; char write_buffer_[max_msg]; bool started_; }; int main(int argc, char* argv[]) { ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001); connection::ptr(new connection)->start(ep); }
すべての非同期呼び出しでは、
boost::bind
ファンクターが引数として送信されます。 このファンクターは、
connection
インスタンスへの共有ポインターを内部的に保存します。 非同期操作が保留されている間、Boost.Asioは
boost::bind
functorのコピーを保存し、これは
connection
への共有ポインターを保存し
connection
。 問題は解決しました!
もちろん、
connection
クラスは単なる
skeleton
クラスです。 ニーズに合わせて調整する必要があります(サーバーの場合は、外観が完全に異なります)。 新しい接続
connection::ptr(new connection)->start(ep)
簡単に作成できることに注意してください。 これにより、サーバーへの(非同期)接続が開始されます。 接続を閉じたい場合は、
stop()
を呼び出します。
インスタンスが開始されると(
start()
)、接続を待機します。 接続が発生すると、
on_connect()
呼び出されます。 エラーがない場合、読み取り操作(
do_read()
)が呼び出されます。 読み取り操作が完了すると、メッセージを解釈できます。 ほとんどの場合、アプリケーションの
on_read()
は異なって見えます。 メッセージを送信するときは、
do_write()
で行われるように、メッセージをバッファにコピーしてから送信する必要があります。これは、バッファが非同期書き込み操作に耐えなければならないためです。 最後のメモ-記録するときは、書き込む量を指定する必要があることを忘れないでください。そうしないと、バッファ全体が送信されます。
まとめ
ネットワークAPIは非常に広範囲です。 この章は、独自のネットワークアプリケーションを実装するときに戻るリンクとして実装されました。
Boost.Asioはエンドポイントの概念を実装しており、IPアドレスとポートと考えることができます。 正確なIPアドレスがわからない場合は、
resolver
オブジェクトを使用して、1つ以上のIPアドレスの代わりにwww.yahoo.comなどのホスト名を含めることができます。
また、コアAPIにあるソケットクラスも調べました。 Boost.AsioはTCP、UDP、およびICMPの実装を提供しますが、自分のプロトコル用に拡張できますが、これは気弱な人向けではありません。
非同期プログラミングは必要な悪です。 特にサーバーを作成するときに、なぜこれが必要なのかを見ました。通常、
service.run()
非同期ループを作成するのに十分な呼び出しがありますが、さらに進む必要がある場合は
run_one(), poll()
、またはを使用できます
poll_one()
。
非同期アプローチを使用する場合は、
service.post()
またはを使用するだけで、独自の非同期関数を使用でき
service.dispatch()
ます。
最後に、非同期操作の全期間(完了まで)の間、ソケットとバッファー(読み取りまたは書き込み用)の両方を有効に保つために、特別な予防措置を講じる必要があります。クラス
connection
はから派生し
enabled_shared_from_this
、その中に必要なすべてのバッファを含み、各非同期呼び出しでこの操作に共有ポインタを渡す必要があります。
次の章は多くの実際的な作業になります。クライアント/サーバーエコーなどのアプリケーションを実装する際の多くのアプリケーションコーディング。
みなさん、ありがとうございました!