コルーチンを使用したイベント駆動型パラダイムのPython実装

この記事では、カスタムPythonジェネレーターを使用して、イベントの受信を切り替えるコルーチンの独自の実装を作成する方法について説明します。 結果として得られるモジュールのコードの単純さは、あなたを喜ばせ、そのようなジェネレーターを使用して取得できる言語の新しい、あまり使用されない機能を明確にします。 この記事は、 asynciotornadoなどの深刻な実装での配置方法を理解するのに役立ちます。



理論上の瞬間と免責事項



コルーチンの概念には非常に広い解釈があるので、実装でどのような特性を持つかを決定する必要があります。



その結果、コールバック関数と協調マルチタスクを使用しないイベント指向プログラミングが得られます。 このようなプログラミングパラダイムを使用する効果は、不均一なイベントに応答するタスクに対してのみ重要です。 まず第一に、これらはネットワークサーバー、ユーザーインターフェイスなどのI / O処理タスクです。別の可能なアプリケーションは、ゲームワールドのキャラクターの状態を計算するタスクです。 ただし、長時間の計算を行うタスクには、カテゴリとしては適切ではありません。

実行中のコルーチンがイベントを待機するために中断されていない間、他のすべては、予想されるイベントが既に発生していても停止状態にあることを明確に理解する必要があります。



すべての基礎



Pythonでは、リテラルおよび比sense的な意味で適切に準備されていれば、ジェネレーターはこれらすべての良い基礎となります。 より正確には、拡張ジェネレーターであり、そのAPIは最終的にPythonバージョン3.3で形成されました。 以前のバージョンでは、ジェネレーターの完了時の値(結果)の戻りは実装されておらず、1つのジェネレーターを別のジェネレーターから呼び出す便利なメカニズムがありませんでした。 それにも関わらず、コルーチンの実装は以前のものでしたが、通常のジェネレーターの制限により、私たちが得るほど「美しい」ものではありませんでした。 このテーマに関する非常に優れた記事「コルーチンと並行性に関する好奇心Course盛なコース」の唯一の欠点は、更新されたバージョンがないことです。 Pythonでのコルーチンの実装が、特にEnhanced Python Generators APIで、言語の最新のイノベーションを使用する場合。 以下は、必要な拡張ジェネレーターの機能です。

コルーチンへのメッセージの送信は、ジェネレーターの状態を設定する機能に基づきます。 以下のコードを、実行中のPythonインタープリターバージョン3.3以降のウィンドウにコピーします。

def gen_factory(): state = None while True: print("state:", state) state = yield state gen = gen_factory()
      
      





ジェネレーターが作成されました。起動する必要があります。

 >>> next(gen) state: None
      
      





初期状態が受信されます。 状態を変更します。

 >>> gen.send("OK") state: OK 'OK'
      
      





状態が変更され、結果として返されたことがわかります。 次の送信呼び出しは、すでに送信している状態を返します。



なぜこれがすべて必要なのですか?



タスクを想像してください。2秒に1回ペトロフに、3秒に1回、Ivanovに、5秒に1回、全世界に挨拶を送信します。 Pythonコードの形式では、次のようなものを想像できます。

 def hello(name, timeout): while True: sleep(timeout) print(", {}!".format(name)) hello("", 2.0) hello("", 3.0) hello("", 5.0)
      
      





よさそうですが、ペトロフだけが挨拶を受け取ります。 しかし! コードの明瞭さには影響を与えないが、その逆も同様である小さな変更-私たちのアイデアを明確にし、これはすでに期待どおりに機能します。

 @coroutine def hello(name, timeout): while True: yield from sleep(timeout) print(", {}!".format(name)) hello("", 2.0) hello("", 3.0) hello("", 5.0) run()
      
      





コードは、Pythonのスタイルのスタイルで判明しました。コールバックがなく、オブジェクトに不必要な飾り付けがなく、その中のコメントは不要です。 コルーチンデコレータ、そのバージョンのスリープ機能、および実行機能を実装するためだけに残ります。 実装では、もちろん、付加機能なしでは機能しません。 しかし、これは、ライブラリモジュールのファサードの背後にあるすべての魔法を隠すためのPython的な方法でもあります。



最も興味深い



私たちは実装を控えめに-並行性、意味を持ち、実際に協調マルチタスクの実装であるという事実を反映して、モジュールを呼び出します。 デコレーターが通常の関数からジェネレーターを作成し、それを開始する必要があることは明らかです(次への最初の呼び出しを行う)。 言語構成からのyieldは、呼び出しを次のジェネレーターに転送します。 つまり、スリープ関数は、すべての魔法を隠すことができるジェネレーターを作成する必要があります。 受信したイベントのコードのみが、それを発生させたジェネレーターに返されます。 ここでは、返された結果は処理されません。ここのコードは本質的に1つの結果しか取得できません。つまり、タイムアウトが期限切れになります。 I / Oを待機すると、たとえば(読み取り/書き込み/タイムアウト)のさまざまなタイプのイベントを返すことができます。 さらに、sleep型の関数によって生成されたジェネレーターは、どのデータ型からもyieldを返すことができるため、それらの機能はイベントの待機に限定されない場合があります。 run関数はイベントディスパッチャを起動します。そのタスクは、外部からイベントを受信するか、内部で生成するか、受信者を決定して実際に送信することです。

