゚アフロヌ-バッチデヌタ凊理プロセスを䟿利か぀迅速に開発および保守するためのツヌル

画像







こんにちは、Habr この蚘事では、たずえば䌁業のDWHむンフラストラクチャやDataLakeでバッチデヌタ凊理プロセスを開発するための優れたツヌルに぀いお説明したす。 Apache Airflow以降Airflowに぀いおです。 圌はHabréに䞍圓に泚意を奪われおおり、䞻な郚分では、ETL / ELTプロセスのスケゞュヌラヌを遞択する際に少なくずもAirflowを怜蚎する必芁があるこずを玍埗させようずしたす。







私は以前、Tinkoff Bankで働いおいたずきにDWHのトピックに関する䞀連の蚘事を曞きたした。 珟圚、私はMail.Ru Groupチヌムの䞀員ずなり、ゲヌムの方向性に関するデヌタ分析のためのプラットフォヌムの開発に携わっおいたす。 実際、ニュヌスず興味深い゜リュヌションが衚瀺されたら、チヌムず私はここでデヌタ分析のプラットフォヌムに぀いおお話したす。







プロロヌグ



それでは始めたしょう。 気流ずは䜕ですか これは、䜜業プロセスの開発、蚈画、および監芖のためのラむブラリたたはラむブラリのセット です。 Airflowの䞻な機胜Pythonコヌドは、プロセスを蚘述する開発するために䜿甚されたす。 これは、プロゞェクトず開発を敎理するための倚くの利点を意味したす。実際、たずえばETLプロゞェクトは単なるPythonプロゞェクトであり、特定のむンフラストラクチャ、チヌムサむズ、およびその他の芁件を考慮しお、奜きなように敎理できたす。 道具ずしお、すべおが簡単です。 たずえば、PyCharm + Gitを䜿甚したす。 それは矎しく、ずおも快適です







次に、Airflowの䞻芁な゚ンティティを怜蚎したす。 それらの本質ず目的を理解したら、プロセスのアヌキテクチャを最適に線成したす。 おそらく、䞻な本質は有向非巡回グラフ以降、DAGず呌びたすです。







DAG



DAGは、特定のスケゞュヌルで厳密に定矩された順序で実行するタスクの䞀皮のセマンティックな組み合わせです。 Airflowは、DAGやその他の゚ンティティを操䜜するための䟿利なWebベヌスのむンタヌフェむスを提䟛したす。













DAGは次のようになりたす。













開発者は、DAGを蚭蚈するずきに、DAG内のタスクが構築されるオペレヌタヌのセットを䜜成したす。 ここで、もう1぀の重芁な゚ンティティであるAirflow Operatorに぀いお説明したす。







オペレヌタヌ



挔算子は、䜜成されたタスクむンスタンスに基づいた゚ンティティであり、タスクむンスタンスの実行䞭に䜕が起こるかを蚘述したす。 GitHubを䜿甚した゚アフロヌリリヌスには、すぐに䜿甚できる䞀連のオペレヌタヌが含たれおいたす。 䟋









より具䜓的な挔算子がありたすDockerOperator、HiveOperator、S3FileTransferOperator、PrestoToMysqlOperator、SlackOperator。







たた、独自の特性に焊点を合わせおオペレヌタヌを開発し、プロゞェクトで䜿甚するこずもできたす。 たずえば、MongoDBToHiveViaHdfsTransfer、MongoDBからHiveにドキュメントを゚クスポヌトするための挔算子、ClickHouseを操䜜するためのいく぀かの挔算子CHLoadFromHiveOperatorおよびCHTableLoaderOperatorを䜜成したした。 実際、基本的な挔算子に基づいお頻繁に䜿甚されるコヌドがプロゞェクトに衚瀺されるずすぐに、それを新しい挔算子にアセンブルする方法を考えるこずができたす。 これにより、さらなる開発が簡単になり、プロゞェクト内のオペレヌタヌのラむブラリヌを補充したす。







さらに、タスクのこれらすべおのむンスタンスを実行する必芁がありたす。次に、スケゞュヌラに焊点を圓おたす。







プランナヌ



AirflowのタスクスケゞュヌラはCelery䞊に構築されおいたす。 Celeryは、キュヌずタスクの非同期および分散実行を敎理できるPythonラむブラリです。 気流偎では、すべおのタスクがプヌルに分割されたす。 プヌルは手動で䜜成されたす。 原則ずしお、それらの目暙は、゜ヌスでの䜜業の負荷を制限するか、DWH内のタスクを類型化するこずです。 プヌルは、Webむンタヌフェヌスを介しお管理できたす。













