Pythonでの効果的なマルチスレッド

通常のPythonから多数のhttp要求やその他のI / Oタスクを効率的に実行する方法に関する簡単なレシピを共有したいと思います。 実行できる最も正しいことは、Tornadoesやgeventなどの非同期フレームワークを使用することです。 ただし、イベントループを既存のプロジェクトに統合するのは問題があるため、このオプションは適切でない場合があります。



私の場合、すでにDjangoアプリケーションがあり、そこから月に1回、非常に小さなファイルをAWS s3にアップロードする必要がありました。 時間が経つと、ファイルの数は5万に近づき、それらをアップロードするのは面倒になりました。 ご存知のように、s3は単一のPUTリクエストでの複数の更新をサポートしていません。実験的に確立された同じデータセンター内のec2サーバーからのリクエストの最大速度は、毎秒17を超えません(ちなみに小さくありません)。 したがって、5万個のファイルの更新時間が1時間に近づき始めました。



幼少期から、Pythonプログラマーは、グローバルインタープリターロックのためにスレッド(オペレーティングシステムスレッド)を使用する意味がないことを知っています。 しかし、他のロックと同様に、このロックも時々解放されるとは限りません。 特に、これはネットワーク操作を含む入出力操作中に発生します。 これは、スレッドを使用してhttp要求を並列化できることを意味します。一方のスレッドが応答を待機している間、他方のスレッドは前のスレッドの結果を静かに処理するか、次のスレッドを準備します。



リクエストを実行するスレッドのプールが必要なだけであることがわかりました。 幸いなことに、そのようなプールはすでに作成されています。 バージョン3.2以降、すべての非同期作業を統合するために、 concurrent.futures



ライブラリがPythonに登場しました。 Pythonの2番目のバージョンには、 futuresと呼ばれるバックポートがあります。 コードはいシンプルです:



 from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(concurrency) as executor: for _ in executor.map(upload, queryset): pass
      
      





ここで、 concurrency



はワーカースレッドの数、 upload



はタスク自体を実行する関数、 queryset



はタスクに一度に1つずつ渡されるオブジェクトの反復子です。 すでに150の同時実行性を持つこのコードは、Amazonサーバーで1秒あたり約450リクエストを処理できます。



ここで、タスクに関する注意が必要です。それらはスレッドセーフでなければなりません。 つまり 並行するいくつかのタスクは共有リソースを持たないか、適切に管理する必要があります。 ここでは、インタープリターのグローバルロックは悪いヘルパーです。スレッドが最も不適切な場所で中断されないことを保証するものではありません。 urllib3、要求、またはbotoのみを使用する場合、心配することはありません。それらは既にスレッドセーフです。 他のライブラリを明確にする必要があります。 また、独自のコードはスレッドセーフです。



時間が経つにつれて、ファイルの数は20万に近づき始めました。 20万個のDjangoモデルを占有できるメモリ量はどのくらいだと思いますか? 20万先物はどうですか? そして20万タスク? ギガバイトについてすべて一緒に。 すべてを一度にエグゼキュータに送信することはオプションではないことが明らかになりました。 しかし、前のタスクの最後に新しいタスクを追加してみませんか? 最初に、スレッドの数に等しいタスク数を追加し、設定されているタスクの数、完了したタスクの数を追跡します。 先物を保管せず、配りません。 再利用可能な非常にクールな関数であることがわかります(これは最終バージョンではありません)



 from concurrent.futures import ThreadPoolExecutor, Future def task_queue(task, iterator, concurrency=10): def submit(): try: obj = next(iterator) except StopIteration: return stats['delayed'] += 1 future = executor.submit(task, obj) future.add_done_callback(upload_done) def upload_done(future): submit() stats['delayed'] -= 1 stats['done'] += 1 executor = ThreadPoolExecutor(concurrency) stats = {'done': 0, 'delayed': 0}  for _ in range(concurrency): submit() return stats
      
      





ただ3つのアクションがあります:イテレータから次のオブジェクトを選択し、そのタスクを作成するsubmit



