Apache Airflow 是我们数据平台中一个极为重要的组件,被公司内的不同团队使用。它负责我们所有的数据转换、欺诈检测机制、数据科学任务或项目以及我们在 Teya 的日常维护和内部任务。
这么说吧,我们正在运行超过300个DAGs,平均每天运行超过5,000个任务。所以可以说我们有一个中等规模的Airflow部署,为我们的用户提供切实的价值。已经8个多月没有在Airflow上出现任何事故或故障。
在这篇文章中,我想分享我们在部署过程中的重要经验,这些经验帮助我们构建了一个可扩展、稳定的环境。希望这有助于你开始在生产环境中使用Apache Airflow的过程,或者帮助你评估不同的方案并将其融入你的实际应用中。
我会根据形成我们当前使用的Airflow的关键因素将其划分成几个部分:
在这里我们使用 Kubernetes 来运行所有东西。所以 Airflow 的情况也不会例外。一开始,选择执行器似乎很明显:让我们用Kubernetes Executor吧!运行时的隔离、利用 Kubernetes 实现任务的无缝扩展以及需要管理的组件较少(例如不需要像 Redis 这样的 Celery 后端),听起来都很好。从这里我们开始了自己的旅程。
然而,我们的技术栈有一个重要的特性:大多数任务是轻量DBT增量变换,而长时间运行的模型则很少,大约 1 小时左右。
我们面临的第一问题是启动任务的开销。由于 KubernetesExecutor
每个任务都在一个单独的 Pod 中运行,有时等待 Pod 启动的时间甚至超过任务本身的执行时间。由于我们有大量小型任务的缘故,我们不得不不断等待 Kubernetes 节点的扩容以适应不断增加的 Pod 数量。
第二个问题,也是更令人痛苦的问题,是一些任务(特别是那些长时间运行的任务)由于Pod被逐出而意外失败。随着任务的激增,Pod的数量随之增加,进而增加了集群中的节点数量,任务完成后,系统便会开始缩减规模。
快速扩缩进/出问题
问题进一步加剧,因为我们使用Karpenter来优化我们在k8s集群中的资源使用。因此,在几个Pod完成后,节点缩减非常迅速。这种行为会驱逐这些节点上剩余的Pod,并将它们重新分配到其他节点,从而减少总的节点数并节省成本。
考虑到这一切,我们决定转向经典的Celery Executor。现在有了固定的worker节点,它非常适合我们的需求。现在我们有许多小型快速任务的使用场景。DBT作业的平均运行时间显著缩短了,因为我们不再需要等待它开始运行。
通过使用Airflow官方最新版的helm chart,我们能够利用KEDA autoscaler根据需要增加或减少Celery 工作器的数量,从而避免为闲置的工作器支付额外费用。它通过从Airflow数据库中获取_运行中_和_排队中_任务的数量来工作,然后根据任务数量相应地调整工作器的数量。具体取决于您的worker并发设置。具体取决于您的worker并发配置。
对于那些需要较多资源的定制作业,我们可以选择使用 KubernetesPodOperator
来运行这些任务。这样我们仍然可以为特定的依赖项实现运行时的隔离(无需在 Airflow 的镜像中安装这些依赖项),并且可以为每个任务单独定义所需的资源。
目前我们仍在考虑采用 KubernetesCeleryExecutor
,因为它允许任务被安排到两个独立的队列中——即 k8s 队列 和 Celery 队列。在某些任务更适合用 Celery 处理而其他任务更适合用 Kubernetes 处理的情况下,这会很有帮助。
数据工程团队并不是唯一一个编写Airflow DAG的团队。为了允许各个团队编写自己的DAG,我们需要采用多仓库策略来管理DAG。同时,我们还需要保持一致性并强制执行相关指南。
DAGs可以分别由不同团队编写,在独立仓库中开发,最终仍然可以运行在同一Airflow实例中。当然,我们不需要将DAGs嵌入到Airflow的镜像中,这样更方便😊
听我说,每次有人改动DAG中的一个字符时,你都不想每次都重启scheduler
和workers
,这太麻烦了。
去中心化 DAG 仓库
每个DAG最终都会被放入一个桶中,通过与这些DAG相关的特定路径上的 sync
操作。
为了这种方法才能有效,非常重要的一部分是确保CI/CD的约束条件得到执行。每个DAG名称必须以前缀为拥有该DAG的团队名称,以避免DAG ID的冲突。此外,每个DAG都会进行静态检查以确保正确的所有者分配和标签的存在情况,并捕获可能的导入错误等问题。
通过这样做,我们可以使用原生的Airflow角色来实施访问控制,并且所有的DAG都需要通过最低治理检查清单才能被提交到仓库。
为了让DAGs在Airflow中生效,我们需要将存储桶同步到运行scheduler
、workers
等Pods的本地文件系统中。为此,我们使用objinsync,这是一个轻量级的守护进程,用于将远程对象存储逐增量同步到本地文件系统。
我们在每个Airflow组件Pod中运行objinsync
作为辅助容器,频繁进行同步。因此,我们总能在几分钟内同步到DAG的新更新。这里的一个教训是也将objinsync
作为初始化容器(init容器)添加,这样它可以在主scheduler
或worker
容器启动之前同步DAG文件。这在Celery工作者重启后尤为重要——由于节点轮换或发布,导致有时任务会被分配给尚未同步DAG文件的新工作者,导致任务立即失败。
最理想的方法是,在 scheduler
中作为侧边容器运行 仅一个 objinsync
进程,并将桶中的内容复制到一个 持久卷 中。这样,这个 PV 会被挂载到所有 Airflow 组件上。这样做的好处是,DAG 在各个 Airflow 组件之间不会出现不同步的情况。
遗憾的是,我们目前还不能在这里实现这个解决方案,因为我们目前仅支持集群节点上的EBS卷。要在不同的节点挂载PV,我们需要使用ReadWriteMany
访问模式。这种访问模式目前只在AWS EKS中,使用EFS卷模式时可用。
一个折衷的解决方法是,考虑到我们现有的限制,可以使用一个 [nodeSelector](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector)
将所有 Airflow Pods 调度到同一节点。然而,我们更倾向于使用分布在不同可用区的节点来实现一个高可用的 Airflow 集群。
如果你想批量生成DAG,就可以利用DAG模板和程序化生成。再也不用手动一个一个地写DAG了 😂
也许生成动态DAG最简单的方式是使用单文件方法。你可以有一个文件,在一个循环中生成DAG对象并将其添加到globals()
字典里。
这种方法在我们刚开始根据DBT项目生成动态DAG时非常简单(关于DBT编排的话题值得单独写一篇文章,未来会详细讨论)。然而,当DAGs由调度器
定期解析时,我们发现使用这种方法会导致CPU和内存使用增加以及调度循环时间变长,尤其是解析DBT的manifest.json
文件,这个文件相当大 😅。很快我们就发现这种方法对于我们的项目规模来说不可扩展。
解决方法是采用多文件法,即为每个我们想要动态创建的DAG生成一个.py
文件。通过这种方式,我们将DAG生成过程集成到了我们的DBT项目仓库中。现在,项目也成为了一个生成DAG的来源,将动态生成的文件推送到DAG桶。
天文学者写了一篇文章介绍了单一文件方法和多文件方法这里。
当我们转到CeleryExecutor
之后,虽然解决了我们面临的一个问题,但新的问题也开始出现。运行几天(甚至几小时)之后,一些Celery任务开始因OOM(内存不足)问题而崩溃。我们已经为Pods提供了足够的内存资源,所以这感觉有点不对劲。
经过调查后,我们从Celery worker资源使用图表中看到这个。
Celery的内存泄漏
我们的任务规模不大,主要由Celery工人执行的DBT任务构成。当时内存使用量持续增加,让我们困惑,我们开始怀疑存在内存泄漏问题。
为了防止内存泄漏并有效控制任务的内存使用,我们不得不对两个重要的Celery配置项——worker_max_tasks_per_child
和 worker_max_memory_per_child
——进行调整。
第一个控制一个worker进程在被新进程替换之前能执行的最大任务数量。首先,我们需要理解Celery worker和worker进程之间的区别。一个worker节点可以启动多个worker进程,这些进程由concurrency
设置控制这些进程的数量。例如,如果你将concurrency
设置为12,而你有2个Celery worker,那么最终将生成24个worker进程。
因此,为了避免同一工作进程中的不同任务之间发生内存泄漏,偶尔重启工作进程是有好处的。如果没有设置该配置,默认是不会重启工作进程的。
第二个选项 worker_max_memory_per_child
控制每个工作进程的最大常驻内存用量,如果内存用量超过该限制,该进程将会被一个新的进程所取代。这实际上控制的是任务的内存使用。默认设置为无限制,因此建议最好设置该值。
通过调整这两个设置,我们可以在两种情况下控制内存使用:当它们达到最大任务数,或达到最大常住内存时,会回收工作进程。重要的是要记住,这些设置仅在使用 prefork 池 时才有效。详细信息请参阅 官方文档。
在 Airflow 中设置它们非常简单。你只需要更新 Airflow 中的 config_templates
文件夹里的配置,如下所示:
# config_templates/custom_celery.py # 此文件用于定义自定义Celery配置,根据需要覆盖默认配置中的某些参数。 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG CUSTOM_CELERY_CONFIG = DEFAULT_CELERY_CONFIG.copy() CUSTOM_CELERY_CONFIG.update( { "worker_max_tasks_per_child": 整数(integer), "worker_max_memory_per_child": 整数(integer), } )
然后你可以在 values.yaml
中找到这个自定义配置项
airflow: config: celery: worker_concurrency: <int> # 工作者并发数 celery_config_options: config_templates.custom_celery.CUSTOM_CELERY_CONFIG # Celery 配置选项
这些值将取决于你的工作节点配置、内存请求和限制、并发量以及任务的内存消耗程度。这就是为什么你需要根据你的具体情况进行微调。
一个 k8s 节点可能会因为故障或由管理 Kubernetes 集群的基础设施团队安排的计划轮换而旋转。同样,在发布时,或者当你更改某些配置(例如环境变量)或基础镜像时,工作节点也会进行轮换。这当然会导致 Pods 被终止。
我们需要为这些事件做好准备,确保我们的任务不会仅仅因为一个Pod被下线(decommissioned)而失败。这对长时间运行的任务来说特别麻烦。想象一下,一个运行了大约2到3个小时的任务因为计划中的节点轮换导致任务失败。
为了防止这种情况,应根据您的具体需求设置Worker Termination Grace Period配置。这个配置会让Celery worker在被发布过程或节点轮换前关闭时等待指定的秒数。您也可以在Airflow的values.yaml
文件中轻松设置此配置。
airflow: workers: terminationGracePeriodSeconds: <终止宽限期(秒): <一个整数>(即工作进程终止前给予的宽限时间,单位为秒)>
建议设为最长任务时间的1.5倍左右,这样可以确保每个任务都能在这个时间段内完成,并且可以在工人完成任务后平稳关闭。
Airflow 最常见的使用场景之一是在某些任务完成之后发送自定义提醒,比如处理文件、清理任务或甚至是任务失败。如果你在一个多团队都在使用 Airflow 的环境中工作,你应该统一通知机制,以保持语气的一致性。
这可以避免Team A
从Airflow发送给Slack的消息与Team B
使用的格式完全不同,比如说。
自 Airflow 2.6 起,我们可以使用Notifiers来处理这种情况。社区已经提供了一套现成的 Notifiers,覆盖了最常见的场景:Slack、SQS、Jira 等等。
这里,我们从BaseNotifier
类创建了自己的自定义通知器,因此我们可以根据需要自定义通知模板,并嵌入自定义逻辑。例如,在开发环境中运行任务时,例如dev
,默认情况下,仅在任务失败时向Slack发送通知。而在生产环境中,例如prd
,将发送通知至我们的值班工具Opsgenie。
一个提醒器,多个接收对象和自定义选项
自定义通知也使用模板,团队可以使用标准格式在 Slack 中创建信息消息等。这种方法的另一个好处是,使用它的团队无需担心管理各个通知目标的秘密。
即使我们采用了最佳实践和高可用性模式,Airflow 仍然可能因各种原因而失败。因此,在基础设施层面实现可观测性、指标监控和告警机制非常重要。
当你在 Kubernetes 上运行时,你可以通过为每个感兴趣的事件设置 PrometheusRule
来完成这个操作。例如,你可以监控 scheduler
节点的健康状况、可用 worker
节点的数量,甚至可以监控特定的 Airflow 指标,比如 scheduler
循环时间。通过运行 [AlertManager](https://prometheus.io/docs/alerting/latest/alertmanager/)
,你可以向不同的目标(如 Slack、PagerDuty、Opsgenie 等平台)发送警报。
另一个明智的做法是利用 Airflow 指标 来提升环境的可观察性。在写下这些内容时,Airflow 支持将指标发送给 StatsD
和 OpenTelemetry
。相比之下,OpenTelemetry
不仅是一个更全面的框架,还支持 日志 和 跟踪。不过,当前 Airflow 尚未实现通过 OTEL 进行日志记录和跟踪(不过未来将会实现这一功能)。
此外,如果你想使用它,你还需要在Kubernetes中管理OTEL Collector的部署。这里指的是官方提供的helm chart。与 statsd 不同,官方的Airflow图表并未提供OTEL Collector。
示例的Airflow监控图
使用标准指标可以大幅改进警报功能。例如,您可以使用排队任务数量,并设置当队列在特定时间内增长过多时触发警报——您不希望队列长度超过服务水平协议(Service Level Agreement,SLA)规定的时间。
我们还会监控一些有用的指标,比如 DAG解析时长 和 调度器循环时长 ,以便快速发现可能影响Airflow核心并减慢整个应用程序运行速度的问题。
元数据数据库是成功部署Airflow至关重要的因素,因为如果处理不当,可能会严重影响其性能,甚至导致Airflow无法运行。
除了上述提到的Airflow节点和性能指标的监控之外,监控你的数据库健康状况也非常重要。这取决于你是否使用PostgreSQL或MySQL(请千万不要使用SQLite 😂),但最常见的指标包括CPU使用情况,可用存储空间,打开的连接数量等。
另一个好的做法是定期运行元数据清理工作来移除旧的和不再使用的元数据。这可以包括如 job
、dag_run
、task_instance
、log
、xcom
、sla_miss
、dags
、task_reschedule
、task_fail
等表等等。所有这些元数据在Airflow中不断累积,使得获取任务状态的查询时间比实际需要的长。此外,你是否觉得Airflow加载和导航有点慢?这种感觉或体验可能是由于元数据累积导致的。
幸好,Airflow 自带命令 airflow db clean
来完成这个任务,通过可选标志来设置其行为。详情请参阅这里。
如果你正在使用 Kubernetes,一个好方法是在 Airflow 的部署中添加一个 CronJob
定时任务,定期运行带有你指定参数的 airflow db clean
命令。根据你的实现大小或复杂度,你可能需要每天或每周运行一次该任务。
我希望这段文字能对刚开始在Kubernetes上运行Airflow的团队提供一些帮助和指导,并特别关注那些共享同一Airflow集群的多团队环境。
还有很多促成成功实施的组件和细节没有在这里提到。我们还有很多要改进的地方,路还很长。如果您也想分享自己的经验或提问,欢迎随时联系我,让我们聊聊 😄