已复制
全屏展示
复制代码

Airflow跨Dag任务之间依赖总结


· 6 min read

一. 简要概述

Airflow 的任务依赖主要分为:单个 Dag 内部的 task 依赖、跨 Dag 之间的 task 依赖,单个 Dag 内部很简单,通过 >> 实现即可,在多个 Dag 之间的 task 会用到 ExternalTaskSensor、ExternalTaskMarker 这两个 Operator 辅助实现。

二. 单Dag任务依赖

单个 Dag 内部直接通过 >> 实现即可,下面示例中 task2 依赖了 task1 任务。

  • test_dag.py
from airflow import DAG
import datetime
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'bigdata_team',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='test_dag',
        description='for test',
        default_args=default_args,
        max_active_tasks=10,
        schedule_interval='*/5 * * * *',
        start_date=datetime.datetime(2023, 6, 1, 13, 50),
        end_date=datetime.datetime(2029, 7, 1, 14, 50),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={},
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='sleep 2; echo "task1 at $(date)" >> /tmp/task',
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command='sleep 2; echo "task2 at $(date)" >> /tmp/task',
    )

    [task1] >> task2

三. 跨Dag任务依赖

3.1 实现跨Dag任务依赖

现在有dag1、dag2,dag2 里面的 task 依赖 dag1 里面的 task,下面是实现细节。

  • dag1.py
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'bigdata_team',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='dag1',
        description='for test',
        default_args=default_args,
        max_active_tasks=10,
        schedule_interval='30 7 * * *',
        start_date=datetime.datetime(2023, 6, 1, 13, 50),
        end_date=datetime.datetime(2029, 7, 1, 14, 50),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={},
) as dag:
    dag1_task = BashOperator(
        task_id='dag1_task',
        bash_command='sleep 2; echo "dag1_task at $(date)" >> /tmp/task',
    )
  • dag2.py
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'bigdata_team',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='dag2',
        description='for test',
        default_args=default_args,
        max_active_tasks=10,
        schedule_interval='30 8 * * *',
        start_date=datetime.datetime(2023, 6, 1, 13, 50),
        end_date=datetime.datetime(2029, 7, 1, 14, 50),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={},
) as dag:
    # 在dag2内部定义dag1的task
    dag1_task = ExternalTaskSensor(
        task_id='dag1_task',     # 自己的 task_id,在 airflow 列表页面显示
        external_dag_id='dag1',  # 指定依赖哪一个 dag 的 id
        external_task_id='dag1_task',  # 依赖task_id,为None则依赖整个dag
        poke_interval=60,              # 两次检查的间隔时间,单位秒。
        allowed_states=['success'],    # 即当该 task 成功后才执行后续任务
        check_existence=True,    # 检查外部的dag_id 和 task_id是否存在

        # 该参数非常重要。表示当前 dag 与外部 dag 的执行时间间隔
        # 这里dag1在7:30开始执行,dag2在8:30开始执行,则该参数必须设置为60分钟
        # 否则该任务不会执行成功
        execution_delta=datetime.timedelta(minutes=60),

        # 两种模式 poke、reschedule
        # poke:在休眠期间也会占用slot
        # reschedule: 在休眠期间不会占用slot,只有在执行时才会占用
        mode='reschedule',
    )

    dag2_task = BashOperator(
        task_id='dag2_task',
        bash_command='echo "dag2_task at $(date)" >> /tmp/task',
    )

    dag1_task >> dag2_task

注意:如果dag1在dag2之后执行,execution_delta 的参数要改成负的

3.2 跨Dag任务依赖优化

使用 ExternalTaskSensor 实现依赖外部 task 的功能,看下面一个很常见的场景:

两个 dag 都是每天在指定时间运行一次,dag1 中有上百个 task,dag2 中也有上百个 task,且 dag2 中有一半的 task 都依赖 dag1 中的一半的 task。假设某天 dag1 由于某些原因一部分任务失败了,并且这部分任务被 dag2 中 task 依赖着,所以 dag2 中的那些 task 也失败了,这时怎么解决。

查明失败原因并解决掉后,在 web 界面递归 clear dag1 中失败的 task 及其下游任务,由于 dag1 中的 task 并不知道 dag2 中的 task 依赖的它,所以它并不会 clear dag2 中的task,此时我们只有去 dag2 的页面手动 clear 了。

