セロリ:ベストプラクティス

Djangoを使用する場合、開発のある段階で、長時間実行されるタスクのバックグラウンド処理が必要になる場合があります。 この種のタスクでは、何らかの種類のツールを使用してタスクキューを管理している可能性があります。 Celeryは、現時点でpythonとDjangoの世界で同様の問題を解決するための最も人気のあるプロジェクトの1つですが、この目的のための他のプロジェクトがあります。



Celeryを使用してタスクキューを管理するプロジェクトに取り組んでいたときに、文書化することにしたベストプラクティスのいくつかが明らかになりました。 しかし、これらは、このような問題を解決するための適切なアプローチについて、またCeleryプロジェクトのコミュニティが提供する未利用の機会について、私が考えるものに対する大きな言葉です。



No.1:DBMSをAMQPブローカーとして使用しないでください



(Celeryのドキュメントに記載されている制限に加えて)これが間違っていると思う理由を説明しましょう。



DBMSは、RabbitMQなどの本格的なAMQPブローカーによって実行されるタスク向けには開発されていません。 トラフィック/ユーザーベースがそれほど多くないプロジェクトでも、「戦闘」状態になります。



人々がDBMSを使用することを決定する最も一般的な理由は、原則として、すでにWebアプリケーション用に1つのDBMSを持っているため、再度使用しないことです。 このオプションの使用は簡単で、他のコンポーネント(RabbitMQなど)について心配する必要はありません。



あまり仮説的ではないシナリオを想定してください。データベースに入れる処理のために、4人のバックグラウンドワーカーがいます。 これは、新しいタスクのデータベースを頻繁に要求する4つのプロセスを取得することを意味します。もちろん、それぞれが独自の競合スレッドを持つことができるという事実は言うまでもありません。 ある時点で、タスクの処理の遅延が増大しているため、完了よりも新しいタスクが多いことを理解しているため、ワーカーの数を増やす必要があります。 ワーカーからデータベースへの膨大な数のリクエストにより、データベースの速度が突然「沈み」始め、ディスクの入力/出力が指定された制限を超えます。これは、ワーカーが実際にデータベースに対してDDOS攻撃を開始するため、アプリケーションに影響を与え始めます。



本格的なAMQPブローカーを使用している場合、キューはメモリに配置され、ハードディスクの高負荷が排除されるため、これは起こりません。 キューにはワーカーに新しいタスクを配信するメカニズムがあるため、コンシューマ(ワーカー)が頻繁に情報を要求する必要はありません。AMQPブローカーが他の理由で過負荷になっても、ユーザーと対話するWebアプリケーションのクラッシュやブレーキは発生しません。 。



さらに進んで、開発プロセス中であっても、Dockerや、カスタマイズされたRabbitMQをそのまま提供する多くの事前構成済みイメージがある場合でも、DBMSをブローカーとして使用しないでください



No.2:より多くのキューを使用します(つまり、デフォルトでは1つだけではありません)



Celeryの使用は非常に簡単で、すぐに1つのデフォルトキューを提供します。このキューには、別のCeleryの動作が明示的に指定されるまで、すべてのタスクが配置されます。 表示されるものの最も一般的な例:



@app.task() def my_taskA(a, b, c): print("doing something here...") @app.task() def my_taskB(x, y): print("doing something here...")
      
      







celeryconfig.pyファイルで特に指定されていない限り、両方のタスクが同じキューに配置されるとどうなりますか。 私はこのアプローチがどのように正当化できるかを完全に理解しています。あなたは便利なバックグラウンドタスクを作成するデコレーターを持っています。 ここで、同じキュー内でtaskAとtaskBが完全に異なることを実行できるという事実に注意を喚起したいと思います。したがって、一方が他方よりもはるかに重要になる可能性があります。 ワーカーが1人の場合でも、重要度の低いtaskBタスクが非常に大きくなり、重要度の高いtaskAタスクが必要な注意を払えなくなる状況を想像してください。これは次のポイントにつながります。



No.3:ワーカーの優先順位を使用する



上記の問題を解決することにより、taskAタスクを1つのキューに配置し、taskBを別のキューに配置してから、x個のワーカーをQ1キューに割り当て、残りをQ2処理に割り当てます。 したがって、taskBが十分なワーカーを受け取り、残りは、長い待機と処理を引き起こすことなく、優先度の低いタスクを処理します。 したがって、自分でキューを決定します。



 CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'), Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'), )
      
      





そして、タスクをどこに送るかを決定するルーター:

 CELERY_ROUTES = { 'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'}, 'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'}, }
      
      





これにより、ワーカーは各タスクを完了できます。

 celery worker -E -l INFO -n workerA -Q for_task_A celery worker -E -l INFO -n workerB -Q for_task_B
      
      





No.4:エラーを処理するためにセロリのメカニズムを使用する



私が見たタスクのほとんどには、エラー処理メカニズムがありません。 タスクでエラーが発生すると、クラッシュします。 これは一部のタスクには便利かもしれませんが、私が見たほとんどのタスクは外部APIと対話し、ある種のネットワークエラーやその他の「リソースの可用性」の問題によりクラッシュしました。 このようなエラーを処理する最も簡単な方法は、おそらく外部APIとの対話の問題が既に修正されているため、タスクコードをいっぱいにすることです。



 @app.task(bind=True, default_retry_delay=300, max_retries=5) def my_task_A(): try: print("doing stuff here...") except SomeNetworkException as e: print("maybe do some clenup here....") self.retry(e)
      
      





タスクのデフォルトの待機時間を定義するのが好きです。タスクが再び実行を試みる前に待機し、それを超える試行回数が最終的にエラーをスローするまでにかかります(それぞれdefault_retry_delayおよびmax_retries)。 これは、私が想像できる最も単純な形式のエラー処理ですが、実際には使用されないことがわかりました。 もちろん、Celeryのドキュメントで説明されているように、Celeryにはより洗練されたエラー処理方法があります。



5番:花を使う



Flowerは、タスクとCeleryワーカーのステータスを監視するための優れたツールです。 このツールにはWebインターフェイスがあり、次のようなことができます。



機能の完全なリストは、提供されているリンクで確認できます。



No.6:必要な場合にのみタスクのステータスを追跡する



タスクのステータスは、タスクが正常に完了したかどうかに関する情報です。 一部の統計指標に役立つ場合があります。 この場合に理解しておくべき重要なこと:タスクのステータスは、結果のデータや実行した作業ではありません。そのような情報は、データベースに書き込まれた暗黙的な変更(たとえば、ユーザーの友達リストの変更など)に最も似ています。



私が見たほとんどのプロジェクトでは、デフォルトで提供されているsqliteデータベースを使用して、PostgreSQLのような大きなDBMSを使用して、完了後のタスクのステータスに関するデータを実際に気にしませんでした。 アプリケーションデータベースをロードするのはなぜですか? celeryconfig.py設定ファイルでCELERY_IGNORE_RESULT = Trueを使用し、そのようなデータを破棄します。



No.7:タスクにデータベースオブジェクト\ ORMを渡さない



地元のpython開発者グループの会議で上記について議論した後、一部の人々はリストに余分なアイテムを追加することを提案しました。 彼は何について話しているのですか? ユーザーモデルなどのデータベースオブジェクトをバックグラウンドタスクに転送しないでください。シリアル化されたオブジェクトには、既にシリアル化された不正なデータが表示される可能性があるためです。 必要な場合は、ユーザーIDをタスクに渡し、タスク自体でこのユーザーに関するデータベースを要求します。



All Articles