Airflow配置时区与任务调度时间
一. 时区配置
Airflow中所有的和时间相关的参数都必须保证时区的正确设置,所以先确保配置了正确的时区,时区只针对于 schedule_interval 和 web ui 生效,下面是时区示例。
$AIRFLOW_HOME/airflow.cfg
default_timezone = Asia/Shanghai
default_ui_timezone = Asia/Shanghai
尽管在设置了时区,但是 Airflow 的默认时间变量还是用的 UTC 时间。
- data_interval_start
- data_interval_end
- logical_date
- ds
- ds_nodash
- ts
- ts_nodash
二. 时间变量
注意:所有的时间变量都是 UTC 时间的,都需要加上 8 以后再使用。
Airflow中有许多时间变量,这些时间变量可以让我们在定义任务、失败重跑时非常方便。其中最常用的也是最重要的就是 logical_date
,它表示当前执行task所在的 schedule_interval
时间点的上一个 period 时间点,即通常的场景:今天处理昨天的数据(schedule_interval
每天执行一次),那 logical_date
就是昨天。当前小时处理上一个小时的数据(schedule_interval
每小时执行一次),那logical_date
就是上一个小时。我们可以把logical_date
叫做数据时间 etl_date
。
2.1 常用时间变量
变量名称 | 类型 | 变量含义 |
---|---|---|
{{ logical_date }} |
datetime | 数据时间 etl_date |
{{ data_interval_start }} |
datetime | 当前运行的 logical_date,可能是interval时间点,可能是start_date(首次激活时) |
{{ data_interval_end }} |
datetime | 下一次运行的 logical_date,通常是 interval 的时间点。 |
{{ ds }} |
str | YYYY-MM-DD形式日期,同 {{ dag_run.logical_date |
{{ ds_nodash }} |
str | YYYYMMDD形式日期,同 {{ dag_run.logical_date |
{{ ts }} |
str | 同 {{ dag_run.logical_date |
{{ ts_nodash }} |
str | 同 {{ dag_run.logical_date |
2.2 过时时间变量
下面这是是过时的时间变量,但是为了兼容旧的代码,任然可以使用。
变量名称 | 类型 | 变量含义 |
---|---|---|
{{ execution_date }} |
datetime | 同 logical_date,由于不容易被理解,所以才新出了 logical_date |
{{ next_execution_date }} |
datetime | 同 data_interval_end |
{{ prev_execution_date }} |
datetime | logical_date 的前一个时间,不存在则为 None |
{{ next_ds }} |
str | 为下一次运行的 logical_date 的 YYYY-MM-DD 形式日期,不存在下一次运行则为 None |
{{ next_ds_nodash }} |
str | 为下一次运行的 logical_date 的 YYYYMMDD 形式日期,不存在下一次运行则为 None |
{{ prev_ds }} |
str | logical_date 的前一个时间的 YYYY-MM-DD 形式日期,不存在则为 None |
{{ prev_ds_nodash }} |
str | logical_date 的前一个时间的 YYYYMMDD 形式日期,不存在则为 None |
{{ yesterday_ds }} |
str | logical_date 的前一天的 YYYY-MM-DD 形式日期(注意是前一天) |
{{ yesterday_ds_nodash }} |
str | logical_date 的前一天的 YYYYMMDD 形式日期(注意是前一天) |
{{ tomorrow_ds }} |
str | logical_date 的后一天的 YYYY-MM-DD 形式日期(注意是后一天) |
{{ tomorrow_ds_nodash }} |
str | logical_date 的后一天的 YYYYMMDD 形式日期(注意是后一天) |
三. DAG参数
start_date
表示 dag 的开始运行时间,start_date 是可以精确到秒的,所以 datetime(2022, 4, 17)
实际是 datetime(2022, 4, 17, 0, 0, 0)
,注意:不要写一个动态的时间比如 datetime.now()
,一定要写一个固定的时间。首次运行情况要根据 当前时间、schedule_interval、start_date
来判断,见后文。
end_date
dag 停止运行的时间。
catchup
是否补齐从 start_date
到上一个 scheduler_interval 时间点的数据,默认我们都设置成 False,历史数据手动刷。
schedule_interval
定时计划:分别是 分、时、日、月、周,和 linux 的 crontab 的语法保持一致。
default_args
default_args 是 task 的默认参数,在定义task的时候,可以在 Operator 的参数里面覆盖这些参数。
default_args = {
'retries': 1,
'retry_delay': datetime.timedelta(minutes=1),
'depends_on_past': False,
}
with DAG(
....
default_args=default_args,
....
) as dag:
....
四. 首次激活dag是否立即运行
首次激活 dag 是否立即运行,主要看你的配置,如果 catchup=False
并且 DAG 是首次激活,并且该 DAG 不依赖于其他 DAG 或 TASK,主要是看 start_date
和 scheduler_interval
的值定:
- 首先,我们激活时间肯定是在某个
scheduler_interval
区间的,比如上图中的 int_2 和 int_3 之间 - 当 start_date < int_1 时,即 start_date_1,激活时立即运行。此时 logical_date 为 int_1
- 当 int_1 < start_date < int_2 时,即 start_date_2,激活时立即运行。此时 logical_date 为 start_date_2
- 当 int_2 < start_date < int_3 时,即 start_date_3,激活时不会立即运行,等到 int_3 时运行,此时 logical_date 为 int_2
- 当 int_3 < start_date < int_4 时,即 start_date_4,激活时不会立即运行,等到 int_4 时运行,此时 logical_date 为 int_3
五. 加时区 8 实际使用
我们可以自定义一个函数,专门用来计算 logical_date + 8,也就是北京时间的 logical_date,然后注册到 user_defined_macros 里面,看示例
import datetime
from airflow import DAG
from textwrap import dedent
from airflow.operators.bash import BashOperator
def cst(logical_date, interval=8):
new_date = (logical_date + datetime.timedelta(hours=interval))
return new_date
with DAG(
dag_id='test_dag',
description='run test_dag',
tags=['big_data'],
max_active_tasks=3,
catchup=False,
default_args={
'owner': 'bigdata_team',
'depends_on_past': False,
'retries': 0,
},
schedule_interval='* * * * *',
start_date=datetime.datetime(2023, 7, 16, 20, 0),
user_defined_macros={
"cst": cst,
},
) as dag:
exec_bash_script = dedent(
"""
echo current date: $(date) logical_date: {{ cst(logical_date) }} >> /tmp/task1
"""
)
task1 = BashOperator(
task_id='task1',
bash_command=exec_bash_script,
)
- /tmp/task1 日志
$ cat /tmp/task1
current date: Mon Jul 17 20:57:01 CST 2023 logical_date: 2023-07-17T20:56:00+00:00
current date: Mon Jul 17 20:58:01 CST 2023 logical_date: 2023-07-17T20:57:00+00:00