庆幸的是,Airflow 提供了一个 ExternalTaskMarker 类,它可以告诉 dag1 里面的task,dag2 中的 task 依赖了自己,在 clear dag1 的task的时候,也会把 dag2 中依赖 dag1 中的 task clear 掉。注意:在 clear 的时候,记得选择 Downstream and Recursive 两个选项。

假设 dag1 和 dag2 同时运行,看下面完整的代码:dag2_task 依赖 dag1_task,如果clear dag1_task,那么 dag2_task 也会被 clear。

  • dag1.py
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskMarker


def create_external_task_marker(child_dag_id, child_task_id):
    return ExternalTaskMarker(
        task_id=child_dag_id + "_" + child_task_id,
        external_dag_id=child_dag_id,
        external_task_id=child_task_id,
    )


default_args = {
    'owner': 'bigdata_team',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='dag1',
        description='for test',
        default_args=default_args,
        max_active_tasks=1,
        schedule_interval='30 9 * * *',
        start_date=datetime.datetime(2023, 8, 16, 7, 30),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={},
) as dag:
    dag1_task = BashOperator(
        task_id='dag1_task',
        bash_command='sleep 60; echo "dag1_task at $(date), logical_date: {{ logical_date }}" >> /tmp/task',
    )

    [dag1_task] >> create_external_task_marker('dag2', 'external_dag1_task')

ExternalTaskMarker 标记 dag2 里面的某个上游task,其实就是 dag1_task 在 dag2 里面的一个 ExternalTaskSensor

  • dag2.py
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'bigdata_team',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='dag2',
        description='for test',
        default_args=default_args,
        max_active_tasks=1,
        schedule_interval='30 9 * * *',
        start_date=datetime.datetime(2023, 8, 16, 7, 30),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={},
) as dag:
    external_dag1_task = ExternalTaskSensor(
        task_id='external_dag1_task',
        external_dag_id='dag1',
        external_task_id='dag1_task',
        allowed_states=['success'],
        check_existence=True,
        execution_delta=datetime.timedelta(minutes=0),
        mode='reschedule',
    )

    dag2_task = BashOperator(
        task_id='dag2_task',
        bash_command='echo "dag2_task at $(date)", logical_date: {{ logical_date }} >> /tmp/task',
    )

    external_dag1_task >> dag2_task

四. 时间依赖

某个 dag 每天凌晨1点开始执行,但是 dag 中有个 task 必须要等到 6 点半才能开始执行,此时可以使用 DateTimeSensor,下面的实例表示 task1 依赖 execute_at_06_30 任务。

  • test_dag.py
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.date_time import DateTimeSensor


# 计算指定的时间
def target_datetime(logical_date, hour, minute=0, interval=8):
    next_cst_date = logical_date + timedelta(hours=interval + 24)
    h = '{:0>2}'.format(hour)
    m = '{:0>2}'.format(minute)
    return next_cst_date.strftime("%Y-%m-%d ") + h + ":" + m + ":00"


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
}

with DAG(
        dag_id='test_dag',
        default_args=default_args,
        description='for test',
        schedule_interval='0 1 * * *',
        start_date=datetime(2023, 5, 24, 2),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={
            "target_datetime": target_datetime
        }
) as dag:
    execute_at_06_30 = DateTimeSensor(
        task_id='execute_at_06_30',
        mode='reschedule',
        poke_interval=60,  # 每隔60秒检测一次
        
        # 指定需要检测的时间6:30
        target_time="{{ target_datetime(logical_date,hour=6,minute=30) }}",
    )

    task1 = BashOperator(
        task_id="task1",
        bash_command="echo task_id:{{ task.task_id }}"
    )

    execute_at_06_30 >> task1

五. 失败任务重跑方法

使用场景:当某任务失败导致下游任务处于等待执行的状态,当失败任务解决后,使用 web 界面提供的 run、clear、mark success 按钮让任务重新执行。

  • run 按钮:如果当天的dag状态为failed,使用run只会重跑当前任务,不会自动执行下游等待的任务; 如果当天的dag状态不为failed,使用run会重跑当前任务和下游等待的任务。
  • clear 按钮:重跑当前任务和下游所有任务,不受限于当天的dag状态
  • mark success 按钮:如果通过其他方式将失败的任务恢复了,那么可以直接使用该按钮,airflow会自动执行下游等待的任务。不受限于当天的dag状态

注意:哪个任务失败就重跑哪个任务,切记直接clear整个dag,除非当天任务全部失败了。


文章推荐