
A Special Case of Cross-DAG Dependencies in Airflow


· 2 min read

Problem scenario

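When a task depends on a task in another DAG, the execution_date of the DAG being depended on is usually earlier than the execution_date of the current DAG. But what if the task you depend on runs in the future, that is, its execution_date is later than the current DAG's execution_date?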

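If you search online for the execution_delta parameter of ExternalTaskSensor, the answer is always the same: it is the difference between the execution_date values of the two DAGs. That is not wrong, but it rests on an unstated assumption: the execution_date of the DAG you depend on is earlier than the execution_date of the current DAG.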

Solution

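  • Suppose dag1 runs every day at 0:30, for example with an execution_date of 2023-08-18 00:30:00
  • Suppose dag2 runs every day at 5:30, for example with an execution_date of 2023-08-18 05:30:00
  • Suppose taska in dag1 depends on taskb in dag2. How should ExternalTaskSensor be configured in dag1?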

Define the external dependency in dag1:

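from datetime import timedelta

# Import paths assume Airflow 2.x; other releases may use different modules.
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

taska = DummyOperator(task_id='taska')

# Wait for taskb in dag2, whose execution_date is 300 minutes (5 hours) later.
external_dag2_taskb = ExternalTaskSensor(
    task_id='external_dag2_taskb',
    external_dag_id='dag2',
    external_task_id='taskb',
    execution_delta=timedelta(minutes=-300),  # negative: the target run is in the future
    allowed_states=['success'],
    mode='reschedule',  # free the worker slot between checks
    check_existence=True  # verify that dag2 and taskb exist
)

external_dag2_taskb >> taska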

Note: because the execution_date of the DAG being depended on is later than this DAG's own execution_date, execution_delta must be negative.
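To double-check the sign, the subtraction the sensor performs can be reproduced with the standard library alone. This is a minimal sketch using the example dates from this post; the variable names are illustrative and are not part of Airflow.

```python
from datetime import datetime, timedelta

# execution_date of the current dag1 run (taken from the example in this post)
dag1_execution_date = datetime(2023, 8, 18, 0, 30)

# Same value as the one passed to ExternalTaskSensor in dag1
execution_delta = timedelta(minutes=-300)

# The sensor subtracts execution_delta from its own execution_date,
# so a negative delta moves the target forward in time.
target_execution_date = dag1_execution_date - execution_delta

print(target_execution_date)  # 2023-08-18 05:30:00
```

The result matches the execution_date of dag2 in the example, which is the run the sensor should wait for.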

How it works

The relevant source code is as follows:

class ExternalTaskSensor(BaseSensorOperator):
    """
    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task or DAG.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    :type execution_delta: Optional[datetime.timedelta]
    ......
    """
    ......
    
    @provide_session
    def poke(self, context, session=None):
        if self.execution_delta:
            dttm = context['execution_date'] - self.execution_delta
        elif self.execution_date_fn:
            dttm = self._handle_execution_date_fn(context=context)
        else:
            dttm = context['execution_date']

        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
        serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)

        self.log.info(
            'Poking for tasks %s in dag %s on %s ... ',
            self.external_task_ids,
            self.external_dag_id,
            serialized_dttm_filter,
        )

        # In poke mode this will check dag existence only once
        if self.check_existence and not self._has_checked_existence:
            self._check_for_existence(session=session)

        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)

        count_failed = -1
        if self.failed_states:
            count_failed = self.get_count(dttm_filter, session, self.failed_states)

        if count_failed == len(dttm_filter):
            if self.external_task_ids:
                raise AirflowException(
                    f'Some of the external tasks {self.external_task_ids} '
                    f'in DAG {self.external_dag_id} failed.'
                )
            else:
                raise AirflowException(f'The external DAG {self.external_dag_id} failed.')

        return count_allowed == len(dttm_filter)

Implementation logic:

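  • Subtract execution_delta from the current DAG's execution_date to get dttm_filter
  • Use dttm_filter as the execution_date to query the database and check whether the specified task_id is in one of the allowed_states (success in this example). If all of these tasks have succeeded, the ExternalTaskSensor task succeeds.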
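The two steps above can be sketched without Airflow as follows. This is a simplified illustration, not the library's implementation: `task_states` and `simplified_poke` are hypothetical stand-ins for the metadata database and the sensor's poke method, and the sketch ignores failed_states and execution_date_fn.

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for the task instance table:
# (dag_id, task_id, execution_date) -> state
task_states = {
    ("dag2", "taskb", datetime(2023, 8, 18, 5, 30)): "success",
}


def simplified_poke(execution_date, execution_delta, dag_id, task_id,
                    allowed_states=("success",)):
    """Mimic the core of poke() for a single target date."""
    # Step 1: subtract execution_delta from the current execution_date.
    dttm = execution_date - execution_delta
    dttm_filter = [dttm]

    # Step 2: count the target dates whose task state is allowed.
    count_allowed = sum(
        1 for dt in dttm_filter
        if task_states.get((dag_id, task_id, dt)) in allowed_states
    )

    # The sensor succeeds only when every target date matches.
    return count_allowed == len(dttm_filter)


dag1_run = datetime(2023, 8, 18, 0, 30)
print(simplified_poke(dag1_run, timedelta(minutes=-300), "dag2", "taskb"))  # True
print(simplified_poke(dag1_run, timedelta(minutes=300), "dag2", "taskb"))   # False
```

With a positive delta the lookup targets 2023-08-17 19:30:00, where no dag2 run exists in this sketch, so the check never passes. That is why the sign matters when the dependency lies in the future.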