関数、タスクの最後に呼び出されて次のオブジェクトを設定するupload_done



、および最初のタスクが設定されるループです。 実行してみてください:



 stats = task_queue(upload, queryset.iterator(), concurrency=5) while True: print '\rdone {done}, in work: {delayed} '.format(**stats), sys.stdout.flush() if stats['delayed'] == 0: break time.sleep(0.2)
      
      





すばらしい、うまくいきます! クエリセットのiterator



メソッドを既に使用していiterator



executor.map



関数を使用した最初の例で使用できるようですが、 executor.map



すぐにイテレーター全体を選択し、役に立たないようにします。 すぐに、作業スレッドごとに1つのオブジェクトが実際に選択されます。



確かに問題があります。スレッドの数を増やすと、例外「ValueError:generator already running」が流れ込み始めます。 コードはすべてのスレッドから同じジェネレーターを使用するため、遅かれ早かれ2つのスレッドが同時に値を選択しようとします(実際、これは2つのスレッドしかない場合に起こりますが、確率は低くなります)。 同じことがカウンターにも当てはまり、遅かれ早かれ2つのプロセスが同時に同じ値をカウントし、両方が1つと両方を追加して、「初期番号+ 2」ではなく「初期番号+ 1」を書き込みます。 したがって、共有オブジェクトに関するすべての作業はロックでラップする必要があります。



他にも問題があります。 タスクの実行中に発生する可能性のあるエラーの処理はありません。 ctrl + cを使用して実行を中断すると、メインスレッドで例外がスローされ、残りは最後まで実行を継続します。したがって、キューを強制的に終了させるメカニズムです。 エグゼキューターにはこれらの目的のためのシャットダウンメソッドがあり、ユーザーがctrl + cを押したときにエグゼキューターを外側に与えて停止することが可能です。 しかし、より良いオプションがあります。すべての作業の最後に解決するフューチャーを作成し、外部の誰かがキャンセルした場合にエグゼキューターをクリーンアップできます。 これらのエラーをすべて考慮に入れたバージョンは次のとおりです。



 def task_queue(task, iterator, concurrency=10, on_fail=lambda _: None): def submit(): try: obj = next(iterator) except StopIteration: return if result.cancelled(): return stats['delayed'] += 1 future = executor.submit(task, obj) future.obj = obj future.add_done_callback(upload_done) def upload_done(future): with io_lock: submit() stats['delayed'] -= 1 stats['done'] += 1 if future.exception(): on_fail(future.exception(), future.obj) if stats['delayed'] == 0: result.set_result(stats)  def cleanup(_): with io_lock: executor.shutdown(wait=False)  io_lock = threading.RLock() executor = ThreadPoolExecutor(concurrency) result = Future() result.stats = stats = {'done': 0, 'delayed': 0} result.add_done_callback(cleanup) with io_lock: for _ in range(concurrency): submit()  return result
      
      





ここでは、リエントラントロックを使用する必要があります。これは、ハンドラがadd_done_callback



でハングする前に非常に短いタスクが完了する可能性があるため、同じスレッドでハンドラがすぐに実行され、ロックを再度キャプチャしようとするためです。 デッドロックが判明します。 再入可能ロックを使用すると、最初にそれをキャプチャしたスレッドが再び落ち着くことができますが、最初のスレッドがキャプチャした回数だけ解放するまで、別のスレッドから自身を取得することはできません。 このタスクキューを使用するコードは少し変更されます。



 from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError  result = task_queue(upload, queryset.iterator(), concurrency=5) try: while not result.done(): try: result.result(.2) except TimeoutError: pass print '\rdone {done}, in work: {delayed} '.format(**result.stats), sys.stdout.flush() except KeyboardInterrupt: result.cancel() raise
      
      





200ミリ秒ごとに愚かに眠りに落ちる必要はもうありません。キューの完了を待ってスマートに眠りに落ちることができます。 中断した場合は、キューを停止します。



