こんにちは、Habr! 私の名前はディナです。Mail.RuGroupの分析タスクを解決するためのゲームデータウェアハウスを開発しています。 私たちのチームは、Apache Airflow (以降Airflow )を使用してデータ処理のバッチプロセスを開発しています 。 Airflowは、ETL / ELTプロセスを開発するためのオープンソースライブラリです。 個々のタスクは、定期的に実行されるタスクチェーン-Dagi(DAG-Directed Acyclic Graph)に結合されます。
通常、エアフロープロジェクトの80%は標準DAGです。 私の記事では、複雑な分岐を必要とする残りの20%、タスク間の通信、つまり、自明でないアルゴリズムを必要とするDAGについて説明します。
フロー制御
遷移条件
いくつかのシャードからデータを毎日収集するタスクに直面していると想像してください。 ステージング領域に同時に記録し、それらのストレージにターゲットテーブルを構築します。 何らかの理由で、操作中にエラーが発生した場合(たとえば、一部のシャードが利用できなかった場合)、DAGは次のようになります。
次のタスクに進むには、前のエラーを処理する必要があります。 オペレーターパラメーターの1つであるtrigger_ruleがこれを担当します。 デフォルト値all_successは、 以前のタスクがすべて正常に完了した場合にのみタスクが開始されることを示します。
また、 trigger_ruleは次の値を取ることができます。
- all_failed-以前のすべてのタスクが失敗した場合。
- all_done-前のタスクがすべて完了した場合、成功したかどうかは関係ありません。
- one_failed-前のタスクのいずれかがクラッシュした場合、残りの完了は不要です。
- one_success-前のタスクのいずれかが正常に完了した場合、残りのタスクの完了は不要です。
分岐
BranchPythonOperatorブランチ演算子を使用して、if-then-else ロジックを実装できます。 呼び出される関数は、次のように開始されるタスク選択アルゴリズムを実装する必要があります。 何も返せない場合、後続のすべてのタスクは実行不要としてマークされます。
この例では、シャードが利用できないことはゲームサーバーの定期的なシャットダウンにそれぞれ関連していることがわかりました。 確かに、含まれるサーバーの数を考慮してショーケースを構築する必要があります。
これは、値がone_success (前のタスクの少なくとも1つが成功した)とall_done (前のすべてのタスクが完了した)、および単一のPythonOperatorの代わりにselect_next_task分岐演算子をとるtrigger_ruleパラメーターを持つ2つのタスクの束で同じDAGのように見えます。
# , all_done = DummyOperator(task_id='all_done', trigger_rule='all_done', dag=dag) # , one_success = DummyOperator(task_id='one_success', trigger_rule='one_success', dag=dag) # def select_next_task(): success_shard_count = get_success_shard_count() if success_shard_count == 0: return 'no_data_action' elif success_shard_count == 6: return 'all_shards_action' else: return 'several_shards_action' select_next_task = BranchPythonOperator(task_id='select_next_task', python_callable=select_next_task, dag=dag)
Trigger_ruleステートメントのパラメーターのドキュメント
BranchPythonOperatorオペレーターのドキュメント
気流マクロ
エアフローオペレーターは、Jinjaを使用して渡されたパラメーターのレンダリングもサポートします。 これは強力なテンプレートエンジンであり、ドキュメントで詳細を読むことができますが、Airflowでの作業で使用するこれらの側面についてのみ説明します。
テンプレートエンジンは以下を処理します。
- template_fieldタプルで指定された演算子文字列パラメーター。
- template_extで指定された拡張子を持つ、オペレータパラメータで渡されるファイル。
- コンテキストを介して渡されるタスクエンティティのtask.render_template関数によって処理される行。 渡されたコンテキスト(
provide_context=True
)を持つPythonOperator関数の例:
def index_finder(conn_id, task, **kwargs): sql = "SELECT MAX(idtransaction) FROM {{ params.billing }}" max_id_sql = task.render_template("", sql, kwargs) ...
AirflowでJinjaを使用する方法は次のとおりです。
- もちろん、これは日付の処理です。 {{ds}}、{{yesterday_ds}}、{{tomorrow_ds}} -これらのテンプレートは、前処理の後、 YYYY-MM-DDの形式で起動日、その前日、および翌日に置き換えられます。 同じ、ただし数字のみ、ハイフンなし: {{ds_nodash}}、{{yesterday_ds_nodash}}、{{tomorrow_ds_nodash}}
- 組み込み関数を使用します。 たとえば、 {{macros.ds_add(ds、-5)}}は、数日かかるか追加する方法です。 {{macros.ds_format(ds、“%Y-%m-%d”、“%Y”)}} -日付のフォーマット。
- パラメータを渡す。 これらはparams引数で辞書として渡され、次のとおりです: {{params.name_of_our_param}}
- パラメーターで同じ方法で渡されるカスタム関数を使用します。 {{params.some_func(ds)}}
- Python組み込みライブラリの使用:
{{(macros.dateutil.relativedelta.relativedelta(day = 1、months = -params.retention_shift))。strftime( "%Y-%m-%d")}} - if-elseコンストラクトの使用:
{{dag_run.conf [“メッセージ”] if dag_run else“”}} - サイクルの構成:
{範囲内のidxの%(params.days_to_load、-1、-1)%}
{{macros.ds_add(ds、-idx)}}
{%endfor%}
以下は、エアフローインターフェイスのレンダリングパラメーターの例です。 最初に、cut_daysパラメーターで渡された日数より古いレコードを削除します。 これは、Airflowでjinjaテンプレートを使用したsqlの外観です。
処理されたSQLでは、特定の日付が既に式に置き換えられています。
2番目の例はより複雑です。 unixtimeへの日付変換を使用して、ソース上のデータのフィルタリングを簡素化します。 「{:.0f}」コンストラクトは、小数点以下の出力を取り除くために使用されます。
Jinjaは、DAGの実行日とそれに続く日付に対応する二重中括弧の間の式をunixtimeに置き換えます。
さて、最後の例では、パラメーターとして渡されるtruncshift関数を使用します。
この式の代わりに、テンプレートエンジンは関数の結果を置き換えます。
タスク間のコミュニケーション
情報源の1つである興味深いログストレージシステム。 5日ごとに、ソースはこの種の新しいテーブルsquads_02122017を作成します。 その名前には日付が含まれているため、その計算方法についての疑問が生じました。 しばらくの間、5日間すべての名前を持つテーブルを使用しました。 4つのリクエストが落ちましたが、 trigger_rule = 'one_success'で助かりました(これは、5つのタスクすべてがオプションの場合に当てはまります)。
しばらくして、1つのDAG、 XCom (クロスコミュニケーションの略)のタスク間でメッセージを交換するために、Airflowに組み込まれたtrigger_ruleテクノロジーの代わりに使用を開始しました。 XComは、キーと値のペアと、送信元のタスクの名前によって定義されます。
XComは、返される値に基づいてPythonOperatorで作成されます。 xcom_push関数を使用して、XComを手動で作成できます。 タスクが完了すると、値がコンテキストに保存され、後続のタスクは別のPythonOperatorのxcom_pull関数を使用して、または前処理された文字列内のjinjaテンプレートからXComを受け入れることができます。
テーブル名の取得は次のようになります。
def get_table_from_mysql(**kwargs): """ """ hook = MySqlHook(conn_name) cursor = hook.get_conn().cursor() cursor.execute(kwargs['templates_dict']['sql']) table_name = cursor.fetchall() # XCom 'table_name' kwargs['ti'].xcom_push(key='table_name', value=table_name[0][1]) # XCom': # return table_name[0][1] # - # , PostgreSQL select_table_from_mysql_sql = ''' SELECT table_name FROM information_schema.TABLES WHERE table_schema = 'jungle_logs' AND table_name IN ('squads_{{ macros.ds_format(ds, "%Y-%m-%d", "%d%m%Y") }}', 'squads_{{ macros.ds_format( macros.ds_add(ds, -1), "%Y-%m-%d", "%d%m%Y") }}', 'squads_{{ macros.ds_format( macros.ds_add(ds, -2), "%Y-%m-%d", "%d%m%Y") }}', 'squads_{{ macros.ds_format( macros.ds_add(ds, -3), "%Y-%m-%d", "%d%m%Y") }}', 'squads_{{ macros.ds_format( macros.ds_add(ds, -4), "%Y-%m-%d", "%d%m%Y") }}') ''' select_table_from_mysql = PythonOperator( task_id='select_table_from_mysql', python_callable=get_table_from_mysql, provide_context=True, templates_dict={'sql': select_table_from_mysql_sql}, dag=dag ) # XCom 'select_table_from_mysql' 'table_name' sensor_jh_squad_sql = ''' SELECT 1 FROM jungle_logs.{{ task_instance.xcom_pull(task_ids='select_table_from_mysql', key='table_name') }} LIMIT 1 '''
XComテクノロジーを使用するもう1つの例は、PythonOperatorから送信されたテキストを含む電子メール通知を送信することです。
kwargs['ti'].xcom_push(key='mail_body', value=mail_body)
そして、EmailOperatorオペレーター内のメッセージテキストの受信は次のとおりです。
email_notification_lost_keys = EmailOperator( task_id='email_notification_lost_keys', to=alert_mails, subject='[airflow] Lost keys', html_content='''{{ task_instance.xcom_pull(task_ids='find_lost_keys', key='mail_body') }}''', dag=dag )
おわりに
分岐方法、タスク間の通信、置換パターンについて話しました。 組み込みのエアフローメカニズムを使用すると、DAGの実装の一般的な概念から逸脱することなく、さまざまな問題を解決できます。 気流の興味深いニュアンスはこれで終わりではありません。 私の同僚と私は、このトピックに関する次の記事のアイデアを持っています。 このツールに興味がある場合は、次に読みたい内容を正確に書いてください。