已复制
全屏展示
复制代码

Airflow补数与重跑backfill


· 3 min read

一. 概要说明

在创建 dag 时,我们最好是保持 catchup=False,也就是不补数,因为在你首次创建dag的时候,不知道 dag 文件是否写对了,如果有错的话,错误的补数也就没有意义了。如果要刷的数据分区早于 dag 创建的时间,就需要手动执行,airflow 提供了 backfill 子命令来完成这个事情。

with DAG(
        ......
        catchup=False,
        ......
) as dag:
    pass
    
最佳实践
  • 如果是重跑失败的dag、或者是在创建dag之后的日期重跑,直接在web界面中重跑即可(clear)
  • 如果是在创建 dag 之前的日期、或者一整段连续时间重跑,使用 backfill 命令重跑。

backfill 命令

# 查看帮助
airflow dags backfill --help


# 主要使用两个参数
airflow dags backfill -s start_date -e end_date dag_id


# 每分钟执行一次的 dag game_monitor_dag 补数 10 次示例
airflow dags backfill -s '2023-05-30 02:00:00' -e '2023-05-30 02:10:00' game_monitor_dag
  • -s 覆盖 dag 参数里面的 start_date,YYYY-MM-DD HH:mm:ss 格式
  • -e 覆盖 dag 参数里面的 end_date,YYYY-MM-DD HH:mm:ss 格式

二. backfill 补数

  • 现在时间是 2022-05-28 22:30:06, 激活如下的 dag 时,并不会马上执行,而是会等到 22:40 才会执行,此时的 logical_datedatetime(2022, 5, 27, 22, 42)
  • 等到 22:40 以后,27号的数据已经跑完了,但是我现在想把 20号 ~ 25 号的数据补上,这是一个非常常见的场景。

dag1.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='dag1',
        default_args=default_args,
        description='A simple DAG',
        schedule_interval='40 22 * * *',
        start_date=datetime(2022, 5, 27, 22, 42),
        catchup=False,
        tags=['newExample'],
) as dag:

    task1 = BashOperator(
        task_id='task1',
        bash_command="echo task_id: {{ task.task_id }} "
    )
    
使用 backfill 命令补数
# 如下命令会从 2022-05-20 到 2022-05-25号,依次将天的任务加到队列里面执行。
# logical_date 依次是:2022-05-20 22:40:00、2022-05-21 22:40:00、2022-05-22 22:40:00 。。。。。2022-05-25 22:40:00
# 注意:该命令是前台执行的,如果你的任务需要跑很长时间,中途万一ssh连接断掉,那后面的任务可能就会失败了,
#      所以最好在后台执行,或者使用 tmux、screen 等工具,开启新的session执行
    
# 注意是:-s 包含, -e 不包含
airflow dags backfill -s 2022-05-20 -e 2022-05-26 dag1

三. backfill 重跑

假设 2022-05-20 到 2022-05-26号的数据已经跑过了,并且状态是 Success 的,现在想把这7天的数据重新跑一次。首先清除状态,然后在 backfill

# 首先清除掉指定时间范围的 task 状态,清除以后这些状态会进入 queue 里面
airflow tasks clear -s 2022-05-20 -e 2022-05-26 dag1

# 重跑数据
airflow dags backfill -s 2022-05-20 -e 2022-05-26 dag1

文章推荐