已复制
全屏展示
复制代码

Airflow 之 ShortCircuitOperator


· 2 min read

一. 简要介绍

ShortCircuitOperator 用于在工作流中实现条件判断和短路功能。它的主要作用是基于一个条件表达式的结果来决定是否执行后续的任务。ShortCircuitOperator 可以在任务的执行过程中引入条件逻辑,以便根据某个条件的真假情况来控制工作流的执行路径。如果条件为真,任务将继续执行后续的任务;如果条件为假,任务将被短路,直接跳过下游的任务。

二. 应用场景

假如某个 DAG 有两个 task,task1 是周任务即每周一运行一次,task2 是天任务即每天运行一次,task2 依赖 task1 的周任务输出结果。举例来说,task1 周一执行,周二到周日跳过不执行,task2周一到周日都执行,只不过周一的时候必须要等到 task1 执行成功后 task2 才能执行,周二到周日不用等待 task1 的执行。

这种场景通常应用在每天的数据依赖周报的数据,但是周报又不是每天都跑的,周报只需要跑一天就行了,这种场景我们可以使用ShortCircuitOperator,用它来判断是否执行下游任务。

三. 实际示例

  • dag1.py 下面是一个完整示例,Airflow 版本为 2.6.2,这个 dag 每分钟运行一次。
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator, ShortCircuitOperator

with DAG(
        dag_id='dag1',
        schedule_interval="* * * * *",
        start_date=datetime(2023, 6, 27, 17),
        catchup=False,
        tags=['example'],
) as dag:
    # only execute at even minute
    execute_at_even_minute = ShortCircuitOperator(
        task_id='execute_at_even_minute',
        ignore_downstream_trigger_rules=False,
        python_callable=lambda **context: (context['dag_run'].logical_date.minute % 2) == 0
    )
    task1 = PythonOperator(task_id='task1', python_callable=lambda: print('Hi from task1'))
    task2 = PythonOperator(task_id='task2', python_callable=lambda: print('Hi form task2'),
                           trigger_rule='none_failed')

    # task3 also only execute at even minute
    execute_at_even_minute >> task1 >> task2
  • execute_at_even_minute中参数python_callable返回False,表示跳过,返回True,表示正常执行。这里 task1 只有在 logical_date为偶数分钟的时候才执行,奇数分钟的时候则跳过。
  • execute_at_even_minute中参数ignore_downstream_trigger_rules=False表示不会忽略下游的 trigger_rules,默认值为 True,也就是说默认情况下不管下游的 task 的 trigger_rules 自定义为什么值,Airflow 都会把它重置为 all_success,只有设置为 False 后,task2 里面的 trigger_rules才会生效。
  • task1 偶数分钟执行,奇数分钟时跳过。
  • task2 的参数 trigger_rule='none_failed'表示只要上游没有失败的,我就可以执行,这当然包括了如果是 skipped 的,task2 也是可以执行的。
  • 所以 task2 每分钟都会执行。

文章推荐