在 airflow 中,operator 一般(not always)是原子的,也就是说,他们一般独立执行,同时也不需要和其他 operator 共享信息,如果两个 operators 需要共享信息,如 filename 之类的, 推荐将这两个 operators 组合成一个 operator。如果实在不能避免,则可以使用 XComs (cross-communication) 来实现。XComs 用来在不同tasks 之间交换信息。
看下面的示例
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# 必须是 context['task_instance']
def task1_function(**context):
print("do some work at task1.")
context['task_instance'].xcom_push(key='key1', value="value_from_task1")
# 必须是 context['task_instance']
def task2_function(**context):
print("do some work at task2.")
value_from_task1_key1 = context['task_instance'].xcom_pull(key='key1', task_ids="task1")
print("value_from_task1_key1:" + value_from_task1_key1)
with DAG(
dag_id='dag1',
default_args=default_args,
description='A simple DAG',
schedule_interval='@once',
start_date=datetime(2023, 7, 25),
catchup=False,
tags=['newExample'],
) as dag:
# task1 执行完毕后会写入数据到xcom:key=key1 value=value_from_task1
task1 = PythonOperator(
task_id='task1',
provide_context=True,
python_callable=task1_function,
)
# task2 的 task2_function 函数中获取 task1 写xcom的数据
task2 = PythonOperator(
task_id='task2',
provide_context=True,
python_callable=task2_function,
)
# task3 的 bash_command 中获取 task1 写xcom的数据
task3 = BashOperator(
task_id='task3',
bash_command="echo value from task1: {{ task_instance.xcom_pull(key='key1', task_ids='task1') }} "
)
task1 >> task2
task1 >> task3