
Airflow: BranchPythonOperator



1. Overview

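BranchPythonOperator is an Airflow operator that decides at run time which downstream path of a DAG to follow. It is a specialized form of PythonOperator: you pass it a Python callable through python_callable, and the callable returns the task_id (or a list of task_ids) of the directly downstream task(s) that should run. Those tasks execute, and every other directly downstream task is marked as skipped.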
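To make the "return a task_id, the rest are skipped" rule concrete, here is a toy model in plain Python. It is not Airflow's implementation; resolve_branch is a hypothetical helper that only mimics how the callable's return value splits the directly downstream tasks into "run" and "skipped".

```python
# Toy model of branching (not Airflow code): the branch callable returns one
# task_id or a list of task_ids; those run, the other downstream tasks are skipped.
from typing import Callable, Dict, List, Union


def resolve_branch(downstream_ids: List[str],
                   choose: Callable[[], Union[str, List[str]]]) -> Dict[str, str]:
    chosen = choose()
    # Normalize a single task_id into a set so both return forms are handled
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    return {tid: ("run" if tid in chosen_ids else "skipped") for tid in downstream_ids}


print(resolve_branch(["task1", "task2", "task3"], lambda: "task2"))
# {'task1': 'skipped', 'task2': 'run', 'task3': 'skipped'}
```

Returning a list, e.g. lambda: ["task1", "task3"], would mark both of those as "run" in this model.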

2. Practical Example

  • dag2.py: a deliberately exaggerated example that runs a different task on each day of the week.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator


def branch_choice(**context):
    # weekday(): Monday == 0 ... Sunday == 6 (logical_date is normally in UTC)
    week_day = context['dag_run'].logical_date.weekday()
    # Monday -> "task1", Tuesday -> "task2", ..., Sunday -> "task7"
    return f"task{week_day + 1}"


with DAG(
        dag_id='dag2',
        schedule_interval="* * * * *",  # every minute; Airflow 2.4+ prefers schedule=
        start_date=datetime(2023, 6, 27, 17),
        catchup=False,
        tags=['example'],
) as dag:
    # Calls branch_choice and follows only the task_id it returns
    branch_choice_task = BranchPythonOperator(
        task_id='branch_choice_task',
        python_callable=branch_choice,
    )

    # tasks from Monday to Sunday
    task1 = PythonOperator(task_id='task1', python_callable=lambda: print('Hi from task1'))
    task2 = PythonOperator(task_id='task2', python_callable=lambda: print('Hi from task2'))
    task3 = PythonOperator(task_id='task3', python_callable=lambda: print('Hi from task3'))
    task4 = PythonOperator(task_id='task4', python_callable=lambda: print('Hi from task4'))
    task5 = PythonOperator(task_id='task5', python_callable=lambda: print('Hi from task5'))
    task6 = PythonOperator(task_id='task6', python_callable=lambda: print('Hi from task6'))
    task7 = PythonOperator(task_id='task7', python_callable=lambda: print('Hi from task7'))

    # Only the task chosen by branch_choice_task runs; the others are skipped
    branch_choice_task >> [task1, task2, task3, task4, task5, task6, task7]
```
  • Execution result: the run happened on 2023-06-27, a Tuesday, so only task2 ran and all other tasks were skipped.
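Because branch_choice only reads context['dag_run'].logical_date, its weekday mapping can be checked without an Airflow installation. The sketch below assumes that, stubs the context with types.SimpleNamespace (fake_context is a hypothetical helper), and uses a fixed UTC+8 offset as an example local timezone. It also shows a possible pitfall: logical_date is normally in UTC, so late in the UTC day the weekday can differ from the local one.

```python
# Check the weekday -> task_id mapping of dag2.py without Airflow.
# Assumption: branch_choice only uses context['dag_run'].logical_date.
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def branch_choice(**context):
    # Same mapping as dag2.py: Monday (0) -> task1 ... Sunday (6) -> task7
    week_day = context['dag_run'].logical_date.weekday()
    return f"task{week_day + 1}"


def fake_context(logical_date):
    # Hypothetical stand-in for the Airflow context: only dag_run.logical_date
    return {'dag_run': SimpleNamespace(logical_date=logical_date)}


utc = timezone.utc
utc_plus_8 = timezone(timedelta(hours=8))  # example local zone (assumption)

# 2023-06-27 was a Tuesday, so the branch picks task2
print(branch_choice(**fake_context(datetime(2023, 6, 27, 17, tzinfo=utc))))  # task2

# 2023-06-27 20:00 UTC is already Wednesday 04:00 in UTC+8
late_utc = datetime(2023, 6, 27, 20, tzinfo=utc)
print(branch_choice(**fake_context(late_utc)))                          # task2
print(branch_choice(**fake_context(late_utc.astimezone(utc_plus_8))))   # task3
```

If the branch should follow local calendar days, converting logical_date to the intended timezone before calling weekday() avoids this mismatch.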
