已复制
全屏展示
复制代码

Airflow配置时区与任务调度时间


· 5 min read

一. 时区配置

Airflow中所有的和时间相关的参数都必须保证时区的正确设置,所以先确保配置了正确的时区,时区只针对于 schedule_interval 和 web ui 生效,下面是时区示例。

$AIRFLOW_HOME/airflow.cfg

default_timezone = Asia/Shanghai
default_ui_timezone = Asia/Shanghai

尽管在设置了时区,但是 Airflow 的默认时间变量还是用的 UTC 时间。

  • data_interval_start
  • data_interval_end
  • logical_date
  • ds
  • ds_nodash
  • ts
  • ts_nodash

二. 时间变量

注意:所有的时间变量都是 UTC 时间的,都需要加上 8 以后再使用。

Airflow中有许多时间变量,这些时间变量可以让我们在定义任务、失败重跑时非常方便。其中最常用的也是最重要的就是 logical_date,它表示当前执行task所在的 schedule_interval 时间点的上一个 period 时间点,即通常的场景:今天处理昨天的数据(schedule_interval每天执行一次),那 logical_date 就是昨天。当前小时处理上一个小时的数据(schedule_interval每小时执行一次),那logical_date就是上一个小时。我们可以把logical_date叫做数据时间 etl_date

2.1 常用时间变量

变量名称 类型 变量含义
{{ logical_date }} datetime 数据时间 etl_date
{{ data_interval_start }} datetime 当前运行的 logical_date,可能是interval时间点,可能是start_date(首次激活时)
{{ data_interval_end }} datetime 下一次运行的 logical_date,通常是 interval 的时间点。
{{ ds }} str YYYY-MM-DD形式日期,同 {{ dag_run.logical_date
{{ ds_nodash }} str YYYYMMDD形式日期,同 {{ dag_run.logical_date
{{ ts }} str 同 {{ dag_run.logical_date
{{ ts_nodash }} str 同 {{ dag_run.logical_date

2.2 过时时间变量

下面这是是过时的时间变量,但是为了兼容旧的代码,任然可以使用。

变量名称 类型 变量含义
{{ execution_date }} datetime 同 logical_date,由于不容易被理解,所以才新出了 logical_date
{{ next_execution_date }} datetime 同 data_interval_end
{{ prev_execution_date }} datetime logical_date 的前一个时间,不存在则为 None
{{ next_ds }} str 为下一次运行的 logical_date 的 YYYY-MM-DD 形式日期,不存在下一次运行则为 None
{{ next_ds_nodash }} str 为下一次运行的 logical_date 的 YYYYMMDD 形式日期,不存在下一次运行则为 None
{{ prev_ds }} str logical_date 的前一个时间的 YYYY-MM-DD 形式日期,不存在则为 None
{{ prev_ds_nodash }} str logical_date 的前一个时间的 YYYYMMDD 形式日期,不存在则为 None
{{ yesterday_ds }} str logical_date 的前一天的 YYYY-MM-DD 形式日期(注意是前一天)
{{ yesterday_ds_nodash }} str logical_date 的前一天的 YYYYMMDD 形式日期(注意是前一天)
{{ tomorrow_ds }} str logical_date 的后一天的 YYYY-MM-DD 形式日期(注意是后一天)
{{ tomorrow_ds_nodash }} str logical_date 的后一天的 YYYYMMDD 形式日期(注意是后一天)

三. DAG参数

start_date

表示 dag 的开始运行时间,start_date 是可以精确到秒的,所以 datetime(2022, 4, 17)实际是 datetime(2022, 4, 17, 0, 0, 0)注意:不要写一个动态的时间比如 datetime.now(),一定要写一个固定的时间。首次运行情况要根据 当前时间、schedule_interval、start_date来判断,见后文。

end_date

dag 停止运行的时间。

catchup

是否补齐从 start_date 到上一个 scheduler_interval 时间点的数据,默认我们都设置成 False,历史数据手动刷。

schedule_interval

定时计划:分别是 分、时、日、月、周,和 linux 的 crontab 的语法保持一致。

default_args

default_args 是 task 的默认参数,在定义task的时候,可以在 Operator 的参数里面覆盖这些参数。

default_args = {
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'depends_on_past': False,
}

with DAG(
        ....
        default_args=default_args,
        ....
) as dag:
    ....

四. 首次激活dag是否立即运行

首次激活 dag 是否立即运行,主要看你的配置,如果 catchup=False 并且 DAG 是首次激活,并且该 DAG 不依赖于其他 DAG 或 TASK,主要是看 start_datescheduler_interval的值定:

  • 首先,我们激活时间肯定是在某个scheduler_interval区间的,比如上图中的 int_2 和 int_3 之间
  • 当 start_date < int_1 时,即 start_date_1,激活时立即运行。此时 logical_date 为 int_1
  • 当 int_1 < start_date < int_2 时,即 start_date_2,激活时立即运行。此时 logical_date 为 start_date_2
  • 当 int_2 < start_date < int_3 时,即 start_date_3,激活时不会立即运行,等到 int_3 时运行,此时 logical_date 为 int_2
  • 当 int_3 < start_date < int_4 时,即 start_date_4,激活时不会立即运行,等到 int_4 时运行,此时 logical_date 为 int_3

五. 加时区 8 实际使用

我们可以自定义一个函数,专门用来计算 logical_date + 8,也就是北京时间的 logical_date,然后注册到 user_defined_macros 里面,看示例

import datetime
from airflow import DAG
from textwrap import dedent

from airflow.operators.bash import BashOperator


def cst(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date


with DAG(
        dag_id='test_dag',
        description='run test_dag',
        tags=['big_data'],
        max_active_tasks=3,
        catchup=False,
        default_args={
            'owner': 'bigdata_team',
            'depends_on_past': False,
            'retries': 0,
        },
        schedule_interval='* * * * *',
        start_date=datetime.datetime(2023, 7, 16, 20, 0),
        user_defined_macros={
            "cst": cst,
        },
) as dag:
    exec_bash_script = dedent(
        """
        echo current date: $(date) logical_date: {{ cst(logical_date) }} >> /tmp/task1
        """
    )

    task1 = BashOperator(
        task_id='task1',
        bash_command=exec_bash_script,
    )

  • /tmp/task1 日志
$ cat /tmp/task1
current date: Mon Jul 17 20:57:01 CST 2023 logical_date: 2023-07-17T20:56:00+00:00
current date: Mon Jul 17 20:58:01 CST 2023 logical_date: 2023-07-17T20:57:00+00:00

文章推荐