A Special Case of Cross-DAG Dependencies in Airflow
Problem scenario
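When one DAG's task depends on a task in another DAG, the upstream DAG's execution_date is usually earlier than the current DAG's execution_date. But what if the task you depend on runs "in the future", i.e. its execution_date is later than the current DAG's execution_date?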
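If you search online for the execution_delta parameter of ExternalTaskSensor, nearly every result says it is the difference between the two DAGs' execution_dates. That is correct, but it rests on an unstated assumption: the upstream DAG's execution_date is earlier than the current DAG's, so the difference is always positive.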
Solution
- Suppose dag1 runs daily at 00:30, e.g. with execution_date 2023-08-18 00:30:00.
- Suppose dag2 runs daily at 05:30, e.g. with execution_date 2023-08-18 05:30:00.
- Suppose taska in dag1 depends on taskb in dag2. How should the ExternalTaskSensor be configured in dag1?
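Because the sensor searches for the execution_date `current - execution_delta` (see the source code in the next section), the value to configure is just the current DAG's execution_date minus the upstream DAG's. The sketch below illustrates this for both directions; `required_execution_delta` is a hypothetical helper, not part of Airflow.

```python
from datetime import datetime, timedelta


def required_execution_delta(current: datetime, upstream: datetime) -> timedelta:
    """Hypothetical helper (not an Airflow API): the execution_delta for which
    current - execution_delta == upstream."""
    return current - upstream


dag1_date = datetime(2023, 8, 18, 0, 30)  # dag1: daily at 00:30
dag2_date = datetime(2023, 8, 18, 5, 30)  # dag2: daily at 05:30

# Usual case: dag2 waits for the EARLIER dag1 -> positive delta
print(required_execution_delta(dag2_date, dag1_date))  # 5:00:00

# This article's case: dag1 waits for the LATER dag2 -> negative delta
delta = required_execution_delta(dag1_date, dag2_date)
print(delta)                             # -1 day, 19:00:00
print(delta == timedelta(minutes=-300))  # True
```

Python prints a negative timedelta as "-1 day, 19:00:00", which is exactly -5 hours, so `timedelta(minutes=-300)` and `timedelta(hours=-5)` are interchangeable here.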
Define the external dependency in dag1:
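```python
from datetime import timedelta

from airflow.operators.dummy import DummyOperator  # EmptyOperator (airflow.operators.empty) in Airflow >= 2.4
from airflow.sensors.external_task import ExternalTaskSensor

# These tasks are created inside dag1's DAG definition.
taska = DummyOperator(task_id='taska')

# dag2 runs 5 hours AFTER dag1, so the sensor must look forward: execution_delta is negative.
external_dag2_taskb = ExternalTaskSensor(
    task_id='external_dag2_taskb',
    external_dag_id='dag2',
    external_task_id='taskb',
    execution_delta=timedelta(minutes=-300),  # 00:30 - (-5 h) = 05:30
    allowed_states=['success'],
    mode='reschedule',     # release the worker slot between pokes
    check_existence=True,  # fail fast if dag2 / taskb does not exist
)

external_dag2_taskb >> taska
```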
Note: because the upstream DAG's execution_date is later than the current DAG's own execution_date, execution_delta must be negative.
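To see why the sign matters, the sketch below computes the date the sensor ends up searching for with each sign, using the same subtraction as `poke()` (the function name `lookup_date` is illustrative only). With a positive 300 minutes the sensor would look for a dag2 run at 2023-08-17 19:30, which never exists because dag2 only runs at 05:30, so the sensor would keep poking until its timeout.

```python
from datetime import datetime, timedelta

dag1_date = datetime(2023, 8, 18, 0, 30)  # execution_date of a dag1 run


def lookup_date(execution_date: datetime, execution_delta: timedelta) -> datetime:
    """Illustrative helper mirroring poke(): execution_date - execution_delta."""
    return execution_date - execution_delta


right = lookup_date(dag1_date, timedelta(minutes=-300))
wrong = lookup_date(dag1_date, timedelta(minutes=300))

print(right)  # 2023-08-18 05:30:00 -> the dag2 run we want
print(wrong)  # 2023-08-17 19:30:00 -> no dag2 run has this execution_date
```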
How it works
The relevant parts of the source code are as follows:
```python
class ExternalTaskSensor(BaseSensorOperator):
    """
    :param execution_delta: time difference with the previous execution to
        look at, the default is the same execution_date as the current task or DAG.
        For yesterday, use [positive!] datetime.timedelta(days=1). Either
        execution_delta or execution_date_fn can be passed to
        ExternalTaskSensor, but not both.
    :type execution_delta: Optional[datetime.timedelta]
    ......
    """

    # ......

    @provide_session
    def poke(self, context, session=None):
        # Decide which execution_date(s) of the external DAG to check
        if self.execution_delta:
            dttm = context['execution_date'] - self.execution_delta
        elif self.execution_date_fn:
            dttm = self._handle_execution_date_fn(context=context)
        else:
            dttm = context['execution_date']

        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
        serialized_dttm_filter = ','.join(dt.isoformat() for dt in dttm_filter)

        self.log.info(
            'Poking for tasks %s in dag %s on %s ... ',
            self.external_task_ids,
            self.external_dag_id,
            serialized_dttm_filter,
        )

        # In poke mode this will check dag existence only once
        if self.check_existence and not self._has_checked_existence:
            self._check_for_existence(session=session)

        # Count external task instances on those dates in an allowed state
        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)

        count_failed = -1
        if self.failed_states:
            count_failed = self.get_count(dttm_filter, session, self.failed_states)

        if count_failed == len(dttm_filter):
            if self.external_task_ids:
                raise AirflowException(
                    f'Some of the external tasks {self.external_task_ids} '
                    f'in DAG {self.external_dag_id} failed.'
                )
            else:
                raise AirflowException(f'The external DAG {self.external_dag_id} failed.')

        # Succeed only when every target date has matched
        return count_allowed == len(dttm_filter)
```
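The date-selection branch at the top of `poke()` can be reproduced in plain Python to experiment with. This is a simplified sketch, not Airflow code: `resolve_dttm_filter` is a hypothetical name, and the real `_handle_execution_date_fn` receives the task context rather than just a datetime.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional, Union

DateOrDates = Union[datetime, List[datetime]]


def resolve_dttm_filter(
    execution_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], DateOrDates]] = None,
) -> List[datetime]:
    """Simplified re-creation of the date logic at the top of poke()."""
    if execution_delta:  # truthiness test, as in poke(); a negative delta is truthy
        dttm = execution_date - execution_delta
    elif execution_date_fn:
        dttm = execution_date_fn(execution_date)
    else:
        dttm = execution_date
    # poke() always works with a list of dates
    return dttm if isinstance(dttm, list) else [dttm]


run = datetime(2023, 8, 18, 0, 30)
print(resolve_dttm_filter(run, execution_delta=timedelta(minutes=-300)))
# [datetime.datetime(2023, 8, 18, 5, 30)]
```

Because the branch tests `if self.execution_delta:` rather than `is not None`, a negative timedelta (which is truthy) is handled exactly like a positive one; only `timedelta(0)` is falsy, and it falls through to the default branch, which yields the same date anyway.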
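Implementation logic:
- Subtract execution_delta from the current DAG's execution_date to get the target date(s), dttm_filter. A negative execution_delta therefore moves the date forward: 00:30 - (-5 h) = 05:30.
- Use each date in dttm_filter as the execution_date and query the metadata database for the state of the specified task(s). If all of them are in allowed_states (here success), the ExternalTaskSensor succeeds; otherwise it pokes again at the next interval.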
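The success check can be simulated without a database. In this sketch an in-memory list of hypothetical `TaskInstanceRow` tuples stands in for Airflow's task_instance table. The count is normalised by the number of task ids, mirroring how `get_count` is understood to work, but this is a simplification, and the failed_states branch is omitted.

```python
from datetime import datetime
from typing import Collection, List, NamedTuple, Sequence


class TaskInstanceRow(NamedTuple):
    """Hypothetical stand-in for one row of the task_instance table."""
    dag_id: str
    task_id: str
    execution_date: datetime
    state: str


def count_matching(rows: List[TaskInstanceRow], dag_id: str, task_ids: Sequence[str],
                   dttm_filter: List[datetime], states: Collection[str]) -> float:
    """Count rows on the target dates in the given states, per external task id."""
    matched = sum(
        1
        for r in rows
        if r.dag_id == dag_id
        and r.task_id in task_ids
        and r.execution_date in dttm_filter
        and r.state in states
    )
    return matched / len(task_ids)


def sensor_succeeds(rows: List[TaskInstanceRow], dag_id: str, task_ids: Sequence[str],
                    dttm_filter: List[datetime],
                    allowed_states: Collection[str] = ("success",)) -> bool:
    """True when every target date has all external tasks in an allowed state."""
    return count_matching(rows, dag_id, task_ids, dttm_filter, allowed_states) == len(dttm_filter)


target = [datetime(2023, 8, 18, 5, 30)]
db = [TaskInstanceRow("dag2", "taskb", datetime(2023, 8, 18, 5, 30), "running")]
print(sensor_succeeds(db, "dag2", ["taskb"], target))  # False -> poke again later
db[0] = db[0]._replace(state="success")
print(sensor_succeeds(db, "dag2", ["taskb"], target))  # True -> taska can run
```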