セロリ-分散ジョブキュー

今回、私たちは仕事で使用する素晴らしい製品について話すことにしました。 それはセロリについてです-「分散タスクキュー」。 これは、幅広い機能を備えた分散非同期ジョブキューです。 サイトビルダーでは、多くの場合、ユーザーへの応答という点で非同期のタスクを実行する必要があります。 残念なことに、この製品のhabrに関する情報はあまりありませんが、別の言及に値します。修正したいと思います。



だから、 セロリは何ができますか





に興味がありますか? 猫をお願いします。



ワーカーの構成から始めましょう。 これは、キューからタスクを実際に受信して実行するデーモンです。 推奨されるキューはRabbitMQですが、現時点では、MongoDBを使用してghettoqに限定しています。 RedisおよびRDBMSでもサポートされています。



celeryconfig.py:



CARROT_BACKEND = "ghettoq.taproot.MongoDB" BROKER_HOST = "xxx" BROKER_PORT = 27017 BROKER_VHOST = "celery" CELERY_SEND_TASK_ERROR_EMAILS = True ADMINS = ( ('Admin', 'admin@localhost'), ) CELERYD_MAX_TASKS_PER_CHILD = 5 CELERY_IMPORTS = ("tasks", ) CELERY_DISABLE_RATE_LIMITS = True CELERY_RESULT_BACKEND = "mongodb" CELERY_MONGODB_BACKEND_SETTINGS = { "host": "xxx", "port": 27017, "database": "celery", "taskmeta_collection": "my_taskmeta_collection", }
      
      







デーモンの実行:celeryd -l INFO -B

コンソールへのロギングを有効にし、オプション-Bで定期的なタスクデーモンを開始します。 後者はcelerybeatコマンドで個別に実行できます



次に、テストタスクを作成します。 構成では、タスクをインポートするため、タスクファイルはtasks.pyです。



 from celery.decorators import task from celery.decorators import periodic_task from celery.task.schedules import crontab @periodic_task(run_every=timedelta(seconds=60)) def mail_queue(): print "Task is executed every minute" @periodic_task(run_every=crontab(hour=0, minute=10)) def transactions(): print "Task is executed every day on 0:10" @task def delayed_function(id): some_function() @task def delayed_heavy_function(id): some_heavy_function()
      
      







したがって、タスクには4つのタスクがあります。 最初の2つは、 @periodic_taskデコレータでマークされています。 ただし、最後の2つはプログラムコードから直接呼び出されます。 このように:



 from tasks import delayed_function, delayed_heavy_function delayed_function.apply_async(args=[id], countdown=300) #    300  r = delayed_heavy_function.delay(id) #  (   ),   
      
      







次に、最後のタスクの結果と完了の事実を追跡するために、次のことを行います。



r.ready()#ジョブが機能した場合Trueを返します

r.result#実行された関数の値を返します。まだ実行されていない場合はNoneを返します(非同期で)

r.get()#タスクが完了するのを待ち、その結果を(同期的に)返します



変数rはcPickleを介して実行され、値をキャッシュに入れ、ajaxでタスクのステータスを問い合わせます。 または、タスクIDを取得して、キャッシュに入れることができます。 さらに、タスクIDを自分で設定できます。主なことは、それが一意であることです。



セロリを多用した後、ghettoqキューマネージャーでの遅延タスクに関連するいくつかのエラーを発見しましたが、githubで問題が作成された日に作成者によってすべて修正されました。



少し前に、バージョン2.0がリリースされましたが、これはdjangoに依存しなくなり、djangoとの統合は別のサブプロジェクトcelery-djangoに移動しました。



セロリの2つの制限を区別できます。より正確には、これらは単なる機能です。ワーカーは標準のFreeBSDでは動作しません。 pythonマルチプロセッシングはありませんが、ネットワーク上にセロリ用のカーネルを構築するためのレシピがあります。 タスクをオーバーロードするには、ワーカーを再起動して、タスクと関連関数の新しいpythonコードをロードする必要があります。 Linuxでうまく機能します。



All Articles