Celeryを使用しないdjangoプロジェクトでのRabbitMQの使用、およびCelery 3.0の新機能

ほとんどのPythonプログラマーは、すでにある程度Celeryに精通していると思います。 最初の部分ではセロリなしでRabbitMQを使用する方法を説明し、2番目の部分ではセロリ3.0の新しい機能の概要を説明します。

Django-Celery-RabbitMQバンドルのインストールについては、 こちらをご覧ください

RabbitMQの使用については、RabbitMQのWebサイトに、また、 こちらに詳しく記載されています。



インストールと構成を簡単に思い出してください。

RabbitMQ:

sudo apt-get install rabbitmq-server





ユーザーを追加:

 $ rabbitmqctl add_user myuser mypassword $ rabbitmqctl add_vhost '/' $ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
      
      





簡単に言うと、セロリ、RabbitMQの設定:

settings.pyで

 import djcelery os.environ["CELERY_LOADER"] = "django" djcelery.setup_loader() AMQP_HOST = 'localhost' BROKER_HOST='localhost' BROKER_PORT = 5672 BROKER_VHOST = "/" BROKER_USER = "myuser" BROKER_PASSWORD = "mypassword" INSTALLED_APPS+='djcelery'
      
      





ステートメント:1つの小さなタスクを非同期にするために、セロリを使用する必要はまったくありません。 RabbitMQで取得できます。

証明:

反対から始めましょう:

タスク:指定された送信者からの手紙があるかどうか電子メールを確認します。手紙がない場合は、1分以内に確認を繰り返します(ある場合)。

poplib、電子メールを使用します。

事前に指定された送信者からメールを受信し、タスクデコレータでラップする関数を作成します

この関数は、メールの送信元であるメールアドレス、パスワード、メールアドレスを受け取り、ステータス(Ok、エラー)とメッセージを返します。

tasks.pyで

 from celery.task import task, periodic_task from celery.task.schedules import crontab import poplib import email @task def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return 'Error', 'Email is blocked' try: print p.list() except: return 'Error', 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_email_from = m['From'] if this_email_from.find(mail_from) >= 0: print this_email_from m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return 'Ok', content else: pass except Exception, e: return 'Error', unicode(e, 'utf8') raise mail_content.retry(exc=e, countdown=30)
      
      





コードの最後の行は、メッセージが見つからなかった場合に30秒後にタスクを再開することを説明しています。

これで、次のようにタスクを実行できます。

 >>>res = mail_content.delay('user@domen', 'password', 'email_from@domen.email.from')
      
      





この場合、実行はすぐに開始されるか、次のようになります。

 >>>res = mail_content.apply_async(('user@domen', 'password', 'email_from@domen.email.from'), countdown=30)
      
      





この場合、実行は30秒後に開始されます。

(まず、セロリサーバーを起動する必要があります。

python manage.py celeryd

そして、別のウィンドウでシェルを実行します:

python manage.pyシェル、

そして、すでにシェルからこれらのコマンドを呼び出すようになりました)

結果を得るには

 >>>res.get() () >>>res.info
      
      





(結果がまだない場合はNoneを返し、結果がある場合は結果を返します)

しかし、結果があるかどうかを確認することは必ずしも便利ではなく、常に不必要なアクションを実行することを意味します。

タスクの完了後に関数を呼び出すために、コールバックを実装できます。 セロリがインストールされていて、結果を受け入れる機能をタスクにできる場合は、次のサブセクションに進むことができます。 セロリなしでやりたいのは、ピカとrabbitMQに基づいてコールバックを整理する方法です。

AMQPを使用するには、pikaパッケージをインストールします。

 $ sudo pip install pika==0.9.5
      
      





このライブラリとここで説明する RabbitMQを使用したHello Worldの詳細

