ちょっとした歴史
AsyncioはPythonバージョン3.4で登場し、3.5ではより見栄えの良いasync / await構文が追加されました。 Asyncioは、すぐに使えるイベントループ、フューチャー、タスク、コルーチン、I / O多重化、同期プリミティブを提供します。 もちろん、これは小さくはありませんが、本格的な開発には十分ではありません。 これにはサードパーティのライブラリがあります。 優れた選択はこちらです。 当社では、マイクロサービスを作成するために、asyncioとサードパーティライブラリのセットを使用しています。 その性質上、私たちのサービスはCPUよりもI / Oに重点を置いているため、asyncioは私たちにとって素晴らしいものです。
実際に事実
これはasyncioのチュートリアルではありません。 非同期I / Oが優れている理由、またはスレッドを使用しない理由については説明しません。 コルーチン、ジェネレーター、イベントループなどに関するストーリーはありません。 また、ベンチマークや他の言語との比較はありません。 行こう!
デバッグする
まず、PYTHONASYNCIODEBUG。 これは、デバッグモードを有効にする環境変数です。 たとえば、関数をコルーチンとして宣言したが、それを通常の関数として呼び出しているというメッセージを見ることができます(python3.4に関連)。 また、asyncioロガーを構成して、ResourseWarning出力をデバッグし、引き続き有効にする必要があります。 多くの興味深いことがわかります。トランスポートを閉じるのを忘れたメッセージまたはイベントループ自体(読み取り-リソースを解放するのを忘れた)。 次のコードの起動を、-WdefaultインタープリターパラメーターとPYTHONASYNCIODEBUG = 1環境変数を使用して、それらを使用せずに比較します(コード例の前後で、インポートや例外処理などの無関係な部分を省略します)。
@asyncio.coroutine def test(): pass loop = asyncio.get_event_loop() test()
正しい完成
リソースの解放といえば。 すべてのシャッフルの正常な完了、接続のクローズなどを待った後、イベントループを正しく停止できる必要があります。 run_until_complete()を使用して特別な問題がない場合、run_forever()を使用すると、すべてが少し複雑になります。 イベントループのclose()メソッドは、既に停止している場合にのみ呼び出すことができます。 stop()メソッドの後。 これは、信号を使用するのが最適です。
def handler(loop): loop.remove_signal_handler(signal.SIGTERM) loop.stop() loop = asyncio.get_event_loop() loop.add_signal_handler(signal.SIGTERM, handler, loop) try: loop.run_forever() finally: loop.close()
さらにコード例では、プログラムの正しい完成にではなく、本質に集中します。
ブロッキングコード
当然、すべてに非同期ライブラリがあるわけではありません。 一部のコードはブロックされたままなので、イベントループをブロックしないように何らかの方法で実行する必要があります。 これを行うために、イベントループでメインスレッドをブロックすることなく、組み込みプールのスレッドの1つで渡されたものを実行する適切なrun_in_executor()メソッドがあります。 すべて問題ありませんが、これには2つの問題があります。 まず、標準プールのサイズはわずか5です。次に、asyncioには、組み込みプールでこのように実行される同期DNSリゾルバがあります。 これは、同期操作が5つのスレッドのみのプールと、getaddrinfo()を実行する必要のあるすべてのスレッドで競合することを意味します。 方法は、プールを使用することです。 常に:
def blocking_function(): time.sleep(42) pool = ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) loop = asyncio.get_event_loop() loop.run_in_executor(pool, blocking_function) loop.close()
陰湿な未来
Futureには非常に興味深い機能が1つあります。例外が発生した場合、将来について明示的に尋ねない限り、そのことについて何も知りません。 ドキュメントには、このトピックに関する良い例があります。 gcが将来のオブジェクトを削除する場合にのみ、例外があったことがわかります。 ここから簡単なルールが続きます-常に未来の結果を確認してください。 あなたの考えによれば、将来のコードは無限ループでスピンするだけで、結果を確認する場所はないように見えますが、次のように例外を処理する必要があります:
async def handle_exception(): try: await bug() except Exception: print('TADA!') async def bug(): raise Exception() loop = asyncio.get_event_loop() loop.create_task(handle_exception()) loop.run_forever() loop.close()
awaitおよび__init __()
不可能。 __init __()マジックメソッドに非同期コードを含めることはできません。 2つの方法があります。 または、クラスを別のメソッドにします。たとえば、既にコルーチンになっているinitialize()などです。 これには初期化のためのすべての非同期コードが含まれ、オブジェクトの作成後に呼び出す必要があります。 ひどいですね。 したがって、ファクトリ関数を使用するのが慣例です。 例で説明します。
class Foo: def __init__(self, reader, writer, loop, *args, **kwargs): self._reader = reader self._writer = writer self._loop = loop async def create_foo(loop): reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop) return Foo(reader, writer, loop) loop = asyncio.get_event_loop() foo = loop.run_until_complete(create_foo(loop)) print(foo) loop.close()
ウェイクアップネオ
イベントループでスピンし、定期的にバッファをフラッシュするタスクがあるとします。 このコードを書くことができます:
async def flush_task(): while True: # flushing... await asyncio.sleep(FLUSH_TIMEOUT)
create_task()を作成します-そして、1つのことを除いてすべてがうまくいくようです:最後にバッファの内容を強制的にフラッシュする必要がある場合はどうすればよいですか? Taskuを「目覚めさせる」方法は? ここで、同期プリミティブが役立ちます。
class Foo: def __init__(self, loop, *args, **kwargs): self._loop = loop self._waiter = asyncio.Event() self._flush_future = self._loop.create_task(self.flush_task()) async def flush_task(self): while True: try: await asyncio.wait_for(self._waiter.wait(), timeout=FLUSH_TIMEOUT, loop=self._loop) except asyncio.TimeoutError: pass # flushing ... self._waiter.clear() def force_flush(): self._waiter.set() loop = asyncio.get_event_loop() foo = Foo(loop) loop.run_forever() loop.close()
テスト中
非同期コードのテストは可能であり、必要です。 これを行うには、同期コードの場合と同じくらい簡単です。
class TestCase(unittest.TestCase): def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(None) def tearDown(self): self.loop.close() def test_001(self): async def func(): self.assertEqual(42, 42) self.loop.run_until_complete(func())
テストは完全に分離されています。 新しいテストはそれぞれ独自のイベントループを使用します。 または、さらに進んで、便利なデコレータがあるpytestを使用できます。
インスピレーションの源
まず第一に、個人的な経験。 上記の多くは、「熊手釣り」の結果として実現され、その後、ドキュメントとソースの非同期を研究しました。 また、優れた例として、aiohttp、aioredis、aiopgなどの一般的なライブラリのソースがありました。
最後まで記事を読んでくれたみんなに感謝します。 asyncioで頑張ってください!