Airflow: BranchPythonOperator
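1. Overview
BranchPythonOperator is an Airflow operator that selects which downstream branch to run based on a condition. It is a specialized form of PythonOperator: it executes a Python callable, and the value the callable returns decides which branch the DAG follows. To use it, supply a Python callable whose conditional logic returns the task_id (or a list of task_ids) of the directly downstream task(s) to run; the remaining downstream tasks are skipped.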
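As a minimal sketch of the kind of callable described above, the function below returns either a single task_id or a list of task_ids. It is plain Python with no Airflow import so it can be run on its own; the function name, the `row_count` parameter, and the task ids (`skip_load`, `simple_load`, `bulk_load`, `notify`) are all hypothetical and exist only for illustration.

```python
from typing import List, Union


def choose_branch(row_count: int) -> Union[str, List[str]]:
    """Return the task_id(s) that should run next (hypothetical names)."""
    if row_count == 0:
        # A single task_id: only this branch is followed.
        return "skip_load"
    if row_count > 1000:
        # A list of task_ids: several branches are followed together.
        return ["bulk_load", "notify"]
    return "simple_load"


print(choose_branch(0))
print(choose_branch(5000))
```

In a real DAG a function like this would be passed as `python_callable` to BranchPythonOperator, and every returned id would have to match a task placed directly downstream of the branching task.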
2. Example
- dag2.py: a deliberately exaggerated example in which a different task runs on each day of the week.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator


def branch_choice(**context):
    # weekday() returns 0 for Monday through 6 for Sunday
    logical_date = context['dag_run'].logical_date
    week_day = logical_date.weekday()
    selected_task = ""
    if week_day == 0:
        selected_task = "task1"
    elif week_day == 1:
        selected_task = "task2"
    elif week_day == 2:
        selected_task = "task3"
    elif week_day == 3:
        selected_task = "task4"
    elif week_day == 4:
        selected_task = "task5"
    elif week_day == 5:
        selected_task = "task6"
    elif week_day == 6:
        selected_task = "task7"
    # the returned value is the task_id of the downstream task to run
    return selected_task


with DAG(
    dag_id='dag2',
    schedule_interval="* * * * *",  # cron expression: every minute
    start_date=datetime(2023, 6, 27, 17),
    catchup=False,
    tags=['example'],
) as dag:
    branch_choice_task = BranchPythonOperator(
        task_id='branch_choice_task',
        python_callable=branch_choice
    )

    # tasks from monday to sunday
    task1 = PythonOperator(task_id='task1', python_callable=lambda: print('Hi from task1'))
    task2 = PythonOperator(task_id='task2', python_callable=lambda: print('Hi from task2'))
    task3 = PythonOperator(task_id='task3', python_callable=lambda: print('Hi from task3'))
    task4 = PythonOperator(task_id='task4', python_callable=lambda: print('Hi from task4'))
    task5 = PythonOperator(task_id='task5', python_callable=lambda: print('Hi from task5'))
    task6 = PythonOperator(task_id='task6', python_callable=lambda: print('Hi from task6'))
    task7 = PythonOperator(task_id='task7', python_callable=lambda: print('Hi from task7'))

    # only run one task according to branch_choice_task
    branch_choice_task >> [task1, task2, task3, task4, task5, task6, task7]
- Run result: today is 2023-06-27, a Tuesday (weekday() returns 1), so only task2 ran and all the other tasks were skipped.
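The weekday-to-task mapping behind this result can be checked outside Airflow with the standard library alone. The sketch below is a compact alternative to the if/elif chain, not the code from dag2.py; the helper name `task_for_date` is hypothetical, and it assumes the tasks keep the names task1 through task7.

```python
from datetime import datetime


def task_for_date(logical_date: datetime) -> str:
    """Map a date to task1..task7 (hypothetical helper, Monday -> task1)."""
    # weekday() is 0 for Monday and 6 for Sunday, so add 1 for the task number.
    return f"task{logical_date.weekday() + 1}"


# 2023-06-27 is a Tuesday, so weekday() is 1 and the chosen task is task2.
print(task_for_date(datetime(2023, 6, 27, 17)))  # task2
```

Computing the id from the weekday keeps the branch function to one line, at the cost of tying the logic to the task naming scheme; the explicit if/elif chain is easier to adapt when the task ids do not follow a numeric pattern.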