decorators.pyで:

 import pika import pickle import time importr settings def callback(function_to_decorate): user = settings.BROKER_USER broker_host = settings.BROKER_HOST password = settings.BROKER_PASSWORD credentials = pika.PlainCredentials(user, password) parameters = pika.ConnectionParameters(host=broker_host, credentials=credentials) def receiver(*args, **kw): (backend_function, data) = function_to_decorate(*args, **kw) pickled_obj = pickle.dumps(data) queue_name = str(time.time()) print "call_backend", backend_function.__name__ connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare( queue = queue_name) channel.basic_publish(exchange='', routing_key=queue_name, body=pickled_obj) channel.basic_consume( backend_function, queue=queue_name, no_ack = True) channel.queue_delete(queue=queue_name) connection.close() return receiver
      
      





これは、mail_content関数をラップするデコレータです(!) タスクデコレータをラップする

デコレータは、rabbitmqにメッセージを送信するための指示を追加したmail_content関数を返します

tasks.pyの関数全体を書き換えるのではなく、単に変更します

tasks.pyで:

 from decorators import * from tasks_backend import mail_analizer, mail_error @task @callback def mail_content(...): ... if (...): ... return mail_analizer, (content,) return mail_error, ('error',)
      
      





最初の引数として関数を返します。2番目は関数に渡す引数のリストです

tasks_backend.pyで

 import tasks def mail_analizer(ch, method, properties, body): email_text = pickle.loads(body) if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
      
      





電子メールを受け入れ、認識し、新しいタスクを開始しました。

引数はあまり便利ではないことに注意してください、これを修正してください:

decorators.pyで

 def backend(function_to_decorate): def receive(ch, method, properties, body): data=pickle.loads(body) args = data function_to_decorate(*args) return receive
      
      





これで、mail_analizer関数を次のように書き換えることができます。

 @backend def mail_analizer(email_text): if emai_text.find(u'Hello'): tasks.send_emails.delay(email_text) else: tasks.send_twitter_status.delay(email_text)
      
      





以下の機能を実行するには、デコレーターを使用します
 @callback
      
      



mail_contentと同じ:

 @backend @callback def mail_analizer(cont): print cont return send_twitter_status, (cont,)
      
      





このインターフェイスを使用して一連の関数を構築する簡単な例:

 @callback def first(*args): print first.__name__ print args return senders, args @backend @callback def senders(*args): print args return analizer, args @backend @callback def analizer( *args): print args return ended_fun, args @backend def ended_fun(*args): print ended_fun.__name__ print args
      
      





最初の関数は、デコレータによってのみラップされます
 @callback
      
      



なぜなら 彼女はウサギから何も奪いません
 @backend
      
      



-なぜなら 彼女は何も送信しません。

関数はそれ自体を呼び出すことができることに注意してください。 また、バックエンドデコレータによってラップされた関数は、rabbitmqからのみ呼び出すことができます。

まず、コールバックのみでラップされる関数を使用します。

 @callback def runer(*args): return test_func, (args) @backend @callback def test_func( *args): print args return test_func, args
      
      





関数mail_content、email_analizer、run_emailの最終バージョン:

 @backend @call_backend def mail_content(user_mail, mail_pass, mail_from): mail_server = 'pop.'+user_mail.split('@')[1] mail_login = user_mail.split('@')[0] p = poplib.POP3(mail_server) print p.getwelcome() try: p.user(mail_login) p.pass_(mail_pass) except poplib.error_proto: return mail_error, 'Email is blocked' try: print p.list() except: return mail_error, 'dont receive list of mails' numMessages = len(p.list()[1]) print numMessages for i in range(numMessages): m = email.message_from_string("\n".join(p.top(i+1, 1)[1])) try: this_from = m['From'] this_from = this_from.decode('cp1251').split('<'.decode('cp1251'))[1] if this_from.find(mail_from) >= 0: print m['From'] m = email.message_from_string('\n'.join(p.retr(i+1)[1])) content = get_text_body(m) print content return email_analizer, (content, email_from) else: pass except Exception, e: return email_error, (unicode(e, 'utf8'),) return mail_content, (user_mail, mail_pass, mail_from) @backend @call_backend def email_analizer(content, email_from): if content.find(u'Hello'): email_to = email_from text=u'Hello, my dear friend' return send_mail, (email_to, text) return send_twitter_status, (cont,) @call_backend def run_email(): '''  , , email, password, email_from ''' return mail_content, (email, password, email_from)
      
      