デコレータから始めましょう:

 class coroutine(object): """       .""" _current = None def __init__(self, callable): self._callable = callable def __call__(self, *args, **kwargs): corogen = self._callable(*args, **kwargs) cls = self.__class__ if cls._current is None: try: cls._current = corogen next(corogen) finally: cls._current = None return corogen
      
      





約束どおり、典型的なトリックであるクラスの形式で作成され、ジェネレーターを作成して実行します。 _currentを含む構造は、それを作成する装飾された関数が別のジェネレーターの本体内で呼び出された場合に、ジェネレーターの起動を回避するために追加されました。 この場合、最初の呼び出しが行われます。 また、どのジェネレーターにイベントを送信する必要があるかを把握して、スリープ関数によって作成されたジェネレーターへのチェーンを取得するのにも役立ちます。

 def sleep(timeout): """     " ".""" corogen = coroutine._current dispatcher.setup_timeout(corogen, timeout) revent = yield return revent
      
      





ここでdispatcher.setup_sleepの呼び出しを確認します。これは、timeoutパラメーターで指定された秒数が経過した後、ジェネレーターがタイムアウトイベントを待機していることをイベントマネージャーに通知します。

 from collections import deque from time import time, sleep as sys_sleep class Dispatcher(object): """   .""" def __init__(self): self._pending = deque() self._deadline = time() + 3600.0 def setup_timeout(self, corogen, timeout): deadline = time() + timeout self._deadline = min([self._deadline, deadline]) self._pending.append([corogen, deadline]) self._pending = deque(sorted(self._pending, key=lambda a: a[1])) def run(self): """   .""" while len(self._pending) > 0: timeout = self._deadline - time() self._deadline = time() + 3600.0 if timeout > 0: sys_sleep(timeout) while len(self._pending) > 0: if self._pending[0][1] <= time(): corogen, _ = self._pending.popleft() try: coroutine._current = corogen corogen.send("timeout") except StopIteration: pass finally: coroutine._current = None else: break dispatcher = Dispatcher() run = lambda: dispatcher.run()
      
      





イベントディスパッチャコードも異常ではありません。 イベントを送信する場所は、クラス変数coroutine._currentを使用して決定されます。 モジュールがロードされると、クラスのインスタンスが動作する実装で作成されます。もちろん、これはシングルトンでなければなりません。 リストの代わりにcollections.dequeクラスが使用されます。popleftメソッドの方が高速で便利だからです。 さて、これですべてです。特別な魔法はありません。 実際、すべては高度なPythonジェネレーターの実装でさらに深く隠されています。 正しく調理することしかできません。



ファイル:concurrency.py
 # concurrency.py from collections import deque from time import time, sleep as sys_sleep class coroutine(object): """       .""" _current = None def __init__(self, callable): self._callable = callable def __call__(self, *args, **kwargs): corogen = self._callable(*args, **kwargs) cls = self.__class__ if cls._current is None: try: cls._current = corogen next(corogen) finally: cls._current = None return corogen def sleep(timeout): """     " ".""" corogen = coroutine._current dispatcher.setup_timeout(corogen, timeout) revent = yield return revent class Dispatcher(object): """   .""" def __init__(self): self._pending = deque() self._deadline = time() + 3600.0 def setup_timeout(self, corogen, timeout): deadline = time() + timeout self._deadline = min([self._deadline, deadline]) self._pending.append([corogen, deadline]) self._pending = deque(sorted(self._pending, key=lambda a: a[1])) def run(self): """   .""" while len(self._pending) > 0: timeout = self._deadline - time() self._deadline = time() + 3600.0 if timeout > 0: sys_sleep(timeout) while len(self._pending) > 0: if self._pending[0][1] <= time(): corogen, _ = self._pending.popleft() try: coroutine._current = corogen corogen.send("timeout") except StopIteration: pass finally: coroutine._current = None else: break dispatcher = Dispatcher() run = lambda: dispatcher.run()
      
      







ファイル:sample.py
 # sample.py from concurency import coroutine, sleep, run @coroutine def hello(name, timeout): while True: yield from sleep(timeout) print(", {}!".format(name)) hello("", 2.0) hello("", 3.0) hello("", 5.0) run()
      
      









アウトロ



トピックが興味深い場合は、例として非同期TCP Echoサーバーを使用してI / Oイベントの期待を実装することに向かって続けることができます。 別の言語で記述された動的ライブラリとして実装された実際のイベントディスパッチャにより、Python言語よりも高速です。



All Articles