Airflow线上任务丢失总结
一. 场景重现
我们有一个Airflow集群,发现到 1 点的时候有些任务就延迟到早上 7 点以后才开始执行,有些任务更严重,在 1 点到 7 点的任务直接丢失,7点后才开始正常调度,这是为什么呢,下面下面我用简单的例子讲述一下为什么会发生这种情况,以及怎么解决这个问题。
某Airflow集群现在有下面三个dag,这种场景在企业里面是很常见:分钟级别、小时级别、天级别的任务都有,在默认的Airflow集群配置里面是有问题的:
- 在 8 点到 23 点的所有任务都正常: 每个 dag 都按照自己的预定策略运行
- 小时级任务 0 点到 8 点出现异常:这段时间内任务会累积延迟执行,在8点以后按照顺序一次执行 0 到 7 点的 dagrun (数据执行延迟)
- 分钟级任务 0 点到 8 点的 dagrun 丢失: 在 dagrun 列表里面直接看不到 dag 实例(数据执行直接丢失)
dag | 说明 |
---|---|
dag1 | 10个task,每分钟运行运行一次,max_active_tasks=4,每次运行在1分钟内完成,且dag内部task之间没有依赖 |
dag2 | 10个task,每小时运行运行一次,max_active_tasks=4,每次运行在1小时内完成,且dag内部task之间没有依赖 |
dag3 | 500个task,每天0:30运行一次,max_active_tasks=4,从0:30持续到早上7:00,且dag内部task之间有许多依赖 |
集群的默认default pool slot是128,我们单个dag的允许的并行max_active_tasks数为4也远小于 128,为什么会丢失任务呢?
二. 原因分析
到每天 0:30 的时候,正是 dag3 运行的时候,它包含了500个task,task之间有先后依赖顺序,0:30 分以后查看 scheduler 的日志看到如下内容,发现了有91个slots是空闲的。
[2023-06-19 00:40:40,713] {scheduler_job.py:322] INFO - Figuring out tasks to run in Pool(name-default pool) with 91 open slots and 15 task instance runnig
[2023-06-19 00:40:40,713] {scheduler_job.py:349] INFO - DAG spark_sql_hourly has 4/4 running and queued tasks
[2023-06-19 00:40:40,713] {scheduler job.py:357] INFO - Not executing <TaskInstance: spark_sql_hourly.client_ios_app_log scheduled 2023-06-181 running or queued from DAG spark_sql_hourly is>= to the DAG's max active tasks limit of 4
Airflow 每秒钟都会拉取并打印 scheduled、queued 这两种状态的 task 到 scheduler 的info日志中,同时并判断是否要执行,这两种状态的 task 存储在 MySQL中,由于0:30分 dag3 产生了大量的 task,也就是说有大量的 scheduled、queued 状态的任务被Airflow拉取判断是否执行,只要当前dag 的并行task没有超过 max_active_tasks 的数据就可以执行。
查看 Airflow 源码发现,每次拉取的任务会按照 task 权重、execution_date 排序,取 limit 个 task(parallelism - 已占用的pool的个数)实例,由于 dag3 在 0:30分的时候所有的任务都进入scheduled、queued了,且权重非常高(由于task的依赖个数多,直接拉高了权重),task 任务数远大于了 256(集群配置的parallelism并行度), 所以 dag1 dag2 的任务直接不能被Airflow取出来执行。
- 拉取task实例源码
# Get all task instances associated with scheduled
# DagRuns which are not backfilled, in the given states,
# and the dag is not paused
query = (
session.query(TI)
.join(TI.dag_run)
.filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
.join(TI.dag_model)
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
# 重点1
.order_by(-TI.priority_weight, DR.execution_date)
)
starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
if starved_pools:
query = query.filter(not_(TI.pool.in_(starved_pools)))
# 重点2
query = query.limit(max_tis)
dag1 和 dag2 的task实例不能被正常执行,所以 dagrun 就一直是处在 running 状态,随着时间的推移,小时级任务 dag2 在 0:30 到 7 点之间,每个小时会产生一个dagrun,到 7 点有 7 个 dagrun 处于 running,7点以后 dag3 的高权重的task都执行完了,dag2 可以执行了,任务没有丢失只是延迟了(不能容忍)。
但是 dag1 由于是没分钟产生一个dag,32分钟以后会有 32个dagrun处于running状态,有集群有个配置 max_active_runs_per_dag=32
,所以32分钟后就不会再生成dagrun了,所以这些dagrun就丢失了。的7点以后延迟的 dagrun 相继开始运行,又可以生成dagrun了,所以恢复正常了。
小时级任务dag2没有超过32个dagrun,所以任务没有丢失。
三. 解决方法
Airflow在计算权重时,是在单个 dag 内部计算的,task 权重取值使用 downstream 模式计算,也就是说下游多一个依赖它的 task,那么它的权重就加 1 。然而在取出任务执行时,是全局取的,不管是哪个dag,只有是 scheduled 状态的task 实例,就按照权重全局排序,取 limit 个出来。
我们现在知道了主要是权重的原因,解决思路有两个:
- 思路一是给分钟级、小时级任务添加一个很高的基础权重9999,确保排序的时候被排在前面有机会被取出来执行。大家可能会担心 dag3 不能执行,这个担心大可不必,Airflow只是拉取了几百个task实例出来,但是真正执行的只有4个(max_active_tasks控制着)
default_args = {
'owner': 'bigdata_team',
'depends_on_past': False,
'retries': 0,
'priority_weight': 9999
}
- 思路二是把 limit 值调大,也就是集群的配置 parallelism,保证每次都可以把所有的schedule的任务取出来,它的值也不是无限调大的,最大受到max_tis_per_query的影响,max_tis_per_query默认是512,这两个参数应该同时调整,不过同时 MySQL 的压力也会增加,不过这点请求对MySQL来说可能微不足道。
# This defines the maximum number of task instances that can run concurrently in Airflow
# regardless of scheduler count and worker count. Generally, this value is reflective of
# the number of task instances with the running state in the metadata database.
parallelism = 256
# If this is too high, SQL query performance may be impacted by
# complexity of query predicate, and/or excessive locking.
# Additionally, you may hit the maximum allowable query length for your db.
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512
四. 配置总结
下面重点总结下前文说到的几个配置。
max_active_runs_per_dag
一个 dag 文件允许的同时运行的最大 dagRun 个数。 在 dag 文件中用 max_active_runs
定义,如果 dag 文件中没有定义,则使用配置文件的 max_active_runs_per_dag
,我们写dag的时候通常是没有写这个参数的,所以我们应该调大集群的max_active_runs_per_dag
值。
使用场景:当 Airflow 集群资源紧张的时候,可以调大此参数,否则可能丢失dagrun
max_active_tasks_per_dag
同一个 dag 中,运行 active(running or queued)
的 task 最大数量,为 1 表示只允许一个 task 运行,为 10 表示这个 dag 中的 task 可以并行,在 dag 文件中通过 max_active_tasks 配置,如果 dag 文件中未配置,则取配置文件的 。max_active_tasks_per_dag
max_tis_per_query
默认 512,限制查询 MySQL 返回的数量大小,如果没有限制,可能会拖垮MySQL,Airflow 真正取回的 task 数量计算方法: min(max_tis_per_query,max_tis_per_query)
parallelism
整个 airflow 系统里面,所有的 dag 中的 task 允许 的并发数量,即只允许有这么多的task同时是 active
的(running or queued
)。 实际上 Airflow 会根据 Airflow集群中的所有的 task instance
的权重 和 execution_date
排序,从 MySQL 中取出 (parallelism - 已占用的pool的个数)条在 scheduled
的状态的 task Instance
来运行,而 Airflow 中的 task 权重计算方式是 downstream 方式,即某个 task 的下游任务个数即为该 task instance
的权重。如果 Airflow 中 dag1 中有上百个 task,且这些 task 有很多的依赖关系(重点,根据依赖关系计算权重),那么此时集群中的其他dag有可能权重太低导致无法从 MySQL 查询到。