Hi, Habr! In this article I want to talk about a great tool for developing batch data processing workflows, for example in a corporate DWH or DataLake infrastructure. The tool is Apache Airflow (hereafter, Airflow). It has been undeservedly overlooked on Habr. In the main part of the article I will try to convince you that Airflow is at least worth considering when you choose a scheduler for your ETL/ELT processes.
Previously, while working at Tinkoff Bank, I wrote a series of articles on DWH topics. Now I am part of the Mail.Ru Group team, where I develop a data analytics platform for the gaming division. As news and interesting solutions come up, my team and I will write here about our data analytics platform.
Prologue
So, let's get started. What is Airflow? It is a library (or rather a set of libraries) for developing, scheduling and monitoring workflows. Airflow's main feature is that processes are described (developed) in Python code. This brings many advantages for organizing your project and your development. In essence, your ETL project (for example) is just a Python project, and you can organize it however you like, taking into account your infrastructure, team size and other requirements. The tooling is simple too: use, for example, PyCharm + Git. It's great and very convenient!
Next, let's look at Airflow's main entities. Once you understand what they are and what they are for, you can organize your process architecture in the best way. Perhaps the main entity is the Directed Acyclic Graph (hereafter, DAG).
DAG
A DAG is a meaningful grouping of your tasks that you want to run in a strictly defined order on a specific schedule. Airflow provides a convenient web interface for working with DAGs and other entities.
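The following sketch illustrates what "a strictly defined order" means. It is plain Python, not Airflow code. Tasks are nodes, dependencies are edges, and because the graph has no cycles there is always a valid run order. The task names are hypothetical. The sketch uses the standard-library `graphlib` module, which requires Python 3.9 or later.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical DAG: each task maps to the set of its upstream tasks.
dependencies = {
    "truncate_stg": set(),
    "export_shard_1": {"truncate_stg"},
    "export_shard_2": {"truncate_stg"},
    "load_profiles": {"export_shard_1", "export_shard_2"},
}


def execution_order(graph):
    """Return one valid run order; raises CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


def parallel_waves(graph):
    """Group tasks into waves: every task in a wave has all upstream tasks finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError for cyclic graphs
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(parallel_waves(dependencies))
# [['truncate_stg'], ['export_shard_1', 'export_shard_2'], ['load_profiles']]
```

Tasks in the same wave, such as the two shard exports, do not depend on each other. That independence is what allows a scheduler to run them in parallel.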
A DAG looks like this:
When designing a DAG, the developer defines a set of operators from which the tasks inside the DAG will be built. This brings us to another important entity: the Airflow Operator.
Operators
An operator is the entity from which task instances are created. It describes what happens while a task instance is being executed. Airflow releases on GitHub already include a set of ready-to-use operators. Examples:
- BashOperator: an operator for running a bash command.
- PythonOperator: an operator for calling Python code.
- EmailOperator: an operator for sending email.
- HTTPOperator: an operator for working with HTTP requests.
- SqlOperator: an operator for executing SQL code.
- Sensor: an operator that waits for an event (a certain time arriving, a required file appearing, a row showing up in a database, a response from an API, and so on).
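A sensor boils down to a simple loop: check a condition, wait, and repeat until the condition holds or a timeout expires. The sketch below shows that loop in plain Python. The names `poke_interval` and `timeout` are borrowed from real parameters of Airflow sensors. The function itself, the `SensorTimeout` exception and the fake clock are illustrative assumptions, not Airflow's implementation.

```python
import time


class SensorTimeout(Exception):
    """Raised when the awaited event does not happen in time (hypothetical name)."""


def wait_for(poke, poke_interval=60.0, timeout=3600.0,
             clock=time.monotonic, sleep=time.sleep):
    """Call poke() until it returns True; give up after `timeout` seconds.

    `clock` and `sleep` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        if poke():
            return True
        if clock() >= deadline:
            raise SensorTimeout(f"condition not met within {timeout} s")
        sleep(poke_interval)


class FakeClock:
    """Test double: time advances only when sleep() is called."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


# Usage: pretend the awaited file "appears" on the third check.
checks = iter([False, False, True])
fake = FakeClock()
print(wait_for(lambda: next(checks), poke_interval=30, timeout=120,
               clock=fake, sleep=fake.sleep))  # True
print(fake.now)  # 60.0 (two sleeps of 30 s)
```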
There are also more specialized operators: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.
You can also develop operators tailored to your own needs and use them in your project. For example, we created MongoDBToHiveViaHdfsTransfer, an operator for exporting documents from MongoDB to Hive. We also built several operators for working with ClickHouse: CHLoadFromHiveOperator and CHTableLoaderOperator. In fact, as soon as a project starts to contain frequently repeated code built on basic operators, it is worth thinking about packaging that code into a new operator. This simplifies further development and grows the project's library of operators.
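Packaging repeated code into an operator can be sketched without Airflow. You need a base class with an `execute(context)` hook, which is the method Airflow operators implement, and a subclass that hides a recurring pattern behind a few parameters. `MiniOperator`, `ExportTableOperator` and the HiveQL template are all hypothetical. In a real project, the base class would be Airflow's `BaseOperator`, and the statement would be executed through a hook rather than returned.

```python
class MiniOperator:
    """Stand-in for an operator base class (hypothetical): subclasses implement execute()."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ExportTableOperator(MiniOperator):
    """Hypothetical reusable operator that packages a statement otherwise copy-pasted into many DAGs."""

    def __init__(self, task_id, source_table, target_table, columns, partition_col="dt"):
        super().__init__(task_id)
        self.source_table = source_table
        self.target_table = target_table
        self.columns = list(columns)
        self.partition_col = partition_col

    def execute(self, context):
        ds = context["ds"]  # execution date as YYYY-MM-DD, like Airflow's `ds`
        cols = ", ".join(self.columns)
        # Only builds the HiveQL statement; a real operator would run it through a hook.
        return (f"INSERT INTO TABLE {self.target_table} "
                f"PARTITION ({self.partition_col}='{ds}') "
                f"SELECT {cols} FROM {self.source_table}")


op = ExportTableOperator("export_profiles", "profiles", "stg.profiles",
                         ["id", "user_id", "nickname"])
print(op.execute({"ds": "2020-01-31"}))
```

Each new DAG then needs only one short constructor call instead of another copy of the statement.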
Next, all of these task instances have to be executed, so let's turn to the scheduler.
Scheduler
In our setup, task execution in Airflow is built on Celery (through Airflow's CeleryExecutor). Celery is a Python library for organizing a queue together with asynchronous and distributed execution of tasks. On the Airflow side, all tasks are divided into pools. Pools are created manually. As a rule, they are used to limit the load on a source or to group tasks by type inside the DWH. Pools can be managed through the web interface.
Each pool has a limited number of slots. When a DAG is created, it is assigned a pool:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

ALERT_MAILS = Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10

# Start from midnight of the previous day.
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)

default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,  # every task in this DAG uses dma_pool unless it overrides it
    'priority_weight': PRIORITY_WEIGHT
}

dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__
```
A pool specified at the DAG level can be overridden at the task level.
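The override rule can be modeled as a dictionary merge in which arguments passed explicitly to a task win over the DAG's `default_args`. That precedence matches how Airflow applies `default_args`. The helper below, however, is plain Python for illustration only. `heavy_source_pool` and the `retries` value are hypothetical.

```python
def resolve_task_args(default_args, **task_kwargs):
    """Explicit task arguments take precedence over the DAG's default_args."""
    return {**default_args, **task_kwargs}


# DAG-level defaults in the spirit of the example above (retries value is hypothetical).
default_args = {"owner": "Vasya Pupkin", "pool": "dma_pool",
                "priority_weight": 10, "retries": 3}

regular = resolve_task_args(default_args, task_id="calc_mart")
heavy = resolve_task_args(default_args, task_id="export_big_table",
                          pool="heavy_source_pool")  # hypothetical pool name

print(regular["pool"], heavy["pool"])  # dma_pool heavy_source_pool
print(heavy["priority_weight"])        # 10, still inherited from default_args
```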
Airflow has a separate process, the scheduler, which is responsible for scheduling all tasks. In effect, the scheduler handles all the mechanics of deciding which tasks get run. Before it is executed, a task goes through several stages:
- The previous tasks in the DAG have completed, so the new task can be placed in the queue.
- The queue is sorted by task priority (priorities can be controlled). If the pool has a free slot, the task can be started.
- If a Celery worker is free, the task is sent to it. The work you programmed in the task then begins, using one operator or another.
Simple enough.
The scheduler works across the full set of DAGs and all the tasks inside them.
For the scheduler to start working with a DAG, the DAG needs a schedule:
```python
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')
```
There is a set of ready-made presets: @once, @hourly, @daily, @weekly, @monthly, @yearly.
You can also use a cron expression:
```python
dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')
```
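The presets are shorthand for cron expressions. The sketch below does two things in plain Python. First, it expands the presets using the mapping from Airflow's documentation (`@once` has no cron equivalent). Second, it finds the next few run times for a cron expression. The matcher is deliberately simplified:

- It supports only `*`, `*/n` and plain numbers.
- It requires all five fields to match. Real cron instead matches on either day-of-month or day-of-week when both are restricted.

For real schedules, rely on a proper cron library. Airflow itself uses `croniter`.

```python
from datetime import datetime, timedelta

# Airflow's presets expressed as cron (@once runs a single time and has no cron form).
PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@yearly": "0 0 1 1 *",
}


def _field_matches(spec, value, low):
    """Match one field: '*', '*/n' (every n-th value from `low`) or a plain integer."""
    if spec == "*":
        return True
    if spec.startswith("*/"):
        return (value - low) % int(spec[2:]) == 0
    return value == int(spec)


def matches(expr, dt):
    """Simplified: all five fields must match (real cron ORs day-of-month and day-of-week)."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (dt.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (_field_matches(minute, dt.minute, 0)
            and _field_matches(hour, dt.hour, 0)
            and _field_matches(dom, dt.day, 1)
            and _field_matches(month, dt.month, 1)
            and _field_matches(dow, cron_dow, 0))


def next_runs(schedule, after, count=3, horizon_days=400):
    """Up to `count` whole minutes strictly after `after` that match the schedule,
    searching at most `horizon_days` ahead."""
    expr = PRESETS.get(schedule, schedule)
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    limit = after + timedelta(days=horizon_days)
    runs = []
    while len(runs) < count and t <= limit:
        if matches(expr, t):
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


start = datetime(2020, 1, 1, 0, 5)
print(next_runs("*/10 * * * *", start))
print(next_runs("@weekly", start, count=1))  # Sunday 2020-01-05 00:00
```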
Execution Date
To understand how Airflow works, it is important to understand what a DAG's execution date is. In Airflow, a DAG has an execution-date dimension: depending on the DAG's schedule, task instances are created for each execution date. Tasks for any execution date can be re-run, and a DAG can, for example, work on several execution dates at the same time. This is clearly shown here:
Unfortunately (or perhaps fortunately, depending on the situation), if you change the implementation of a task in the DAG, runs for previous execution dates will also pick up the changed code. This is good when you need to recalculate data for past periods with a new algorithm. It is bad because the results are no longer reproducible. Of course, nothing stops you from checking out the required version of the source from Git and doing a one-off calculation of exactly what you need.
Task generation
Because a DAG is implemented in Python code, there is a very convenient way to reduce the amount of code when working with sharded sources, for example. Suppose your source is three MySQL shards, and you need to go into each one and pull some data, independently and in parallel. The DAG's Python code might look like this:
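```python
# `lv`, `SqlToHiveViaHdfsTransfer`, `exec_truncate_stg` and `load_profiles` are not
# shown in the article; they are assumed to be defined elsewhere in the project
# (a settings helper, a custom operator and two other tasks of the same DAG).
connection_list = lv.get('connection_list')

# {{params.shard_id}} is a Jinja placeholder meant to be filled from the task's params.
export_profiles_sql = '''
SELECT
    id,
    user_id,
    nickname,
    gender,
    {{params.shard_id}} as shard_id
FROM profiles
'''

# One export task per shard: each runs after the staging table is truncated and
# before the common load step, independently of the other shards.
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)
```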
The DAG then looks like this:
Here you can add or remove a shard simply by adjusting the settings and updating the DAG. Convenient!
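The fan-out/fan-in shape produced by that loop can be checked without Airflow. The sketch below is plain Python. The connection ids `mysql_shard_N` are hypothetical, and the node labels `exec_truncate_stg` and `load_profiles` reuse the variable names from the code above. It builds the same task ids and dependency edges from a connection list, so adding a shard means adding one list element.

It also exposes a caveat of `conn_id[-1:]`. That expression keeps only the last character of the connection id, so a tenth shard would get the shard id "0".

```python
def build_shard_exports(connection_list):
    """Mirror of the DAG loop: one export task per shard, between truncate and load."""
    tasks, edges = [], []
    for conn_id in connection_list:
        task_id = "export_profiles_from_" + conn_id
        tasks.append({"task_id": task_id, "params": {"shard_id": conn_id[-1:]}})
        edges.append(("exec_truncate_stg", task_id))  # set_upstream(exec_truncate_stg)
        edges.append((task_id, "load_profiles"))      # set_downstream(load_profiles)
    return tasks, edges


tasks, edges = build_shard_exports(["mysql_shard_1", "mysql_shard_2", "mysql_shard_3"])
print([t["params"]["shard_id"] for t in tasks])  # ['1', '2', '3']
print(len(edges))  # 6: one upstream and one downstream edge per shard

# Caveat of conn_id[-1:]: only the last character is kept.
print(build_shard_exports(["mysql_shard_10"])[0][0]["params"]["shard_id"])  # 0
```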
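You can also use more sophisticated code generation. One option is to work with sources that are databases: describe the table structure and the algorithm for processing a table, then generate the process of loading N tables into your storage, taking the specifics of your DWH infrastructure into account. Another case is an API that does not accept a list-valued parameter. There you can generate N tasks in the DAG from that list, limit the parallelism of API requests with a pool, and pull the data you need from the API. Flexible!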
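The API case combines N generated tasks with a parallelism cap set by a pool. That effect can be modeled with a thread pool whose `max_workers` plays the role of the pool's slot count. This is a plain-Python analogy, not Airflow code. `fetch_one` stands in for a hypothetical API call that accepts a single parameter value, and the slot count is an assumption.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 3  # hypothetical pool size: at most 3 concurrent API requests

_lock = threading.Lock()
_in_flight = 0
max_seen = 0


def fetch_one(item_id):
    """Hypothetical API call that only accepts one id per request."""
    global _in_flight, max_seen
    with _lock:
        _in_flight += 1
        max_seen = max(max_seen, _in_flight)
    time.sleep(0.01)  # simulate network latency
    with _lock:
        _in_flight -= 1
    return {"id": item_id, "value": item_id * 10}


def fetch_all(ids, slots=POOL_SLOTS):
    """One 'task' per list element; no more than `slots` run at the same time."""
    with ThreadPoolExecutor(max_workers=slots) as pool:
        return list(pool.map(fetch_one, ids))  # map preserves input order


results = fetch_all(list(range(10)))
print(len(results), max_seen <= POOL_SLOTS)  # 10 True
```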
Repository
Airflow has its own backend repository: a database (MySQL or Postgres; we use Postgres) that stores the states of tasks and DAGs, connection settings, global variables and so on. The Airflow repository is very simple (about 20 tables), which makes it convenient if you want to build your own processes on top of it. Compare that with the 100,500 tables of the Informatica repository, which had to be studied for a long time before you could figure out how to write a query.
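Because the metadata schema is small, ad-hoc monitoring often comes down to plain SQL. The sketch below uses an in-memory SQLite table with a few columns named after Airflow's `task_instance` table (`dag_id`, `task_id`, `execution_date`, `state`). The real table has more columns, stores timestamps rather than text dates, and lives in your Postgres or MySQL backend. The sample rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT
    )
""")
# Hypothetical sample rows (dates stored as text for simplicity).
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("dma_load", "export_profiles_from_shard_1", "2020-01-01", "success"),
        ("dma_load", "export_profiles_from_shard_2", "2020-01-01", "failed"),
        ("dma_load", "load_profiles", "2020-01-01", "upstream_failed"),
        ("dma_load", "export_profiles_from_shard_1", "2020-01-02", "running"),
    ],
)

# Count task instances per state for one execution date.
STATE_SUMMARY = """
    SELECT state, COUNT(*) FROM task_instance
    WHERE execution_date = ?
    GROUP BY state
    ORDER BY state
"""
for state, n in conn.execute(STATE_SUMMARY, ("2020-01-01",)):
    print(state, n)
```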
Monitoring
Since the repository is simple, you can build whatever task-monitoring process suits you. We use a notebook in Zeppelin to look at task statuses:
It could also be the web interface of Airflow itself:
Since Airflow's code is open source, we have added alerting to Telegram. If an error occurs, every running task instance spams a Telegram group made up of the entire development and support team.
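Here is a hedged sketch of such an alert. The `notify_failure` function could be passed as a task's `on_failure_callback`, a real Airflow parameter whose callback receives the task context. It builds a request to the Telegram Bot API `sendMessage` method. The token, chat id and message format are placeholders. The request is only built, not sent, so the sketch runs offline. The authors' actual implementation is not shown in the article.

```python
import json
import urllib.request
from types import SimpleNamespace  # only for the usage example below

TELEGRAM_TOKEN = "123456:PLACEHOLDER"  # hypothetical bot token
CHAT_ID = "-1000000000000"             # hypothetical group chat id


def build_alert(dag_id, task_id, execution_date, error):
    """Build (but do not send) a Telegram Bot API sendMessage request."""
    text = f"Task failed: {dag_id}.{task_id} for {execution_date}\n{error}"
    payload = json.dumps({"chat_id": CHAT_ID, "text": text}).encode("utf-8")
    return urllib.request.Request(
        f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
        data=payload,
        headers={"Content-Type": "application/json"},
    )


def notify_failure(context):
    """Shaped like an on_failure_callback, which Airflow calls with the task context."""
    ti = context["task_instance"]
    req = build_alert(ti.dag_id, ti.task_id,
                      context["execution_date"], context.get("exception"))
    # urllib.request.urlopen(req, timeout=10) would actually send the message.
    return req


# Usage with a fake context (a real one is built by Airflow).
fake_context = {
    "task_instance": SimpleNamespace(dag_id="dma_load", task_id="load_profiles"),
    "execution_date": "2020-01-01",
    "exception": "ConnectionError: shard 2 unavailable",
}
req = notify_failure(fake_context)
print(req.get_method(), req.full_url)
```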
Telegram gives us a quick response when one is needed, and Zeppelin gives us an overall picture of the tasks in Airflow.
Summary
Airflow is first and foremost open source, so don't expect miracles from it. Be ready to put time and effort into building a working solution. The goal is achievable, and believe me, it's worth it. Development speed, flexibility, ease of adding new processes: you'll like it. Of course, you need to pay a lot of attention to how the project is organized and to the stability of Airflow itself. There are no miracles.
Today our Airflow runs about 65,000 tasks every day, and they are quite different in nature:
- tasks that load data into the main DWH from many different and highly specific sources;
- tasks that compute data marts inside the main DWH;
- tasks that publish data to a fast DWH;
- and many other kinds of tasks.
In numbers, that is 230 ELT tasks of varying complexity inside the DWH (Hadoop) and about 250 source databases. It is also a team of four ETL developers, split between ETL processing of data into the DWH and ELT processing of data inside the DWH. On top of that, there is one more administrator who takes care of the service infrastructure.
Plans for the future
The number of processes keeps growing, and the main thing we will be doing with the Airflow infrastructure is scaling. We want to build an Airflow cluster, allocate a couple of nodes for Celery workers, and set up a redundant head node that runs the task scheduling processes and the repository.
Epilogue
Of course, this is far from everything I would like to tell you about Airflow, but I have tried to highlight the main points. Appetite comes with eating, so give it a try!