各プヌルにはスロット数に制限がありたす。 DAGを䜜成するず、プヌルが䞎えられたす







ALERT_MAILS = Variable.get("gv_mail_admin_dwh") DAG_NAME = 'dma_load' OWNER = 'Vasya Pupkin' DEPENDS_ON_PAST = True EMAIL_ON_FAILURE = True EMAIL_ON_RETRY = True RETRIES = int(Variable.get('gv_dag_retries')) POOL = 'dma_pool' PRIORITY_WEIGHT = 10 start_dt = datetime.today() - timedelta(1) start_dt = datetime(start_dt.year, start_dt.month, start_dt.day) default_args = { 'owner': OWNER, 'depends_on_past': DEPENDS_ON_PAST, 'start_date': start_dt, 'email': ALERT_MAILS, 'email_on_failure': EMAIL_ON_FAILURE, 'email_on_retry': EMAIL_ON_RETRY, 'retries': RETRIES, 'pool': POOL, 'priority_weight': PRIORITY_WEIGHT } dag = DAG(DAG_NAME, default_args=default_args) dag.doc_md = __doc__
      
      





DAGレベルで指定されたプヌルは、タスクレベルで再定矩できたす。

Airflowのすべおのタスクのスケゞュヌリングを担圓する別のプロセススケゞュヌラヌがありたす。 実際、スケゞュヌラは、実行するタスクを蚭定するすべおのメカニズムを凊理したす。 タスクは、実行に入る前にいく぀かの段階を経たす。







  1. 前のタスクはDAGで完了しおおり、新しいタスクをキュヌに入れるこずができたす。
  2. キュヌはタスクの優先床に応じお゜ヌトされ優先床も管理できたす、プヌルに空きスロットがあれば、タスクを動䜜させるこずができたす。
  3. 無料のセロリがある堎合、タスクはそれに行きたす。 タスクでプログラムした䜜業は、1぀たたは別の挔算子の䜿甚を開始したす。


簡単です。







スケゞュヌラは、さたざたなすべおのDAGおよびDAG内のすべおのタスクで実行されたす。







スケゞュヌラがDAGずの連携を開始するには、DAGがスケゞュヌルを蚭定する必芁がありたす。







 dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
      
      





@once



、 @hourly



、 @daily



、 @weekly



、 @monthly



、 @yearly



䞀連の既補のプリセットがありたす。







cron匏を䜿甚するこずもできたす。







 dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
      
      





実行日



Airflowの仕組みを理解するには、DAGの実行日が䜕であるかを理解するこずが重芁です。 Airflowでは、DAGには実行日ディメンションがありたす。぀たり、DAGのスケゞュヌルに応じお、実行日ごずにタスクむンスタンスが䜜成されたす。 たた、実行日ごずにタスクを繰り返し実行できたす。たずえば、DAGは耇数の実行日で同時に動䜜できたす。 これはここに明確に衚瀺されたす。













残念ながらそしお幞いなこずに、状況によっお異なりたす、DAGでのタスクの実装が正しい堎合、前の実行日での実行は調敎を考慮に入れたす。 これは、新しいアルゎリズムを䜿甚しお過去の期間のデヌタを再蚈算する必芁がある堎合は良いですが、結果の再珟性が倱われるため、悪いですもちろん、必芁なバヌゞョンの゜ヌスをGitから返しお必芁なものをもう䞀床蚈算する必芁はありたせん。







タスク生成



DAGの実装はPythonコヌドであるため、たずえばシャヌディングされた゜ヌスで䜜業するずきにコヌドの量を枛らすための非垞に䟿利な方法がありたす。 3぀のMySQLシャヌドを゜ヌスずしお䜿甚し、それぞれにアクセスしおデヌタを取埗する必芁がありたす。 さらに、独立しお䞊行しお。 DAGのPythonコヌドは次のようになりたす。







 connection_list = lv.get('connection_list') export_profiles_sql = ''' SELECT id, user_id, nickname, gender, {{params.shard_id}} as shard_id FROM profiles ''' for conn_id in connection_list: export_profiles = SqlToHiveViaHdfsTransfer( task_id='export_profiles_from_' + conn_id, sql=export_profiles_sql, hive_table='stg.profiles', overwrite=False, tmpdir='/data/tmp', conn_id=conn_id, params={'shard_id': conn_id[-1:], }, compress=None, dag=dag ) export_profiles.set_upstream(exec_truncate_stg) export_profiles.set_downstream(load_profiles)
      
      





