当我第一次使用Airflow构建ETL数据管道时,在弄清为什么管道无法运行之后,我经历了许多令人难忘的“啊哈”时刻。由于技术文档无法涵盖所有内容,因此我倾向于通过试错和阅读优秀的源代码来学习新工具。在本文中,我将分享Airflow的许多实用技巧和最佳实践,以帮助您建立更可靠和可扩展的数据管道。
在Airflow中,Airflow调度程序会根据DAG文件中指定的start_date
和schedule_interval
来运行DAG,对于初学者来说,很容易被Airflow的工作计划机制弄糊涂,因为Airflow计划程序在计划时间段的末尾而不是开始时触发DAG运行是不直观的。
调度程序根据start_date
和schedule_interval
计算执行时间,并在满足其时间依赖性时触发DAG。例如,考虑以下示例DAG,该DAG每天在世界标准时间上午7点运行:
default_args = { 'owner': 'xinran.waibel', 'start_date': datetime(2019, 12, 5), } dag = DAG('sample_dag', default_args=default_args, schedule_interval='0 7 * * *')
这个DAG的具体运行时间如下图所示:
DAG的第一次运行将在其计划周期结束时(而不是在开始日期)于2019-12-06的上午7点之后触发。同样,其余的DAG运行将在每天早上7点之后执行。
Airflow中的execution_date
不是实际的运行时间,而是其计划周期的开始时间戳。例如,第一次DAG运行的execution_date
为2019–12–05 07:00:00,尽管它是在2019–12–06上执行的。但是,如果用户手动启动DAG运行,则此手动DAG运行的执行时间将恰好是触发时间。(要判断DAG运行是计划的还是手动触发的,您可以查看其DAG运行ID的前缀:Scheduled
或 manual
)。
基于上述调度机制,您应始终设定start_date
,以确保DAG按预期时间运行。
在Airflow的工作计划中,一个重要的概念是catchup
。在实现DAG的具体逻辑后,如果将catchup
设置为True,Airflow将“回填”所有过去的DAG run。 如果关闭catchup
,Airflow将执行最新的DAG run,忽略之前所有的记录。例如,假设样本DAG在2019–12–08的上午8点被Airflow拾取,如果启动catchup
,则将运行三个DAG run。但是,如果关闭了catchup
,则只会触发scheduled__2019–12–07T07:00:00+00:00
。
有两种方法可以在Airflow中配置catchup
:
scheduler
部分下,设置catchup_by_default = True
(默认)或False
。这个设置将是全局性的,但可以在单独的DAG文件中重写并使用局部设置。dag.catchup = True
或False
dag = DAG('sample_dag', catchup=False, default_args=default_args)
由于启用了catchup
,Airflow可以回填过去的DAG run,并且每个DAG run可以随时手动重新运行,因此确保DAG的幂等性非常重要。幂等性意味着多次运行同一个DAG的结果是相同的。
现在,让我们考虑一个实例DAG,该DAG每天运行Python函数,通过API检索每日营销广告的效果数据并将其加载到数据库中:
with DAG('ads_api', catchup=True, default_args=default_args, schedule_interval='@daily') as dag: def export_api_data(api_token): # Call API to extract yesterday's data from API yesterday = date.today() - timedelta(days=1) data = download_api_data(api_token, date = yesterday) # Load data to database insert_to_database(data) py_task = PythonOperator( task_id='load_yesterday_data', python_callable=export_api_data, op_kwargs={'api_token': secret_api_token} )
Python函数export_api_data
使用datetime
库动态获取昨天的日期,从API下载昨天的广告数据,然后将下载的数据插入到目标数据库中。如果没有回填过去的DAG run并且所有DAG run仅执行一次,则该DAG将具有正确的结果。但是这个DAG违反了幂等性:
start_date
设置为2019–12–01,并在2019–12–08号正式运行DAG,Airflow会先回填过去7天的DAG run。由于昨天的日期是在export_api_data
中计算的,因此所有回填的DAG run都将具有yesterday = 2019–12–07
,这样会多次下载同一天的数据并上传到数据库中。我们对DAG文件进行改进以解决这些问题:
with DAG('ads_api', catchup=True, default_args=default_args, schedule_interval='@daily') as dag: def export_api_data(api_token, yesterday): # Call API to extract yesterday's data from API data = download_api_data(api_token, date = yesterday) # IMPROVEMENT: Delete yesterday's partition from database delete_partition_database(yesterday) # Load data to database insert_to_database(data) py_task = PythonOperator( task_id='load_yesterday_data', python_callable=export_api_data, op_kwargs={ 'api_token': secret_api_token, 'yesterday': '{{ ds }}' # IMPROVEMENT } )
{{ ds }}
代替了datetime
库,以获取DAG运行的execution_date
,该日期与DAG的实际运行日期无关。Airflow包含两个关键组件:
调度程序每隔几秒钟扫描并编译所有合规的DAG文件,以检测DAG的变化并检查是否可以触发任务。保持DAG文件的简单性(因为它本质上是配置文件)非常重要,这样Airflow调度程序就可以花费较少的时间和资源来处理它们。DAG文件不应进行任何实际的数据处理。
更改现有DAG的DAG ID等同于创建一个全新的DAG,因为Airflow实际上会在元数据数据库中添加一个新条目,而不会删除旧条目。这可能会引起额外的麻烦,因为如果启动了catchup
,您将丢失所有DAG运行历史记录,并且Airflow将尝试回填所有历史的DAG run。除非完全必要,否则不要重命名DAG。
删除DAG文件不会删除其DAG run的历史记录和其他元数据。您需要使用Airflow UI中的Delete按钮或airflow delete_dag
来显式删除元数据。如果在删除所有先前的元数据之后再次加载相同的DAG,它将再次被视为全新的DAG(如果您想立即重新运行所有过去的DAG,这将非常方便)。
以下是主要知识的摘要:
start_date
并不是第一次DAG开始运行的时间。start_date
以确保DAG run按预期回填。数据黑客:专注数据工程和机器学习,提供开源数据接口。
作者:Xinran Waibel
来源:Medium
原文:Apache Airflow Tips and Best Practices
翻译:数据黑客