先ほど、リアクティブプログラミングにおけるSpring Boot 2の潜在的な役割について、いくつかの小さな記事を紹介しました。 その後、一般的なプログラミングで非同期操作がどのように機能するかについて多くの質問を受けました。 今日は 、 ノンブロッキングI / Oとは何か、この知識を適用して、1つのスレッドで多くのオープンで重い(長い)接続を処理できるPythonの小さなtcpサーバーを作成する方法を知りたいと思います。 pythonの知識は必要ありません。すべてのコメントが非常に簡単になります。 みんなを招待します!
私は、他の多くの開発者と同様に、実験が本当に好きなので、以降の記事全体は、彼らがもたらす一連の実験と結論だけで構成されます。 あなたはその主題に精通していないと思われ、私を試してみようと思っています。 サンプルソースはgithubにあります。
非常に単純なtcpサーバーを書くことから始めましょう。 サーバーのタスクは、ソケットからデータを受信して印刷し、 サーバーから文字列Helloを返すことです! 。 次のようになります。
同期tcpサーバー
import socket # SERVER_ADDRESS = ('localhost', 8686) # server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind(SERVER_ADDRESS) server_socket.listen(10) print('server is running, please, press ctrl+c to stop') # while True: connection, address = server_socket.accept() print("new connection from {address}".format(address=address)) data = connection.recv(1024) print(str(data)) connection.send(bytes('Hello from server!', encoding='UTF-8')) connection.close()
ここではすべてが非常に簡単です。 ソケットの概念に詳しくない場合は、非常にシンプルで実用的な記事をご覧ください。 ソケットを作成し、着信接続をキャッチし、指定されたロジックに従って処理します。 ここでは、メッセージに注意を払う価値があります。 クライアントとの新しい接続を作成するとき、コンソールにそれについて書き込みます。
記事が完全に読まれるまで、プログラムのリストを真剣に掘り下げるべきではないことにすぐに注意したい。 最初に何かが完全に明確でない場合、それは完全に正常です。 読み続けてください。
クライアントのないサーバーにはあまり意味がありません。 したがって、次のステップは、サーバーを使用する小さなクライアントを作成することです。
tcpクライアント
#!/usr/bin/env python # -*- coding: utf-8 -*- import socket MAX_CONNECTIONS = 20 address_to_server = ('localhost', 8686) clients = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(MAX_CONNECTIONS)] for client in clients: client.connect(address_to_server) for i in range(MAX_CONNECTIONS): clients[i].send(bytes("hello from client number " + str(i), encoding='UTF-8')) for client in clients: data = client.recv(1024) print(str(data))
ここで重要な機能は、最初に可能な最大接続数を確立し、それを使用してからデータ転送/ストレージに使用することです。
サーバーを起動しましょう。 私たちが最初に目にするもの:
server is running, please, press ctrl+c to stop
これは、サーバーを正常に起動し、着信要求を受け入れる準備ができたことを意味します。 クライアントを実行し、すぐにサーバーコンソールで確認します(ポートは異なる場合があります)。
server is running, please, press ctrl+c to stop new connection from ('127.0.0.1', 39196) b'hello from client number 0' new connection from ('127.0.0.1', 39198) b'hello from client number 1' ...
予想されることです。 無限ループでは、新しい接続を取得し、その接続からのデータをすぐに処理します。 ここで問題は何ですか? 以前は、 server_socket.listen(10)オプションを使用してサーバーを構成しました。 これは、まだ受け入れられていない接続の最大キューサイズを意味します。 しかし、1つの接続を受け入れるため、これはまったく意味がありません。 この状況で何をすべきか? 実際、いくつかの方法があります。
- スレッド/プロセスを使用して並列化します(これには、たとえばforkまたはプールを使用できます)。 詳細はこちら 。
- サーバーに接続するときではなく、これらの接続が適切な量のデータで満たされるように、要求を処理します。 簡単に言えば、すぐに最大量のリソースを開き、できる限り多くのリソースから読み取ることができます(理想的な場合、これに必要なプロセッサー時間)。
2番目のアイデアは魅力的なようです。 1つのスレッドと複数の接続の処理。 それがどのように見えるか見てみましょう。 コードの豊富さを恐れないでください。 何かがすぐにはっきりしない場合、これは非常に正常です。 あなたは自分でそれを実行して利益を上げることができます:
非同期サーバー
import select import socket SERVER_ADDRESS = ('localhost', 8686) # , MAX_CONNECTIONS = 10 # INPUTS = list() OUTPUTS = list() def get_non_blocking_server_socket(): # , server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(0) # server.bind(SERVER_ADDRESS) # server.listen(MAX_CONNECTIONS) return server def handle_readables(readables, server): """ """ for resource in readables: # , if resource is server: connection, client_address = resource.accept() connection.setblocking(0) INPUTS.append(connection) print("new connection from {address}".format(address=client_address)) # , else: data = "" try: data = resource.recv(1024) # except ConnectionResetError: pass if data: # print("getting data: {data}".format(data=str(data))) # , if resource not in OUTPUTS: OUTPUTS.append(resource) # , , else: # clear_resource(resource) def clear_resource(resource): """ """ if resource in OUTPUTS: OUTPUTS.remove(resource) if resource in INPUTS: INPUTS.remove(resource) resource.close() print('closing connection ' + str(resource)) def handle_writables(writables): # for resource in writables: try: resource.send(bytes('Hello from server!', encoding='UTF-8')) except OSError: clear_resource(resource) if __name__ == '__main__': # server_socket = get_non_blocking_server_socket() INPUTS.append(server_socket) print("server is running, please, press ctrl+c to stop") try: while INPUTS: readables, writables, exceptional = select.select(INPUTS, OUTPUTS, INPUTS) handle_readables(readables, server_socket) handle_writables(writables) except KeyboardInterrupt: clear_resource(server_socket) print("Server stopped! Thank you for using!")
新しいサーバーを起動して、コンソールを見てみましょう。
非同期サーバー出力
server is running, please, press ctrl+c to stop new connection from ('127.0.0.1', 56608) new connection from ('127.0.0.1', 56610) new connection from ('127.0.0.1', 56612) new connection from ('127.0.0.1', 56614) new connection from ('127.0.0.1', 56616) new connection from ('127.0.0.1', 56618) new connection from ('127.0.0.1', 56620) new connection from ('127.0.0.1', 56622) new connection from ('127.0.0.1', 56624) getting data: b'hello from client number 0' new connection from ('127.0.0.1', 56626) getting data: b'hello from client number 1' getting data: b'hello from client number 2'
結論からわかるように、新しい接続とデータをほぼ並行して受け入れます。 さらに、新しい接続からのデータは期待していません。 代わりに、新しいものを設定しています。
どのように機能しますか?
実際のところ、リソースを使用したすべての操作(およびソケットへのアクセスはこのカテゴリに属します)は、オペレーティングシステムのシステムコールを介して行われます。 要するに、システムコールはオペレーティングシステムAPIへの呼び出しです。
最初のケースと2番目のケースで何が起こるかを考えてください。
同期コール
写真を見てみましょう:
最初の矢印は、アプリケーションがオペレーティングシステムにアクセスしてリソースからデータを取得していることを示しています。 さらに、プログラムは目的のイベントまでブロックされます。 マイナスは明らかです。1つのスレッドがある場合、他のユーザーは現在のスレッドの処理を待つ必要があります。
非同期呼び出し
次に、非同期呼び出しを示す図を見てください。
最初の矢印は、最初の場合と同様に、リソースからデータを取得するようにOS(オペレーティングシステム)に要求します。 しかし、次に何が起こるか見てください。 リソースからのデータを待たずに作業を続けます。 私たちは何をしますか? OSの順序を指定し、すぐに結果を待たないでください。 最も簡単な答えは、データの可用性についてシステムを個別にポーリングすることです。 したがって、フローをブロックせずにリソースを利用できます。
しかし実際には、そのようなシステムは実用的ではありません。 常にデータを見て、ある種のイベントを待機するこのような状態は、アクティブ待機と呼ばれます。 マイナスは明らかです。情報が更新されない場合、CPU時間を浪費します。 より良い解決策は、ロックをそのままにして、「スマート」にすることです。 ストリームは特定のイベントを待っているだけではありません。 代わりに、彼は私たちのプログラムのデータの変更を期待しています。 これは、非同期サーバーでselect関数が機能する方法です。
これで、非同期サーバーの実装に戻り、新しい知識でそれを見ることができます。 最初に目を引くのは、作業方法です。 最初のケースでプログラムが「上から下へ」実行された場合、2番目のケースではイベントを操作します。 ソフトウェア開発におけるこのようなアプローチは、イベント駆動型開発と呼ばれます。
このアプローチが特効薬ではないことはすぐに注目に値します。 彼には多くの欠点があります。 第一に、このようなコードは維持および変更がより困難です。 第二に、すべてを台無しにするブロッキング呼び出しがあります。 たとえば、上記のプログラムでは、 印刷機能を使用しました。 実際、そのような関数はOSにもアクセスするため、実行のスレッドはブロックされ、他のデータソースは辛抱強く待っています。
おわりに
アプローチの選択は、解決する問題に直接依存します。 タスク自体が最も生産的なアプローチを選択するようにします。 たとえば、人気のあるJava WebサーバーTomcatはスレッドを使用します。 同様に人気のあるNginxサーバーは、非同期アプローチを使用します。 pythonで人気のあるgunicorn Webサーバーの作成者は、 preforkのパスに従いました。
最後まで記事を読んでくれてありがとう! 次回(まもなく)私たちのプログラムの生活の中で他の可能性のある非ブロッキングの状況についてお話します。 次の投稿でお会いできてうれしいです。