已复制
全屏展示
复制代码

Airflow 中如何使用 XComs

· 1 min read

在 airflow 中,operator 一般(not always)是原子的,也就是说,他们一般独立执行,同时也不需要和其他 operator 共享信息,如果两个 operators 需要共享信息,如 filename 之类的, 推荐将这两个 operators 组合成一个 operator。如果实在不能避免,则可以使用 XComs (cross-communication) 来实现。XComs 用来在不同tasks 之间交换信息。

看下面的示例

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


# 必须是 context['task_instance']
def task1_function(**context):
    print("do some work at task1.")
    context['task_instance'].xcom_push(key='key1', value="value_from_task1")


# 必须是 context['task_instance']
def task2_function(**context):
    print("do some work at task2.")
    value_from_task1_key1 = context['task_instance'].xcom_pull(key='key1', task_ids="task1")
    print("value_from_task1_key1:" + value_from_task1_key1)


with DAG(
        dag_id='dag1',
        default_args=default_args,
        description='A simple DAG',
        schedule_interval='@once',
        start_date=datetime(2023, 7, 25),
        catchup=False,
        tags=['newExample'],
) as dag:
    # task1 执行完毕后会写入数据到xcom:key=key1 value=value_from_task1
    task1 = PythonOperator(
        task_id='task1',
        provide_context=True,
        python_callable=task1_function,
    )

    # task2 的 task2_function 函数中获取 task1 写xcom的数据
    task2 = PythonOperator(
        task_id='task2',
        provide_context=True,
        python_callable=task2_function,
    )

    # task3 的 bash_command 中获取 task1 写xcom的数据
    task3 = BashOperator(
        task_id='task3',
        bash_command="echo value from task1: {{ task_instance.xcom_pull(key='key1', task_ids='task1') }} "
    )
    task1 >> task2
    task1 >> task3
🔗

文章推荐