已复制
全屏展示
复制代码

Airflow单dag内多个crontab任务


· 3 min read

一. 简要概述

现实场景的 dag 最佳实践

为了方便管理,我们通常把某个项目的所有task放在一个dag里面,比如每天运行一次的天级别离线任务,假如这个dag里面有些任务需要在下午17点执行,可以添加DateTimeSensor来实现,所以这一个dag里面的task就可能几百上千个。

单个dag内实现多个crontab任务

假设现在有个项目,需要定义很多的分钟级任务、同时伴随着小时级任务、以及在指定的某个时间点的任务,由于一个dag里面只能配置一个 schedule_interval, 所以没办法把这个项目的所有任务放在一个dag里面,为了方便管理,想把所有的关于这个项目的任务放到一个 dag ,并且可以定义任意的crontab任务,怎么办呢?

解决方法

我们可以把这个dag设置成每分钟都执行。假设我们的任务 t1 每 2 分钟执行一次,我们动态创建一个任务假设叫 cron.*.*.*.*.* ,每分钟去判断 t1 应该的执行时间和 execution_date 的时间,如果相等就执行 t1 任务,不等就调整这个dagrun。实现起来很简单,看代码。

二. 任务实现

需要用到一个三方包croniter

pip install croniter

下面是一个完整的 dag 示例

import re
from airflow import DAG
from croniter import croniter
from airflow.models import BaseOperator
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowSkipException


class CronSkipOperator(BaseOperator):
    _cron_tasks = {}

    def __init__(self, cron_expr: str, **kwargs) -> None:
        assert 'task_id' not in kwargs, "task_id should be set automatically"
        kwargs['task_id'] = f"cron.{CronSkipOperator._cron_expr_2_task_id(cron_expr)}"
        kwargs['doc'] = 'cron_expr: ' + cron_expr
        super().__init__(**kwargs)
        self.cron_expr = cron_expr

    def execute(self, context):
        current_datetime = context["execution_date"] + timedelta(hours=8, minutes=1)
        print(f"cron expression is '{self.cron_expr}'")
        if not croniter.match(self.cron_expr, current_datetime):
            # skip the task if the execution date does not match the cron expression
            raise AirflowSkipException(
                f"current_datetime is {current_datetime}, "
                f"execution_date is {context['execution_date']} "
                f"and is not match cron expression '{self.cron_expr}'"
            )

    @staticmethod
    def get_or_create(cron: str, **kwargs):
        cron_expr = re.sub(r'\s{2,}', ' ', cron)
        if cron_expr not in CronSkipOperator._cron_tasks:
            CronSkipOperator._cron_tasks[cron_expr] = CronSkipOperator(cron_expr=cron_expr, **kwargs)
        return CronSkipOperator._cron_tasks[cron_expr]

    @staticmethod
    def _cron_expr_2_task_id(cron_expr: str) -> str:
        """
        encode cron expression with the available characters in task_id.
        because some special characters are not allowed for task_id.
        """
        trans_table = {
            '*': '-',
            ',': '_c',
            '-': '_h',
            '?': '_q',
            '/': '_s',
            ' ': '.'
        }
        return ''.join([ch if ch.isalnum() else trans_table[ch] for ch in cron_expr])


def cst_second(logical_date, interval=8):
    new_date = (logical_date + timedelta(hours=interval))
    return new_date.strftime("%Y-%m-%d %H:%M:%S")


default_args = {
    'owner': 'bigdata_team',
    'retries': 0,
    'depends_on_past': False,
    'priority_weight': 9999,
}

with DAG(
        dag_id='dag_project_minutely',
        description='for tasks running at any time like crontab',
        default_args=default_args,
        max_active_tasks=9999,
        schedule_interval='* * * * *',  # run every minute
        start_date=datetime(2023, 5, 15),
        catchup=False,
        tags=['big_data'],
        user_defined_macros={
            "cst_second": cst_second,
        },
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command="echo task1 logical_date: {{ cst_second(logical_date) }} >> /tmp/log.log"
    )

    task2 = BashOperator(
        task_id='task2',
        bash_command="echo task2 logical_date: {{ cst_second(logical_date) }} >> /tmp/log.log"
    )

    CronSkipOperator.get_or_create(cron='*/2 * * * *', retries=3) >> [task1]
    CronSkipOperator.get_or_create(cron='*/3 * * * *', retries=3) >> [task2]

  • CronSkipOperator.get_or_create() 传入任意的crontab字符串,和Linux保持一致。
  • 如果没有满足在指定的时间运行任务,则跳过当前这个 dagrun,粉红色的 dagrun 就是跳过的,不需要执行
  • 关于动态生成的没分钟 task_id 可读性问题: 在web界面点击一个 task 示例,里面可以看到实际的 cron 表达式。

文章推荐