(2018年版)
ミゲル・グリンバーグ
これはMega-Tutorialの22番目の部分です。ここでは、Webサーバーとは独立して機能するバックグラウンドジョブを作成する方法を説明します。
ネタバレの下には、2018年シリーズのすべての記事のリストがあります。
- 第1章:Hello world!
- 第2章:テンプレート
- 第3章:Webフォーム
- 第4章:データベース
- 第5章:ユーザーログイン
- 第6章:プロフィールページとアバター
- 第7章:エラー処理
- 第8章:サブスクライバー、連絡先、およびフレンド
- 第9章:ページネーション
- 第10章:メールサポート
- 第11章:再構成
- 第12章:日付と時刻
- 第13章:I18nおよびL10n
- 第14章:Ajax
- 第15章:アプリケーション構造の改善
- 第16章:全文検索
- 第17章:Linuxの展開
- 第18章:Herokuでの展開
- 第19章:Dockerコンテナーでの展開
- 第20章:JavaScriptマジック
- 第21章:ユーザー通知
- 第22章:バックグラウンドタスク (この記事)
- 第23章:アプリケーションプログラミングインターフェイス(API)
注1:このコースの古いバージョンをお探しの場合は、こちらをご覧ください 。
注2:突然、私の(ミゲル)の仕事を支持して話をしたい場合、または1週間記事を待つ忍耐がない場合、私(ミゲルグリーンバーグ)はこのガイドの完全版(英語)を電子書籍またはビデオの形式で提供します。 詳細については、 learn.miguelgrinberg.comをご覧ください 。
この章では、アプリケーションの一部として実行する必要のある長いプロセスまたは複雑なプロセスの実装に焦点を当てます。 これらのプロセスは、タスクの実行中はクライアントへの応答をブロックするため、リクエストのコンテキストで同期的に実行することはできません。 クライアントがメールを送信するのに必要な3〜4秒待機する必要がないように、メールメッセージの送信をバックグラウンドストリームに移動する第10章でこのトピックについて簡単に触れました。 電子メールメッセージにストリームを使用しても問題ありませんが、問題のプロセスが非常に長い場合、このソリューションはうまく拡張できません。 一般的な方法は、長いタスクをワークフロー、またはほとんどの場合プールにアップロードすることです。
長いタスクの必要性を正当化するために、マイクロブログにエクスポート機能を導入します。これにより、ユーザーはすべてのブログ投稿を含むデータファイルを要求できます。 ユーザーがこのオプションを使用する場合、アプリケーションはすべてのユーザーメッセージをJSONファイルにエクスポートしてから、電子メールでユーザーに送信する必要があります。 これがすべて行われている間、ユーザーには完了の割合を示す通知が表示されます。
この章のGitHubリンク: Browse 、 Zip 、 Diff 。
タスクキューの概要
タスクキューは、 タスクを完了するためのワークフローを要求するための便利なソリューションをアプリケーションに提供します。 ワークフローはアプリケーションとは独立して実行され、別のシステムに常駐することさえあります。 アプリケーションとハンドラー間の通信は、 メッセージキューを介して行われます 。 アプリケーションはタスクを送信し、実行を監視してキューと対話します。 次の図は、典型的な実装を示しています。
Pythonで最も人気のあるタスクキューはCeleryです。 これは多くのオプションがあり、複数のメッセージキューをサポートするかなり複雑なパッケージです。 Pythonタスクキューのもう1つの一般的なオプションは、 Redisキューまたは単にRQです。これは、Redisメッセージキューのみをサポートしますが、Celeryよりも構成がはるかに簡単です。
CeleryとRQはどちらもFlaskアプリケーションのバックグラウンドタスクをサポートするのに非常に適しているため、RQのシンプルさはこのアプリケーションの選択に役立ちます。 ただし、同じ機能をCeleryで実装することはそれほど複雑ではありません。 RQよりもCeleryに興味がある場合は、ブログで書いた記事「 Using Celery with Flask 」 を読むことができます。
rqを使用する
RQはpip
を介してインストールされる標準のPythonパッケージです:
(venv) $ pip install rq (venv) $ pip freeze > requirements.txt
前述したように、アプリケーションとRQハンドラーの間の接続はRedisメッセージキューにあるため、Redisサーバーを起動する必要があります。 ワンクリックでRedisサーバーをインストールおよび起動して、ソースコードインストーラーをダウンロードし、システムで直接コンパイルするための多くのオプションがあります。 Windowsを使用している場合、Microsoftはここでインストーラーをサポートします 。 Linuxでは、おそらくオペレーティングシステムのパッケージマネージャーを介してパッケージとして取得できます。 Mac OS Xユーザーはbrew install redis
を開始してから、 redis-server
コマンドを使用して手動でサービスを開始redis-server
。
サービスが実行され、RQで利用可能であることを確認する場合を除き、すべてでRedisと対話する必要はありません。
タスクを作成する
RQを使用して簡単なタスクを完了する方法を紹介します。これにより、タスクに慣れることができます。 タスクはPython関数にすぎません。 新しいapp / tasks.pyモジュールに実装するタスクの例を次に示します 。
app / tasks.py :バックグラウンドタスクの例。
import time def example(seconds): print('Starting task') for i in range(seconds): print(i) time.sleep(1) print('Task completed')
このタスクは引数として秒数を取り、1秒に1回カウンターを印刷してこの時間を待機します。
RQワーカーを起動
タスクの準備ができたので、ハンドラーを開始できます。 これは、 rq worker
コマンドを使用して行われます。
(venv) $ rq worker microblog-tasks 18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1 18:55:06 Cleaning registries for queue: microblog-tasks 18:55:06 18:55:06 *** Listening on microblog-tasks...
これで、ワークフローはRedisに接続され、microblog microblog-tasks
という名前のキューで割り当て可能なすべてのタスクを監視します。 複数のハンドラーにより多くの帯域幅を持たせたい場合、必要なことはrq worker
より多くのインスタンスを実行することだけで、すべて同じキューに接続されます。 次に、ジョブがキューに表示されると、使用可能なワークフローのいずれかがジョブを選択します。 実稼働環境では、少なくともCPUで使用可能なプロセッサと同じ数のプロセッサが必要になるでしょう。
タスクの達成
次に、2番目のターミナルウィンドウを開き、その仮想環境をアクティブにします。 シェルセッションを使用して、workerでexample()
タスクを実行します。
>>> from redis import Redis >>> import rq >>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://')) >>> job = queue.enqueue('app.tasks.example', 23) >>> job.get_id() 'c651de7f-21a8-4068-afd5-8b982a6f6d32'
RQのQueue
クラスは、アプリケーションキューを表します。 これは2つの引数を取ります。これはキュー名とRedis
接続オブジェクトであり、この場合はデフォルトのURLで初期化します。 Redisサーバーが別のホストまたはポートで実行されている場合は、別のURLを使用する必要があります。
enqueue()
メソッドは、キューにジョブを追加するために使用されます。 最初の引数は、実行するタスクの名前であり、関数オブジェクトまたはインポート文字列として直接指定されます。 これにより、アプリケーション側で関数をインポートする必要がなくなるため、文字列オプションのほうがはるかに便利です。 enqueue()
指定された残りの引数はすべて、workerで実行されている関数に渡されます。
enqueue()
が呼び出されるとすぐに、ワーカーRQが実行されているターミナルの最初のウィンドウでアクティビティに気付くでしょう。 example()
関数が機能し、1秒に1回カウンターを出力することがわかります。 同時に、他の端末はブロックされず、シェルで式を評価し続けることができます。 上記の例では、 job.get_id()
メソッドをjob.get_id()
、タスクの一意の識別子を取得しました。 job
オブジェクトで使用できる別の興味深い表現は、関数が職場での作業を終了したかどうかを確認することです。
>>> job.is_finished False
上記の例で行ったように23
を渡した場合、関数は約23秒間機能します。 この時間がjob.is_finished
すると、 job.is_finished
はTrue
になりTrue
。 それは素晴らしいことではありませんか?! RQのシンプルさが本当に気に入っています!
関数が完了するとすぐに、 ワーカーは新しいジョブの待機に戻るため、さらに実験する場合は、他の引数を指定してenqueue()
呼び出しを繰り返すことができます。 タスクに関連するキューに保存されたデータは、しばらくの間(デフォルトでは500秒)そこに残りますが、最終的には削除されます。 これは重要です;タスクキューは完了したタスクの履歴を保存しません。
タスク進捗レポート
上記で使用したタスクの例は、非現実的に単純です。 原則として、長いタスクの実行中に、実行の進行状況に関する情報をアプリケーションで利用できるようにし、その情報をユーザーに表示できます。 RQは、 meta
ジョブオブジェクト属性でこれをサポートします。 example()
タスクを書き直して、進捗レポートを記録します。
app / tasks.py :進捗レポート付きのバックグラウンドタスクの例。
import time from rq import get_current_job def example(seconds): job = get_current_job() print('Starting task') for i in range(seconds): job.meta['progress'] = 100.0 * i / seconds job.save_meta() print(i) time.sleep(1) job.meta['progress'] = 100 job.save_meta() print('Task completed')
example()
この新しいバージョンは、RQ get_current_job()
関数を使用して、タスクがget_current_job()
ときにアプリケーションに返されたものと同様のジョブインスタンスを取得します。 meta
ジョブオブジェクト属性は、タスクがアプリケーションに渡すユーザーデータを記録できる辞書です。 この例では、記録のために、タスクの完了の割合を表すprogress
要素を使用します。 進行状況が更新されるたびに、 job.save_meta()
を呼び出して、アプリケーションが見つけることができるRedisにデータを書き込むjob.save_meta()
RQにjob.save_meta()
します。
アプリケーション側(現在はPythonシェルのみ)では、このタスクを実行して、次のように進行状況を追跡できます。
>>> job = queue.enqueue('app.tasks.example', 23) >>> job.meta {} >>> job.refresh() >>> job.meta {'progress': 13.043478260869565} >>> job.refresh() >>> job.meta {'progress': 69.56521739130434} >>> job.refresh() >>> job.meta {'progress': 100} >>> job.is_finished True
上記でわかるように、こちら側ではmeta
属性が読み取り可能です。 Redisからコンテンツを更新するには、 refresh()
メソッドを呼び出す必要があります。
データベース内のタスクの提出
上記の例では、タスクを実行して、その実行方法を確認するだけで十分です。 Webアプリケーションの場合、これらのタスクの1つがリクエストの一部として開始されるとすぐにこのリクエストが終了し、このタスクのコンテキスト全体が失われるため、事態はもう少し複雑になります。 アプリケーションで各ユーザーが実行するタスクを追跡する必要があるため、データベーステーブルを使用して状態を維持する必要があります。 以下に、 Task
モデルの新しい実装を示します。
app / models.py :タスクモデル。
# ... import redis import rq class User(UserMixin, db.Model): # ... tasks = db.relationship('Task', backref='user', lazy='dynamic') # ... class Task(db.Model): id = db.Column(db.String(36), primary_key=True) name = db.Column(db.String(128), index=True) description = db.Column(db.String(128)) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) complete = db.Column(db.Boolean, default=False) def get_rq_job(self): try: rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): return None return rq_job def get_progress(self): job = self.get_rq_job() return job.meta.get('progress', 0) if job is not None else 100
このモデルと以前のモデルの興味深い違いは、主キーフィールドid
が整数ではなく文字列であることです。 これは、このモデルでは、データベースによる独自のプライマリキー生成に依存するのではなく、RQによって作成されたジョブ識別子を使用するためです。
モデルには、タスクのフルネーム(RQに渡される)、ユーザーへの表示に適したタスクの説明、タスクを要求したユーザーとの通信、およびタスクが完了したかどうかを示す論理値が格納されます。 complete
フィールドの目的は、実行中のタスクが更新の進行状況を表示するために特別な処理を必要とするため、完了したタスクをアクティブに実行されているタスクから分離することです。
get_rq_job()
メソッドは、モデルから取得できる、指定されたタスク識別子からRQ Job
インスタンスをロードするヘルパーメソッドです。 これは、Redisに存在するデータからジョブのインスタンスをロードするJob.fetch()
を使用して行われます。 get_progress()
メソッドはget_progress()
メソッドの上に構築され、タスクの完了率を返します。 この方法には興味深い提案がいくつかあります。 モデルからのジョブIDがRQキューに存在しない場合、これはタスクが既に完了しており、データが期限切れでキューから削除されていることを意味します。したがって、この場合は100%が返されます。 一方、タスクが存在するが、 meta
属性に関連する情報がない場合、タスクが完了するようにスケジュールされていると安全に想定できますが、開始する機会がなかったため、この状況では進捗として0が返されます。
データベーススキーマに変更を適用するには、新しい移行を作成し、データベースを更新する必要があります。
(venv) $ flask db migrate -m "tasks" (venv) $ flask db upgrade
新しいモデルをシェルコンテキストに追加して、インポートすることなくシェルセッションで使用できるようにすることもできます。
microblog.py :タスクモデルをシェルコンテキストに追加します。
from app import create_app, db, cli from app.models import User, Post, Message, Notification, Task app = create_app() cli.register(app) @app.shell_context_processor def make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Message': Message, 'Notification': Notification, 'Task': Task}
FlaskとのRQ統合
Redisサービスの接続URLを構成に追加する必要があります。
class Config(object): # ... REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
いつものように、Redis接続URLは環境変数から取得され、変数が定義されていない場合、デフォルトのURLが使用されます。これは、サービスがデフォルトで同じホストとポートで実行されることを前提としています。
アプリケーションファクトリ関数は、RedisとRQの初期化を担当します。
app / _ init_ .py :RQ統合。
# ... from redis import Redis import rq # ... def create_app(config_class=Config): # ... app.redis = Redis.from_url(app.config['REDIS_URL']) app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) # ...
app.task_queue
は、タスクが提示されるキューになります。 アプリケーションのどこにいてもcurrent_app.task_queue
を使用してアクセスできるため、アプリケーションにキューをアタッチすると便利です。 アプリケーションの一部を送信またはチェックしやすくするために、 User
モデルにいくつかのヘルパーメソッドを作成できます。
app / models.py :ユーザーモデルのタスクのヘルパーメソッド。
# ... class User(UserMixin, db.Model): # ... def launch_task(self, name, description, *args, **kwargs): rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, *args, **kwargs) task = Task(id=rq_job.get_id(), name=name, description=description, user=self) db.session.add(task) return task def get_tasks_in_progress(self): return Task.query.filter_by(user=self, complete=False).all() def get_task_in_progress(self, name): return Task.query.filter_by(name=name, user=self, complete=False).first()
launch_task()
メソッドは、タスクをRQキューに渡し、データベースに追加します。 name
引数は、 app / tasks.pyで定義されている関数の名前です。 RQにapp.tasks
れると、関数はapp.tasks
追加されapp.tasks
。 関数の完全なname
を作成する名前。 description
引数は、ユーザーに提示できるタスクの明確な説明です。 ブログ投稿をエクスポートする関数では、 export_posts
という名前とExporting posts...
のExporting posts...
の説明を使用しExporting posts...
残りの引数は、タスクに渡される位置引数とキー引数です。 この関数は、 enqueue()
キューメソッドを呼び出してジョブを送信することから始まります。 返されたタスクオブジェクトにはRQによって割り当てられたタスクIDが含まれているため、これを使用して、データベースに対応するタスクオブジェクトを作成できます。
launch_task()
は、セッションに新しいTask
オブジェクトを追加しますが、コミットしません。 一般的なケースでは、1つのトランザクションで下位レベルの関数によって行われた複数の更新を組み合わせることができるため、上位レベルの関数でデータベースセッションを操作するのが最適です。 これは厳密なルールではありません。この章の後半で、子関数でコミットが実行される例外を確認します。
get_tasks_in_progress()
メソッドは、ユーザーに発行された関数の完全なリストを返します。 後で、このメソッドを使用して、ユーザーに表示されるページで実行されるタスクに関する情報を含めることがわかります。
最後に、 get_task_in_progress()
は、特定のタスクを返す以前のバージョンの単純なバージョンです。 ユーザーが同じタイプの複数のタスクを同時に実行することを禁止しているため、タスクを開始する前に、このメソッドを使用して前のタスクが現在実行されているかどうかを確認できます。
RQタスクからメールを送信する
これはメイントピックからの逸脱のように思えるかもしれませんが、上記で述べたように、バックグラウンドエクスポートタスクが完了すると、すべてのメッセージを含むJSONファイルを含むメールがユーザーに送信されます。 第11章で紹介した電子メール機能は、2つの方法で拡張する必要があります。 まず、JSONファイルを添付できるように、添付ファイルのサポートを追加する必要があります。 次に、 send_email()
関数は、バックグラウンドスレッドを使用して非同期でレターを送信します。 既に非同期のバックグラウンドタスクから電子メールを送信する場合、ストリームに基づく第2レベルのバックグラウンドタスクはあまり意味がないため、電子メールの同期送信と非同期送信の両方をサポートする必要があります。
幸いなことに、Flask-Mailは添付ファイルをサポートしているので、 send_email()
関数を拡張してそれらを追加の引数として取得し、 Message
オブジェクトで設定するだけです。 そして、優先タスクとして電子メールを送信することに加えて、論理sync
引数を追加するだけです。
app / email.py :添付ファイル付きのメールを送信します。
# ... def send_email(subject, sender, recipients, text_body, html_body, attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body if attachments: for attachment in attachments: msg.attach(*attachment) if sync: mail.send(msg) else: Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()
Message
クラスのattach()
メソッドは、添付ファイルを定義する3つの引数(ファイル名、メディアタイプ、および実際のファイルデータattach()
取ります。 ファイル名は、添付ファイルに関連付けられた受信者に表示される単なる名前であり、実際のファイルであってはなりません。 メディアタイプによって添付ファイルのタイプが決定されるため、電子メールリーダーは適切に表示できます。 たとえば、メディアタイプとしてjpg/png
を送信すると、電子メールリーダーは添付ファイルが画像であることを認識します。その場合、そのように表示できます。 ブログ投稿データファイルには、メディアタイプapplication/json
を使用するJSON形式を使用します。 3番目の最後の引数は、添付ファイルの内容を含む文字列またはバイトシーケンスです。
簡単にするために、 send_email()
attachments
引数はタプルのリストになり、各タプルには3つのattach()
引数に対応する3つの要素があります。 したがって、このリストの各要素に対して、タプルを引数としてattach()
送信する必要があります。 Pythonでは、関数に送信する引数を含むリストまたはタプルがある場合、次のような退屈な構文を使用する代わりに、 func(*args)
を使用してこのリストを引数の実際のリストに展開できますfunc(args[0], args[1], args[2])
。 たとえば、 args = [1, 'foo']
場合、呼び出しはfunc (1, 'foo')
呼び出したかのように2つの引数を送信します。 *
ない場合*
呼び出しには1つの引数が含まれ、リストになります。
電子メールの同期送信に関しては、 sync
が True
ときに直接mail.send(msg)
呼び出しに戻る必要がありました。
タスクヘルパー
上記で使用したexample()
タスクは単純なスタンドアロン関数でしたが、ブログ投稿をエクスポートする関数には、データベースへのアクセスやメール機能の送信など、アプリケーションにある機能の一部が必要になります。 これは別のプロセスで行われるため、Flask-SQLAlchemyとFlask-Mailを初期化する必要があります。これらを設定するには、Flaskアプリケーションのインスタンスが必要です。 そのため、Flaskアプリケーションのインスタンスとapp / tasks.pyモジュールの上部にアプリケーションコンテキストを追加します 。
app / tasks.py :アプリケーションとコンテキストを作成します。
from app import create_app app = create_app() app.app_context().push()
これはRQワーカーをインポートする唯一のモジュールであるため、このモジュールでアプリケーションが作成されます。 flask
コマンドを使用する場合、ルートディレクトリのmicroblog.pyモジュールがアプリケーションを作成しますが、RQワーカーはそれについて何も知らないため、タスク機能に必要な場合はアプリケーションの独自のインスタンスを作成する必要があります。 app.app_context()
メソッドはすでにいくつかの場所で見られ、コンテキストを押すと、アプリケーションがアプリケーションの「現在の」インスタンスになり、Flask-SQLAlchemyなどの拡張機能がcurrent_app.config
を使用して設定を取得できるようになります。 コンテキストがない場合、式current_app
はエラーを返します。
次に、この機能の進捗状況をどのように報告するかを考えました。 job.meta
ディクショナリを介して進行情報を送信することに加えて、クライアントに通知を送信して、ページを更新する必要なく完了率を動的に更新できるようにします。 このために、 第21章で作成したものと同様の通知メカニズムを使用します。 更新は、未読メッセージアイコンと同様に機能します。 サーバーがテンプレートを表示すると、 job.meta
から取得した「静的な」進捗情報が含まれますが、クライアントのブラウザーにページが表示されるとすぐに、通知を使用して通知がパーセンテージを動的に更新します。 通知のため、実行中のタスクの進行状況の更新は、前の例で行った方法よりも少し複雑になるため、タスクの進行状況の更新専用のデコレーター関数を作成します。
app / tasks.py :タスクの進行状況を設定します。
from rq import get_current_job from app import db from app.models import Task # ... def _set_task_progress(progress): job = get_current_job() if job: job.meta['progress'] = progress job.save_meta() task = Task.query.get(job.get_id()) task.user.add_notification('task_progress', {'task_id': job.get_id(), 'progress': progress}) if progress >= 100: task.complete = True db.session.commit()
エクスポートタスクは、 _set_task_progress()
を呼び出して、完了の割合を記録できます。 job.meta
Redis, task task.user
, , add_notification()
. task_progress
, , , , (progress number). JavaScript, .
, , , complete
. , , add_notification()
, . , , - , .
. :
app/tasks.py : .
def export_posts(user_id): try: # # except: #
try/except? , , , Flask , , , . , , , RQ, Flask, , , , RQ , . , RQ worker -, , .
, , :
app/tasks.py : .
import sys # ... def export_posts(user_id): try: # ... except: _set_task_progress(100) app.logger.error('Unhandled exception', exc_info=sys.exc_info())
, , , 100%, logger Flask , sys.exc_info()
. , flask Application logger , Flask . , 7 . app.logger
.
, , :
app/tasks.py : .
import time from app.models import User, Post # ... def export_posts(user_id): try: user = User.query.get(user_id) _set_task_progress(0) data = [] i = 0 total_posts = user.posts.count() for post in user.posts.order_by(Post.timestamp.asc()): data.append({'body': post.body, 'timestamp': post.timestamp.isoformat() + 'Z'}) time.sleep(5) i += 1 _set_task_progress(100 * i // total_posts) # except: # ...
, . ISO 8601. datetime
Python, , , ISO "Z", UTC.
- . i
, , total_posts
, . i
total_posts
0 100.
, , time.sleep(5)
. , sleep, , , .
, , data
:
app/tasks.py : .
import json from flask import render_template from app.email import send_email # ... def export_posts(user_id): try: # ... send_email('[Microblog] Your blog posts', sender=app.config['ADMINS'][0], recipients=[user.email], text_body=render_template('email/export_posts.txt', user=user), html_body=render_template('email/export_posts.html', user=user), attachments=[('posts.json', 'application/json', json.dumps({'posts': data}, indent=4))], sync=True) except: # ...
send_email()
. , attach()
Flask-Mail's Message
. - , Python json.dumps()
.
, , HTML . :
app/templates/email/export_posts.txt : Export posts text email template.
Dear {{ user.username }}, Please find attached the archive of your posts that you requested. Sincerely, The Microblog Team
HTML- :
app/templates/email/export_posts.html: Export posts HTML email template.
<p>Dear {{ user.username }},</p> <p>Please find attached the archive of your posts that you requested.</p> <p>Sincerely,</p> <p>The Microblog Team</p>
. , .
export_posts
:
app/main/routes.py : Export posts route and view function.
@bp.route('/export_posts') @login_required def export_posts(): if current_user.get_task_in_progress('export_posts'): flash(_('An export task is currently in progress')) else: current_user.launch_task('export_posts', _('Exporting posts...')) db.session.commit() return redirect(url_for('main.user', username=current_user.username))
, , . , . , get_task_in_progress()
, .
, launch_task()
. - , RQ worker app.tasks.
。 - , . Task
. .
, . , , , " ":
app/templates/user.html : .
... <p> <a href="{{ url_for('main.edit_profile') }}"> {{ _('Edit your profile') }} </a> </p> {% if not current_user.get_task_in_progress('export_posts') %} <p> <a href="{{ url_for('main.export_posts') }}"> {{ _('Export your posts') }} </a> </p> ... {% endif %}
, , , .
, . , RQ worker :
- , Redis
- , RQ.
rq worker microblog-tasks
- Flask,
flask run
(FLASK_APP
)
, . Bootstrap, . - , . - , . , . , :
app/templates/base.html : .
... {% block content %} <div class="container"> {% if current_user.is_authenticated %} {% with tasks = current_user.get_tasks_in_progress() %} {% if tasks %} {% for task in tasks %} <div class="alert alert-success" role="alert"> {{ task.description }} <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>% </div> {% endfor %} {% endif %} {% endwith %} {% endif %} ... {% endblock %} ...
. . , get_tasks_in_progress()
, . , , , , .
alert . CSS, alert-success
, alert-info. Bootstrap HTML . , , .
<span>
, id
. , JavaScript . , , -progress
. , , <span>
#<task.id> - progress
.
, "" , . , , .
<span>
, JavaScript:
app/templates/base.html : .
... {% block scripts %} ... <script> ... function set_task_progress(task_id, progress) { $('#' + task_id + '-progress').text(progress); } </script> ... {% endblock %}
id
jQuery <span>
. , , jQuery , .
, _set_task_progress()
app/tasks.py add_notification()
. , - , , 21 , . , add_notification()
, , .
JavaScript, , , unread_message_count
, . , task_progress
, set_task_progress()
, . , JavaScript:
app/templates/base.html : .
for (var i = 0; i < notifications.length; i++) { switch (notifications[i].name) { case 'unread_message_count': set_message_count(notifications[i].data); break; case 'task_progress': set_task_progress( notifications[i].data.task_id, notifications[i].data.progress); break; } since = notifications[i].timestamp; }
, , if
, unread_message_count
, switch
, , . "C", , switch . , if/elseif
. , .
, , RQ task_progress
, task_id
progress
, set_task_progress()
.
, 10 , .
, , . , , Flask-Babel , :
(venv) $ flask translate update
, , app/translations/es/LC_MESSAGES/messages.po .
, , :
(venv) $ flask translate compile
. : Redis RQ. , , , , , .
Linux
Linux, Redis , . Ubuntu Linux sudo apt-get install redis-server
.
RQ, " Gunicorn Supervisor" 17 , Supervisor, rq worker-tasks
gunicorn
. (, , production), numprocs
, , .
Heroku
Heroku, Redis . , Postgres. Redis , :
$ heroku addons:create heroku-redis:hobby-dev
URL- redis Heroku REDIS_URL
, , .
Heroku web-dyno worker dyno, rq , . procfile:
web: flask db upgrade; flask translate compile; gunicorn microblog:app worker: rq worker microblog-tasks
:
$ heroku ps:scale worker=1
Docker
Docker Redis. Redis Docker:
$ docker run --name redis -d -p 6379:6379 redis:3-alpine
redis REDIS_URL
, , MySQL. , redis:
$ docker run --name microblog -d -p 8000:5000 --rm -e SECRET_KEY=my-secret-key \ -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \ -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \ --link mysql:dbserver --link redis:redis-server \ -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \ -e REDIS_URL=redis://redis-server:6379/0 \ microblog:latest
, RQ. , , , , start up, -. docker run
, worker:
$ docker run --name rq-worker -d --rm -e SECRET_KEY=my-secret-key \ -e MAIL_SERVER=smtp.googlemail.com -e MAIL_PORT=587 -e MAIL_USE_TLS=true \ -e MAIL_USERNAME=<your-gmail-username> -e MAIL_PASSWORD=<your-gmail-password> \ --link mysql:dbserver --link redis:redis-server \ -e DATABASE_URL=mysql+pymysql://microblog:<database-password>@dbserver/microblog \ -e REDIS_URL=redis://redis-server:6379/0 \ --entrypoint venv/bin/rq \ microblog:latest worker -u redis://redis-server:6379/0 microblog-tasks
コマンドは2つの部分で指定する必要があるため、Dockerイメージのデフォルトの起動コマンドをオーバーライドするのはもう少し複雑です。引数--entrypoint
は、実行可能ファイルの名前のみを受け入れますが、引数(存在する場合)は、コマンドラインの最後のイメージとタグの後に指定する必要があります。仮想環境をアクティブにせずに機能するrq
ように、何を指定する必要があるかに注意してくださいvenv/bin/rq
。