気流での動的DAG生成

みなさんこんにちは! 私の名前はアントンです。ロステレコムでは、中央データウェアハウスを開発しています。 リポジトリはモジュールで構成され、オーケストレーターはいくつかのInformaticaインスタンスを使用します。そのうちのいくつかは、オープンソースソリューションへの移行の一環としてAirflowに転送したいものです。 InformaticaとAirflowは根本的に異なるツールであるため、既存の実装を取得して繰り返すことはそれほど簡単ではありません。 一方で、現在の実装に可能な限り近いワークフローを取得し、一方で、最も興味深い最初のエアフローの原理である柔軟性を提供するダイナミズムを使用したいと考えました。







この短い記事では、Airflowでの真に動的なDAGの生成について説明します。 主にインドのこのテーマに関するインターネットの開発者からの多くの記事があります。これは「Airflowで動的にdagsを生成できます。例:<10個のHelloWorldタスク/ dagsを生成する例>」という形式の資料です。 しかし、Dagの生成に興味がありました。Dagは、変数の数とタスク名に応じて変化します。













現在、Airflowは、リポジトリにさらにアップロードするためにリモートソースサーバーでデータパケットを生成するモジュールを起動するために実装されています。 単純なスケジュールに従って実行されるため、詳細に検討することはあまり面白くありません。 また、中間のステージングにレイヤーでさらにロードするためにデータパケットを配信するモジュールのエアフローによるオーケストレーションが間もなく導入されます。 ここでは、一連のレーキを待っています。その説明はどこにも見つからず、私の経験を共有したいと思います。







Airflow onHabréには、Mail.ruの開発者からの基本的な事柄が詳しく説明されている記事がいくつかあります。







気流の一般的な説明

分岐、jinjaを介したパラメーター化、およびXcomを介したDAG通信







小さな用語集:



DAG / DAGは、有向非巡回グラフです。 この場合、相互に依存し、サイクルを形成しない一連のアクションを意味します。

SubDAG / Sabdag -DAGと同じですが、別のDAG内にあり、親DAGの一部として(タスクとして)起動され、個別のスケジュールはありません。

オペレーター/オペレーター -特定のアクションを実行するDAGの特定のステップ。 たとえば、PythonOperator。

タスク/タスク -DAGを開始するときのオペレーターの特定のインスタンスは、Webインターフェースで小さな四角形として視覚化されます。 たとえば、PythonOperatorはrun_taskと呼ばれ、DAG check_dagで実行されます。







動的タスク生成のアイデア、問題、欠点



入力データ:







オーケストラリポジトリにテーブルがあります。PKG_TABLEと呼びましょう。

データパケットをダウンロードする準備ができているエントリをPKG_TABLEテーブルに追加するメカニズムがあります。







私たちが望んだもの:







DAG。すぐにダウンロードできるパッケージ用に生成され、ダウンロードを開始します(ネタバレ:最終的にすべてが判明しました)。







以下のコードを使用して、LatestOnlyOperatorタスクと、pkg_subdag_factory関数の実行時に作成される依存タスクsubdagで構成されるdagを生成します。これは、PKG_TABLEテーブルからパッケージのリストを受け取り、いくつかのPythonOperatorsを生成します。 ダウンロードするパッケージがない場合、DummyOperatorが生成されます。







最初のバージョンを1つのPythonOperatorで作成し、Airflowを使用して詳細なワークフローに再配布することにしました。







# -*- coding: utf-8 -*- """  DAG    """ from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.hooks.oracle_hook import OracleHook from datetime import datetime, timedelta import logging from scripts.lib import run_load, select_pkg_data def pkg_subdag_factory( oracle_hook, parent_dag_name, child_dag_name, start_date, schedule_interval, param_dict): """ ,  DAG    PythonOperator\` (1  - 1 PythonOperator)  : oracle_hook - airflow.hooks.oracle_hook.OracleHook parent_dag_name -  ""  child_dag_name -    start_date -       schedule_interval -      param_dict -     """ dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, catchup=False ) logging.info('selecting pkg data...') pkg_set = select_pkg_data(oracle_hook) if len(pkg_set): logging.info('pkg_set:') logging.info(pkg_set) for pkg in pkg_set: pkg_id = pkg[1] pkg_dict = {'pkg_data_' + str(pkg_id): pkg} param_dict.update(pkg_dict) task_name = 'pkg_' + str(pkg_id) PythonOperator( task_id=task_name, python_callable=run_load, op_kwargs={ 'oracle_hook': oracle_hook, 'param_dict': param_dict, 'pkg_id': pkg_id }, retries=0, dag=dag ) else: logging.info('Undelivered packages not found') DummyOperator(task_id='no_packages_dummy', retries=0, dag=dag) return dag interval = '*/10 * * * *' args = { 'owner': 'airflow', 'start_date': datetime(2018, 11, 12) } oracle_hook = OracleHook('ora_meta') main_dag_name = 'load' load_dag_name = 'load_packages' param_dict = { #       } main_dag = DAG( dag_id=main_dag_name, default_args=args, schedule_interval=interval, catchup=False ) subdag = SubDagOperator( subdag=pkg_subdag_factory( oracle_hook, main_dag_name, load_dag_name, args['start_date'], interval, param_dict ), task_id=load_dag_name, dag=main_dag ) #  ,       latest_only = LatestOnlyOperator(task_id='latest_only', dag=main_dag) subdag.set_upstream(latest_only)
      
      





次のスクリーンショットは、結果としてこれがどのように見えるかを示しています。

DAGの外観:













配達用の荷物がない場合のサブダグの外観:













配達用のパッケージがある場合のサブダグの外観:













問題とニュアンス





ドキュメントにあるように、「Airflowの重要な機能は、これらのDAG実行がアトミックでべき等のアイテムである<...>」、つまり「DAGは変更されずに生成されることが理解される」ということです。 この「キー機能」に違反したという事実により、次のことを学びました。









ログ内のExecution_dateおよび実際の開始時刻



最後に、Airflowの別のニュアンスで説明します。これは、他の記事で混同され、簡単な言葉で説明されていません-execution_date(すべてのログ、インターフェースなどに表示されます)および実際の開始時間。 原則として、説明はエアフロードキュメントFAQにありますが、結果は明らかではないため、説明が必要なようです。







ドキュメント :「スケジューラーは期間の終わりにジョブを開始します」

結果 :たとえば、@ dailyなどのスケジュールでdagを作成すると、execution_dateが「2018-01-01 00:00:00」の実行は、実際には「2018-02-01 00:00:00」を実行します。







便利なリンク:



キャッチアップドキュメント

LatestOnlyOperatorドキュメント

LatestOnlyOperatorに関するその他のドキュメント

LatestOnlyOperatorの使用例

いくつかのニュアンス

前回の起動の依存関係に関する質問

動的生成に関する小さな例

簡単な説明での動的生成に関する質問








All Articles