Experimental APIを使用してAirflowでDAGトリガーを作成する方法

教育プログラムを準備する際、いくつかのツールを使用する上で定期的に困難に直面します。 そして、彼らに出会ったその瞬間には、この問題に対処するのに役立つドキュメントや記事が常に十分とは限りません。







これは、たとえば2015年に当てはまりました。ビッグデータスペシャリストプログラムでは、35人の同時ユーザーのためにSparkでHadoopクラスターを使用しました。 YARNを使用してこのようなユーザーケースの下で調理する方法は明確ではありませんでした。 その結果、自分で考え出して道を歩いた 、彼らはHabréに投稿しMoscow Spark Meetupにも出演しまし







背景



今回は、別のプログラムであるデータエンジニアについて説明します。 参加者は、ラムダとカッパという2種類のアーキテクチャを構築します。 また、lamdbaアーキテクチャでは、バッチ処理の一部として、Airflowを使用してログをHDFSからClickHouseに転送します。







一般的にすべてが良いです。 パイプラインを構築させます。 しかし、「しかし」あります。私たちのプログラムはすべて、学習プロセス自体の面で技術的です。 ラボをチェックするには、自動チェッカーを使用します。参加者は自分のアカウントに移動し、[チェック]ボタンをクリックする必要があります。しばらくすると、自分が行ったことに対する何らかのフィードバックが表示されます。 そして、私たちが問題にアプローチし始めるのはこの瞬間です。







このラボが次のように構成されていることを確認します。参加者のKafkaに制御データパケットを送信し、GobblinがこのデータパケットをHDFSに転送し、Airflowがこのデータパケットを取得してClickHouseに入れます。 秘Theは、Airflowがこれをリアルタイムで実行するのではなく、スケジュールどおりに実行することです。15分ごとにファイルのパックを取り、それを投入します。







ここで作業しているチェッカーの要求に応じて、何らかの方法でDAGを自分でトリガーする必要があることがわかりました。 グーグルでは、後のバージョンのAirflowには、いわゆるExperimental APIがあることがわかりました。 もちろん、 experimental



という言葉は怖いように聞こえますが、どうすればいいのか...突然飛ぶでしょう。







次に、Airflowのインストールから、Experimental APIを使用してDAGをトリガーするPOSTリクエストの生成までの全体の方法を説明します。 Ubuntu 16.04を使用します。







1.エアフローのインストール



Python 3とvirtualenvがあることを確認しましょう。







 $ python3 --version Python 3.6.6 $ virtualenv --version 15.2.0
      
      





このいずれかが欠落している場合は、インストールします。







次に、Airflowを引き続き使用するディレクトリを作成します。







 $ mkdir <your name of directory> $ cd /path/to/your/new/directory $ virtualenv -p which python3 venv $ source venv/bin/activate (venv) $
      
      





エアフローをインストールします。







 (venv) $ pip install airflow
      
      





作業したバージョン:1.10。







ここで、DAGファイルとAirflowプラグインが配置されるairflow_home



ディレクトリを作成する必要があります。 ディレクトリを作成しAIRFLOW_HOME



、環境変数AIRFLOW_HOME



設定します。







 (venv) $ cd /path/to/my/airflow/workspace (venv) $ mkdir airflow_home (venv) $ export AIRFLOW_HOME=<path to airflow_home>
      
      





次のステップは、SQLiteでデータストリームデータベースを作成および初期化するコマンドを実行することです。







 (venv) $ airflow initdb
      
      





データベースは、デフォルトでairflow.db



に作成されます。







気流がインストールされているかどうかを確認します。







 $ airflow version [2018-11-26 19:38:19,607] {__init__.py:57} INFO - Using executor SequentialExecutor [2018-11-26 19:38:19,745] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt [2018-11-26 19:38:19,771] {driver.py:123} INFO - Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0
      
      





コマンドが機能した場合、 airflow.cfg



構成ファイルを作成しAIRFLOW_HOME









 $ tree . ├── airflow.cfg └── unittests.cfg
      
      





AirflowにはWebインターフェイスがあります。 次のコマンドを実行して起動できます。







 (venv) $ airflow webserver --port 8081
      
      





これで、たとえば<hostname:8081>



ように、Airflowが起動されたホストのポート8081でブラウザーのWebインターフェースにアクセスできます。







2. Experimental APIの使用



これで、Airflowが構成され、準備ができました。 ただし、Experimental APIも実行する必要があります。 チェッカーはPythonで記述されているため、 requests



ライブラリを使用してすべてのリクエストがPythonで実行されます。







実際、APIはすでに単純なクエリで機能しています。 たとえば、このようなリクエストにより、その動作をテストできます。







 >>> import requests >>> host = <your hostname> >>> airflow_port = 8081 #   ,    8080 >>> requests.get('http://{}:{}/{}'.format(host, airflow_port, 'api/experimental/test').text 'OK'
      
      





そのようなメッセージを受け取った場合、これはすべてが機能していることを意味します。







ただし、DAGを有効にしたい場合、この種の要求は認証なしでは作成できないという事実に直面します。







これを行うには、いくつかのアクションを実行する必要があります。







まず、これを構成に追加する必要があります。







 [api] auth_backend = airflow.contrib.auth.backends.password_auth
      
      





次に、管理者権限を持つユーザーを作成する必要があります。







 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.Admin()) >>> user.username = 'new_user_name' >>> user.password = 'set_the_password' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
      
      





次に、DAGトリガーの実行を許可される通常の権限を持つユーザーを作成する必要があります。







 >>> import airflow >>> from airflow import models, settings >>> from airflow.contrib.auth.backends.password_auth import PasswordUser >>> user = PasswordUser(models.User()) >>> user.username = 'newprolab' >>> user.password = 'Newprolab2019!' >>> session = settings.Session() >>> session.add(user) >>> session.commit() >>> session.close() >>> exit()
      
      





これですべて準備完了です。







3. POSTリクエストの開始



POSTリクエスト自体は次のようになります。







 >>> dag_id = newprolab >>> url = 'http://{}:{}/{}/{}/{}'.format(host, airflow_port, 'api/experimental/dags', dag_id, 'dag_runs') >>> data = {"conf":"{\"key\":\"value\"}"} >>> headers = {'Content-type': 'application/json'} >>> auth = ('newprolab', 'Newprolab2019!') >>> uri = requests.post(url, data=json.dumps(data), headers=headers, auth=auth) >>> uri.text '{\n "message": "Created <DagRun newprolab @ 2019-03-27 10:24:25+00:00: manual__2019-03-27T10:24:25+00:00, externally triggered: True>"\n}\n'
      
      





要求は正常に処理されました。







したがって、DAGに処理のための時間を与え、ClickHouseテーブルにリクエストを送信して、データの制御パケットをキャッチしようとします。







検証が完了しました。








All Articles