小計:
複雑なものは何もないといいのですが。 たとえば、1つの小さなタスク(タスク)がある場合は、セロリの代わりに使用できます。



セロリ3.0でこれを行う方法





セロリ3.0では、タスクの結果をタスク(タスク)に渡すタスクの名前を渡すことができます。

ドキュメントの例:

 @celery.task def add(x, y): return x + y add.apply_async((2, 2), link=add.s(16))
      
      





ここで、addはタスク(task)、add.sはサブタスク(subtask)であり、add(2、2)の実行後に開始し、サブタスクの最初の引数はadd(2、2)の結果、2番目の引数は16になります。合計は(2 + 2)+ 16 = 20。 ここでのサブタスクとは

タスクに関連して、mail_analizerタスク関数を作成し、引数を1つ残します-コンテンツ、@ call_backendデコレーターを削除し、次のように呼び出します。



>>> mail_content.apply_async(mail_addres、mail_password、email_from、link = mail_analizer.s())

link_error変数は、タスクでエラーが発生した場合にも提供されます。

詳細については、 こちらをご覧ください。

セロリ3.0に加えて登場しました:

団体


グループ、並行して適用されるべきタスクのリストを受け入れます:

ドキュメントの例:

 >>> from celery import group >>> res = group(add.s(i, i) for i in xrange(10))() >>> res.get(timeout=1) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]   >>> g = group(add.s(i, i) for i in xrange(10)) >>> g.apply_async()
      
      





チェーン:


コールチェーン

次のように、タスクをチェーンで呼び出すことができます。

 >>> from celery import chain @task def mul(x,y): return x*y @task def div(x,y): return x/y # (2 + 2) * 8 / 2 >>> res = chain(add.subtask((2, 2)), mul.subtask((8, )), div.subtask((2,))).apply_async() >>> res.get() == 16 >>> res.parent.get() == 32 >>> res.parent.parent.get() == 4   >>> (add.s(2, 2) | add.s(4) | add.s(8))().get() 16 </source <h5>immutable</h5>     ,            <source lang="python"> >>> add.subtask((2, 2), immutable=True)  >>> add.si(2, 2)
      
      





和音


コード:

並行して実行されるタスクのリストと、タスクのリストの実行結果のリストを受け入れるタスクを受け入れます。 これは曲がっています。

 @task def xsum(res_list): return sum(res_list) >>> from celery import chord >>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() >>> res.get() 90
      
      





チェーン(グループ)を使用してコードを取得します。

 >>> c3 = (group(add.s(i, i) for i in xrange(10) | xsum.s())) >>> res = c3() >>> res.get() 90
      
      





地図


ライクマップ(楽しい、[1,2,3])

 res=task.map([1,2])  res=[task(1), task(2)]
      
      





スターマップ


 res=add.starmap([(1,2), (2,4)])  res=[add(1,2), add(2,4)]
      
      





チャンクス


引数の長いリストをさまざまなタスクに分割し、

 >>> from proj.tasks import add >>> res = add.chunks(zip(range(100), range(100)), 10)() >>> res.get() [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [40, 42, 44, 46, 48, 50, 52, 54, 56, 58], [60, 62, 64, 66, 68, 70, 72, 74, 76, 78], [80, 82, 84, 86, 88, 90, 92, 94, 96, 98], [100, 102, 104, 106, 108, 110, 112, 114, 116, 118], [120, 122, 124, 126, 128, 130, 132, 134, 136, 138], [140, 142, 144, 146, 148, 150, 152, 154, 156, 158], [160, 162, 164, 166, 168, 170, 172, 174, 176, 178], [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
      
      





小計:


セロリ3.0は非常に便利なパンをたくさん提供しているので、使用すれば楽しいです。

結果:


Celeryは多くの便利なツールを提供しますが、これらの機能の90%が不要な小さなタスクの場合、メッセージキュー(ウサギ)を使用することにより、セロリを構成する必要がなくなり、サーバーの負荷が軽減され、プロジェクトの追加の依存関係がなくなります。

ご清聴ありがとうございました。



All Articles