暗くなってきました。 時間が経つにつれて、ファイルの数は150万に近づき始めました。 すべてが一定のメモリ消費量で動作するように見えたという事実にもかかわらず(スレッド、フューチャ、およびDjangoモデルの数は実行中に変化しないはずです)、メモリ消費量は依然として増加し続けていました。 queryset.iterator()



が期待どおりに機能しないことが判明しました。 オブジェクトは、イテレータから明示的に選択された場合にのみ実際に作成されますが、未処理のデータベースの応答は、ドライバによってすぐにレイクされます。 100万行あたり約500メガバイトになります。 この問題の解決策は非常に明白です。すべてのオブジェクトに一度に要求するのではなく、部分を共有する必要があります。 同時に、LIMIT 100 OFFSET 200000という形式のクエリは、DBMSが200100レコードを超える必要があることを実際に意味するため、オフセットサンプリングは避ける必要があります。 シフトする代わりに、インデックス付きのフィールドによるサンプリングを使用する必要があります。



 def real_queryset_iterator(qs, pk='pk', chunk_size=5000): qs = qs.order_by(pk)  chunk = list(qs[:chunk_size]) while chunk: for item in chunk: yield item last_pk = getattr(chunk[-1], pk) chunk = list(qs.filter(**{pk + '__gt': last_pk})[:chunk_size])
      
      





ここで、pkはプライマリよりもページネーションキーです。 ただし、多くの場合、プライマリはこの役割に適しています。 このようなイテレータは、実際には一定量のメモリを消費し、一度にフェッチするよりも遅くなりません。 ただし、スレッド数を増やすと、別の問題が発生します。 Jungでは、データベース接続はスレッドに対してローカルであるため、次のスレッドが要求を行うと、新しい接続が作成されます。 遅かれ早かれ、接続数がクリティカル数に達し、これに類似した例外が発生します。



 OperationalError: FATAL: remaining connection slots are reserved for non-replication superuser connections
      
      





正しい解決策は、すべてのスレッドに同じ接続を使用することです。 異なるスレッドから同時にリクエストを行う機能はすでに制限されています。 Dzhangにはこのための標準ツールはありませんが、これは、ハッキングを使用してthreading.local



オブジェクトを通常のオブジェクトに置き換えることで実行できます。



 from django.db import connections, DEFAULT_DB_ALIAS connections._connections = type('empty', (object,), {})() connections[DEFAULT_DB_ALIAS].allow_thread_sharing = True
      
      





ただし、これはアプリケーションの残りの部分でデータベースのスレッドセーフを無効にすることを理解する必要があるため、このオプションはコンソールから起動されたコマンドにのみ適しています。 より人道的なオプションは、各要求の後、または各要素の後に接続を閉じることです。これにより、オーバーヘッドはそれほど大きくなくなります。



 def close_connection_iterator(iterator, db=DEFAULT_DB_ALIAS): for item in iterator: connections[db].close() yield item result = task_queue( upload, close_connection_iterator(real_queryset_iterator(queryset)), concurrency=150 )
      
      





3番目の解決策があります。データベースと通信する別のスレッドを使用して、オブジェクトを残りのスレッドに渡します。 このオプションは、アプリケーションの残りの部分を破壊することはなく、接続を絶えず再開するオーバーヘッドを導入しません。 しかし、その実装は非常に複雑であり、別の記事にすぎません。



おそらくより多くの時間が経過し、ファイルの数は1,000万に増加し、新しい問題が発生します。 しかし今のところ、主な問題は、そのような更新には約8時間かかり、現在のAmazon価格でのPUTリクエストに対してのみ50ドルかかるということです。



読み取りからのいくつかの論文:

  1. PythonのI / Oのスレッドはうまく機能しますが、分離には注意する必要があります。
  2. メモリ消費を監視しながら、数十、数十万のタスクを非常に慎重に実行する必要があります。
  3. Dzhangovskoy ORMのqueryset.iterator()



    は、期待どおりに機能しません。


githubのヘルパーtask_queue



およびreal_queryset_iterator





https://gist.github.com/homm/b8caf60c11997da69b1e



All Articles