已复制
全屏展示
复制代码

Airflow实用技巧与最佳实践


· 5 min read

一. 调度系统选择

1.1 为什么选择 Airflow

  • 方便查看任务之间的依赖关系(在dag文件中跳转查看、而不必要在图形界面中看得眼花缭乱)。
  • 功能强大,自定义能力强。
  • 2 万千用户的选择,github星多,社区活跃,遇到问题方便解决。
  • 避免了任务不进代码库的问题(如果使用dophinscheduler,它可以在图形界面添加任务,时间长了就不知道这个任务的作者、用途、来源)。

1.2 Dolphin 还使用吗

Airflow对于开发来说是比较好用的,但是对应产品、运营、分析师就不友好的,可以在搭建一套 dolphinscheduler,用于产品、运营、分析师的自定义定时任务查询,不管是 Airflow 还是 dolphin,都应该做好任务归属问题,避免时间久了成为 “孤儿”。

二. Airflow注意事项

  • dag文件中不允许有中文出现。
  • 默认使用的是UTC时间,所以在调用脚本时要统一加上时区,同时配置airflow.cfg两个时区。
default_timezone = Asia/Shanghai
default_ui_timezone = Asia/Shanghai
  • 创建专门管理 dag 的 git 项目,这项目可能会包含多个项目的dag,可以统一部署dag、dag可进入版本管理。
  • 确定task放在哪个dag文件中?根据项目区分,同一个项目的task放在同一个dag文件,不要按照执行时间放置。
  • 如果同一个项目有月、周、天、小时级别的任务,则dag命名示例如下:
dag_poke_day.py   # 负责月、周、天级别任务
dag_poke_hour.py  # 负责 小时 级别任务


# 负责其他类型时间的任务,可以有多个,根据场景需求创建 dag
# 如果同时需要分钟级、指定时间型任务,可使用自定义动态operator实现
dag_poke_${scene}.py

三. Airflow最佳示例

3.1 任务dag文件

  • dag_poke_day.py 天级别 dag 示例,注意:下面的示例中的中文一定要去掉,Airflow 不支持中文。
import datetime
from airflow import DAG
from textwrap import dedent
from airflow.sensors.date_time import DateTimeSensor
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor


def cst(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date


def cst_num(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime("%Y%m%d")


def cst_dash(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime("%Y-%m-%d")


def cst_hour(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime("%Y-%m-%d %H:00:00")


def cst_minute(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime("%Y-%m-%d %H:%M:00")


def cst_second(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime("%Y-%m-%d %H:%M:%S")


def partition_ymd(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    y, m, d = str(new_date)[0:4], str(new_date)[5:7], str(new_date)[8:10]
    pt = 'year=%s and month=%s and day=%s' % (y, m, d)
    return pt


def partition_dash(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return 'dt=%s' % (new_date.strftime("%Y-%m-%d"))


def partition_num(logical_date, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return 'dt=%s' % (new_date.strftime("%Y%m%d"))


def partition_hour(logical_date, hour, interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    str_hour = "{:0>2d}".format(hour)
    return 'dt=%s and hour=%s' % (new_date.strftime("%Y%m%d"), str_hour)


def target_datetime(logical_date, hour, minute=0, second=0, interval=8):
    # return today's datetime
    cst_ = (logical_date + datetime.timedelta(hours=interval + 24))
    str_hour = "{:0>2d}".format(hour)
    str_minute = "{:0>2d}".format(minute)
    str_second = "{:0>2d}".format(second)
    target_datetime_str = cst_.strftime("%Y-%m-%d") + " %s:%s:%s" % (
        str_hour, str_minute, str_second
    )
    return target_datetime_str


def date_format(logical_date, fmt="%Y-%m-%d", interval=8):
    new_date = (logical_date + datetime.timedelta(hours=interval))
    return new_date.strftime(fmt)


project1_home = '/opt/warehouse/project1'
project1_home_log = '/opt/warehouse/project1_log'

default_args = {
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
    'depends_on_past': False,
}

with DAG(
        dag_id='dag_poke_day',
        description='run dag_poke_day everyday',
        tags=['big_data'],
        max_active_tasks=3,
        catchup=False,
        default_args=default_args,
        schedule_interval='10 0 * * *',
        start_date=datetime.datetime(2023, 6, 21, 20, 0),
        user_defined_macros={
            "cst": cst,
            "cst_num": cst_num,
            "cst_dash": cst_dash,
            "cst_hour": cst_hour,
            "cst_minute": cst_minute,
            "cst_second": cst_second,
            "partition_ymd": partition_ymd,
            "partition_dash": partition_dash,
            "partition_num": partition_num,
            "target_datetime": target_datetime,
            "date_format": date_format,
            "project1_home": project1_home,
            "project1_home_log": project1_home_log,
        },
) as dag:
    # start template ##########################################
    exec_bash_script = dedent(
        """
        mkdir -p {{project1_home_log}}/{{cst_num(logical_date)}}
        bash {{project1_home}}/bin/{{params.script_name}} {{cst_dash(logical_date)}} \
            1>> {{project1_home_log}}/{{cst_num(logical_date)}}/{{params.log_name}} 2>&1
        """
    )
    # end template ##########################################

    # start sensor ##########################################
    execute_hour_at_8 = DateTimeSensor(
        task_id='execute_hour_at_8',
        mode='reschedule',
        poke_interval=60,
        target_time="{{ target_datetime(logical_date,hour=8) }}",
    )

    execute_hour_at_17 = DateTimeSensor(
        task_id='execute_hour_at_17',
        mode='reschedule',
        poke_interval=60,
        target_time="{{ target_datetime(logical_date,hour=17) }}",
    )

    app_log_ktv = HivePartitionSensor(
        task_id='app_log_ktv',
        metastore_conn_id='hive_conn',
        table='poke_topic.app_log_ktv',
        mode='reschedule',
        poke_interval=60,
        partition='{{ partition_dash(logical_date) }}',
    )

    ods_android_log = HdfsSensor(
        task_id='ods_android_log',
        hdfs_conn_id="hdfs_conn",
        mode='reschedule',
        poke_interval=60,
        filepath="/user/hive/xxxx/{{date_format(logical_date)}}*.csv",
    )
    # end sensor ##########################################

    ods_family_relation = SSHOperator(
        ssh_conn_id='ssh_datanode01',
        task_id='ods_family_relation',
        depends_on_past=False,
        command=exec_bash_script,
        params={'script_name': 'ods_family_relation.sh',
                'log_name': 'ods_family_relation'}
    )

    [execute_hour_at_8] >> ods_family_relation

3.2 监控dag文件

用于监控其他的 dag 是否正常执行,是否执行超时等作用。所有的dag检查都放在这一个dag里面,dag 里面使用 DateTimeSensor 来控制检查时间。

3.3 部署dag文件

用于将 dag 文件部署到线上环境的脚步,部署前配置免密登录。

  • deploy_dag_tool.sh
#!/bin/bash
#  1. 该脚本用于部署 dag 文件至 airflow 两台节点:x.x.x.x y.y.y.y
#  2. 该脚本需要传一个参数 dag_file_path
#  示例: bash deploy_dag_tool.sh ./dag_poke_day.py

if [[ $# -ne 1 ]]; then
    echo "please check parameters"
    exit
fi
dag_file_path=$1
dag_file=${dag_file_path##*/}  # 拿掉最后一个'/'及其左边的字符串

# 参数检测:参数1是否是.py结尾的文件
if [ "${dag_file_path##*.}" != 'py' ]; then
    echo "Parameter needs to be like 'xxx.py'"
    exit 1
fi

# 检测文件中是否包含中文字符(目前airflow不支持中文)
python "$dag_file_path" 1>/tmp/chinese_check.log 2>&1
num=$(grep -c "SyntaxError: Non-ASCII character" /tmp/chinese_check.log)
if [[ "$num" -ne "0" ]]; then
    cat /tmp/chinese_check.log
    echo "error: airflow don't support chinese characters"
    exit 1
fi

# 需要部署的 airflow 节点 && airflow 节点中 dags 文件存放目录
host_list=(username@x.x.x.x username@y.y.y.y)
remote_dag_dir=/opt/airflow/dags

# 循环部署
echo "======== start deploy ========"
for host in "${host_list[@]}"
do
    # 同步到airflow指定节点
    scp "${dag_file_path}" "$host:$remote_dag_dir"
    if [[ $? -ne 0 ]]; then
        echo "======== failed at scp ${dag_file_path} to ${host} ========"
        exit 1
    fi

    # 检查 dag 文件能否正常执行
    ssh "${host}" "python3 ${remote_dag_dir}/${dag_file}"
    if [[ $? -ne 0 ]]; then
        echo "======== failed at check ${dag_file} contents ========"
        exit 1
    fi
done
echo "======== deploy success ========"

文章推荐