Airflow补数与重跑backfill
一. 概要说明
在创建 dag 时,我们最好是保持 catchup=False
,也就是不补数,因为在你首次创建dag的时候,不知道 dag 文件是否写对了,如果有错的话,错误的补数也就没有意义了。如果要刷的数据分区早于 dag 创建的时间,就需要手动执行,airflow 提供了 backfill
子命令来完成这个事情。
with DAG(
......
catchup=False,
......
) as dag:
pass
最佳实践
- 如果是重跑失败的dag、或者是在创建dag之后的日期重跑,直接在web界面中重跑即可(clear)
- 如果是在创建 dag 之前的日期、或者一整段连续时间重跑,使用
backfill
命令重跑。
backfill 命令
# 查看帮助
airflow dags backfill --help
# 主要使用两个参数
airflow dags backfill -s start_date -e end_date dag_id
# 每分钟执行一次的 dag game_monitor_dag 补数 10 次示例
airflow dags backfill -s '2023-05-30 02:00:00' -e '2023-05-30 02:10:00' game_monitor_dag
- -s 覆盖 dag 参数里面的
start_date
,YYYY-MM-DD HH:mm:ss 格式 - -e 覆盖 dag 参数里面的
end_date
,YYYY-MM-DD HH:mm:ss 格式
二. backfill 补数
- 现在时间是 2022-05-28 22:30:06, 激活如下的 dag 时,并不会马上执行,而是会等到 22:40 才会执行,此时的
logical_date
为datetime(2022, 5, 27, 22, 42)
- 等到 22:40 以后,27号的数据已经跑完了,但是我现在想把 20号 ~ 25 号的数据补上,这是一个非常常见的场景。
dag1.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='dag1',
default_args=default_args,
description='A simple DAG',
schedule_interval='40 22 * * *',
start_date=datetime(2022, 5, 27, 22, 42),
catchup=False,
tags=['newExample'],
) as dag:
task1 = BashOperator(
task_id='task1',
bash_command="echo task_id: {{ task.task_id }} "
)
使用 backfill 命令补数
# 如下命令会从 2022-05-20 到 2022-05-25号,依次将天的任务加到队列里面执行。
# logical_date 依次是:2022-05-20 22:40:00、2022-05-21 22:40:00、2022-05-22 22:40:00 。。。。。2022-05-25 22:40:00
# 注意:该命令是前台执行的,如果你的任务需要跑很长时间,中途万一ssh连接断掉,那后面的任务可能就会失败了,
# 所以最好在后台执行,或者使用 tmux、screen 等工具,开启新的session执行
# 注意是:-s 包含, -e 不包含
airflow dags backfill -s 2022-05-20 -e 2022-05-26 dag1
三. backfill 重跑
假设 2022-05-20 到 2022-05-26号的数据已经跑过了,并且状态是 Success 的,现在想把这7天的数据重新跑一次。首先清除状态,然后在 backfill
# 首先清除掉指定时间范围的 task 状态,清除以后这些状态会进入 queue 里面
airflow tasks clear -s 2022-05-20 -e 2022-05-26 dag1
# 重跑数据
airflow dags backfill -s 2022-05-20 -e 2022-05-26 dag1