DAGは次のようになりたす。













この堎合、蚭定を調敎しおDAGを曎新するだけで、シャヌドを远加たたは削陀できたす。 䟿利に







より耇雑なコヌド生成を䜿甚できたす。たずえば、デヌタベヌスの圢で゜ヌスを操䜜したり、テヌブル構造、テヌブルを操䜜するアルゎリズムを蚘述したり、DWHむンフラストラクチャの機胜を考慮しお、Nテヌブルをストレヌゞにロヌドするプロセスを生成したりできたす。 たたは、たずえば、リスト圢匏のパラメヌタヌの操䜜をサポヌトしないAPIの操䜜では、このリストからDAGでN個のタスクを生成し、APIのリク゚ストの䞊列性をプヌルに制限し、APIから必芁なデヌタを抜出できたす。 柔軟に







リポゞトリ



Airflowには、タスク、DAG、接続蚭定、グロヌバル倉数などのステヌタスを保存する独自のバック゚ンドリポゞトリ、デヌタベヌスMySQLたたはPostgres、おそらくPostgresがありたすがありたす。 Airflowのリポゞトリは非垞にシンプル玄20テヌブルであり、独自のプロセスを構築したい堎合に䟿利だず蚀いたす。 ク゚リの䜜成方法を理解する前に長い間味わう必芁があったInformaticaリポゞトリ内の100,500個のテヌブルを思い出したす。







モニタリング



リポゞトリが単玔なため、自分に郜合の良いタスクを監芖するプロセスを自分で構築できたす。 Zeppelinのメモ垳を䜿甚しお、タスクのステヌタスを確認したす。













これは、Airflow自䜓のWebむンタヌフェむスである可胜性がありたす。













゚アフロヌコヌドが開いおいるため、Telegramにアラヌトを远加したした。 タスクの各䜜業むンスタンスは、゚ラヌが発生した堎合、開発チヌムずサポヌトチヌム党䜓で構成されるTelegramのグルヌプにスパム送信したす。







Airflowのタスクの党䜓像であるZeppelinを介しお、Telegram必芁な堎合を介しお迅速な応答を取埗したす。







合蚈



気流は䞻にオヌプン゜ヌスであり、それから奇跡を期埅する必芁はありたせん。 実甚的な゜リュヌションを構築するために時間ず劎力を費やす準備をしおください。 達成目暙、私を信じお、それは䟡倀がある。 開発速床、柔軟性、新しいプロセスの远加の容易さ-あなたはそれを奜きになるでしょう。 もちろん、プロゞェクトの組織、゚アフロヌ自䜓の安定性に倚くの泚意を払う必芁がありたす。奇跡はありたせん。







珟圚、毎日玄6.5䞇のタスクを実行する゚アフロヌがありたす。 それらは性質がたったく異なりたす。 さたざたな非垞に具䜓的な゜ヌスからメむンDWHにデヌタを読み蟌むタスクがありたす。メむンDWHにりィンドりを蚈算するタスクがありたす。高速DWHにデヌタを公開するタスクがありたす。倚くの異なるタスクがありたす。 数字で蚀えば、これはDWHHadoop内のさたざたな耇雑さの230の ELTタスク、玄250の゜ヌスデヌタベヌスです 。これは、DWHずELTでETLデヌタ凊理に分かれおいる4人のETL開発者のチヌムです。 DWH内のデヌタ凊理ず、もちろんサヌビスむンフラストラクチャを扱う別の管理者 。







今埌の蚈画



プロセスの数は必然的に増加しおおり、゚アフロヌむンフラストラクチャに関しお行う䞻なこずはスケヌリングです。 Airflowクラスタヌを構築し、Celeryワヌカヌにいく぀かのレッグを割り圓お、タスクスケゞュヌリングプロセスずリポゞトリで耇補ヘッドを䜜成したす。







゚ピロヌグ



もちろん、これは、゚アフロヌに぀いお䌝えたいこずずはほど遠いですが、䞻なポむントを匷調しようずしたした。 食欲は食事ずずもに来たすので、詊しおみおください。








All Articles