Airflow Task and DAG Execution Monitoring in Practice


5 min read

1. Monitoring Tasks Inside a DAG

Monitoring inside a DAG works at two levels:
  • When a task finishes, alert if it failed: configure on_failure_callback in default_args.
  • When a dag_run finishes, alert if it failed: configure on_failure_callback in the DAG arguments.
A complete example
  • dag1.py
import os
import datetime
from airflow import DAG
from typing import Callable, Optional, List, Dict
from airflow.operators.python import PythonOperator


# A PythonOperator subclass that refuses to instantiate without an explicit owner.
class PythonOperatorNew(PythonOperator):
    def __init__(
            self,
            *,
            python_callable: Callable,
            op_args: Optional[List] = None,
            op_kwargs: Optional[Dict] = None,
            templates_dict: Optional[Dict] = None,
            templates_exts: Optional[List[str]] = None,
            **kwargs,
    ) -> None:
        assert 'owner' in kwargs, "you must specify an owner, e.g. owner='xxx'"
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs
        )


def send_alarm(message):
    # Placeholder notification channel: append to a local file.
    print("send alarm: " + message)
    os.system('echo "%s" >> /tmp/xxx.log' % message)


def task_failure_call_this(context):
    interval = 8  # shift the UTC logical_date to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']
    task_instance = context['task_instance']

    dag_id = dag.dag_id
    task_id = task_instance.task_id
    task_owner = task_instance.task.owner
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    message = 'airflow task failed => dag_id:%s, task_id:%s, task_owner:%s, logical_date:%s' % (
        dag_id, task_id, task_owner, logical_date.strftime('%Y-%m-%d %H:%M:%S')
    )
    print("task_failure_call_this message: " + message)
    send_alarm(message)


def dag_failure_call_this(context):
    interval = 8  # shift the UTC logical_date to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']

    dag_id = dag.dag_id
    dag_owners = dag.owner  # comma-separated owners of all tasks in the DAG
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    dag_fail_reason = context['reason']
    message = 'airflow dag failed => dag_id:%s, dag_owners:[%s], logical_date:%s, reason:%s' % (
        dag_id, dag_owners, logical_date.strftime('%Y-%m-%d %H:%M:%S'), dag_fail_reason
    )
    print("dag_failure_call_this message: " + message)
    send_alarm(message)


def task_1(**context):
    # Fails on even minutes, to demonstrate the task-level callback.
    print("task_1 is running.")
    if context['dag_run'].logical_date.minute % 2 == 0:
        raise Exception("error from task_1")


def task_2(**context):
    # Fails on odd minutes.
    print("task_2 is running.")
    if context['dag_run'].logical_date.minute % 2 != 0:
        raise Exception("error from task_2")


def task_3(**context):
    # Always fails, so every dag_run of dag1 ends up failed.
    print("task_3 is running.")
    raise Exception("error from task_3")


default_args = {
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'depends_on_past': False,
    'on_failure_callback': task_failure_call_this
}

with DAG(
        dag_id='dag1',
        schedule_interval="* * * * *",
        start_date=datetime.datetime(2023, 6, 28, 17),
        catchup=False,
        tags=['bigdata'],
        default_args=default_args,
        on_failure_callback=dag_failure_call_this,
) as dag:
    task1 = PythonOperatorNew(task_id='task1', python_callable=task_1, owner='yuziyue1')
    task2 = PythonOperatorNew(task_id='task2', python_callable=task_2, owner='yuziyue2')
    task3 = PythonOperatorNew(task_id='task3', python_callable=task_3)
  • PythonOperatorNew is a custom operator that forces the user to specify an owner when defining a task, so failure notifications can say whom to contact.
  • dag1 runs once per minute.
  • A failed task is retried once after 1 minute; if the retry also fails, the task is marked failed and the task-level callback fires (where you can send a notification).
  • When the dag_run finishes, if any task in it failed, the DAG-level callback fires (likewise a place to send a notification; see the sketch below).
  • After a DAG or task failure, the callback context provides dag_id, task_id, task_owner, dag_owners, logical_date, and dag_fail_reason.
  • dag_owners: the set of owners of all tasks in the DAG.
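In production you would usually replace the file-append in send_alarm with a call to a real alerting channel. A minimal sketch, assuming an alerting service that accepts a JSON POST (the webhook URL and payload shape here are hypothetical, not part of the original setup):

import requests

ALARM_WEBHOOK_URL = 'http://alert.example.com/webhook'  # hypothetical endpoint


def send_alarm(message):
    print("send alarm: " + message)
    try:
        # Swallow network errors so a broken alert channel never crashes the callback.
        requests.post(ALARM_WEBHOOK_URL, json={'text': message}, timeout=5)
    except requests.RequestException as exc:
        print("send_alarm failed: %s" % exc)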

2. Monitoring DAG Run Status

Section 1 covers failures inside a DAG. But if a DAG simply keeps running without raising an error, how do we know what state it is in? The solution is to define one more DAG dedicated to monitoring the run state of all the others: after a given time of day, check the specified DAGs and tasks, and alert if their state is anything other than success (e.g. failed or still running). There are two cases:

  • At a specified time, check whether the dag_run has completed.
  • At a specified time, check whether specified tasks inside the dag_run have completed.

  • airflow_dag_monitor.py
import json
import os
import requests
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.date_time import DateTimeSensor

AIRFLOW_HEADERS = {
    'Accept': 'application/json'
}
AIRFLOW_AUTH = ("username_xxx", "password_xxx")


def get_local_logical_date(lg, interval=8):
    # The API returns e.g. '2023-06-28T00:10:00+00:00'; drop the colon in the
    # UTC offset so strptime's %z accepts it, then shift to local time (UTC+8).
    time_string = lg[:-3] + lg[-2:]
    lg_date = datetime.datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S%z')
    return (lg_date + datetime.timedelta(hours=interval)).strftime('%Y-%m-%d %H:%M:%S')


def target_datetime(logical_date, hour, minute=0, second=0, interval=8):
    # A daily run's logical_date is one day behind the date it actually executes,
    # so add interval (UTC+8) plus 24 hours to land on "today" in local time.
    cst_ = logical_date + datetime.timedelta(hours=interval + 24)
    return cst_.strftime("%Y-%m-%d") + " %02d:%02d:%02d" % (hour, minute, second)


def send_alarm(message):
    # Placeholder notification channel: append to a local file.
    print("send alarm: " + message)
    os.system('echo "%s" >> /tmp/xxx.log' % message)


def task_failure_call_this(context):
    interval = 8  # shift the UTC logical_date to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']
    task_instance = context['task_instance']

    dag_id = dag.dag_id
    task_id = task_instance.task_id
    task_owner = task_instance.task.owner
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    message = 'airflow task failed => dag_id:%s, task_id:%s, task_owner:%s, logical_date:%s' % (
        dag_id, task_id, task_owner, logical_date.strftime('%Y-%m-%d %H:%M:%S')
    )
    print("task_failure_call_this message: " + message)
    send_alarm(message)


def dag_failure_call_this(context):
    interval = 8  # shift the UTC logical_date to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']

    dag_id = dag.dag_id
    dag_owners = dag.owner  # comma-separated owners of all tasks in the DAG
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    dag_fail_reason = context['reason']
    message = 'airflow dag failed => dag_id:%s, dag_owners:[%s], logical_date:%s, reason:%s' % (
        dag_id, dag_owners, logical_date.strftime('%Y-%m-%d %H:%M:%S'), dag_fail_reason
    )
    print("dag_failure_call_this message: " + message)
    send_alarm(message)


def check_latest_dag_run_status(**kwargs):
    dag_ids = kwargs['monitored_dag_ids']
    for dag_id in dag_ids:
        print("check_dag_status: " + dag_id + "...")
        url = 'http://xxx/api/v1/dags/%s/dagRuns?limit=1&order_by=-start_date' % dag_id
        r = requests.get(url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        dag_runs = r.json()
        print("dag_id latest dag run :" + json.dumps(dag_runs["dag_runs"][0]))
        state = dag_runs["dag_runs"][0]["state"]
        dag_id = dag_runs["dag_runs"][0]["dag_id"]
        utc_logical_date = dag_runs["dag_runs"][0]["logical_date"]
        local_logical_date = get_local_logical_date(utc_logical_date)
        if state != 'success':
            msg = "Detected dag_id:%s in state:%s, logical_date:%s, please check" % (
                dag_id, state, local_logical_date)
            send_alarm(msg)


def check_latest_task_instance_status(**kwargs):
    monitored_task_ids = kwargs['params']['monitored_task_ids']
    for dag_item in monitored_task_ids:
        dag_id = dag_item['dag_id']
        task_ids = dag_item['task_ids']
        latest_dag_run_url = 'http://xxx/api/v1/dags/%s/dagRuns?limit=1&order_by=-start_date' % dag_id
        r = requests.get(latest_dag_run_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        dag_runs = r.json()
        dag_run_id = dag_runs["dag_runs"][0]["dag_run_id"]
        utc_logical_date = dag_runs["dag_runs"][0]["logical_date"]
        local_logical_date = get_local_logical_date(utc_logical_date)

        error_ts_url = 'http://xxx/api/v1/dags/%s/dagRuns/%s/taskInstances?limit=1000&state=failed&state=upstream_failed' % (
            dag_id, dag_run_id)
        r = requests.get(error_ts_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        task_instances = r.json()
        task_instances_list = task_instances['task_instances']
        error_tasks = []
        for ts in task_instances_list:
            task_id = ts['task_id']
            if task_id in task_ids:
                error_tasks.append(task_id)
        if len(error_tasks) > 0:
            message = 'Detected failed tasks: %s. logical_date: %s' % (
                ','.join(error_tasks), local_logical_date)
            send_alarm(message)


with DAG(
        dag_id='airflow_dag_monitor',
        description='airflow_dag_monitor for monitor other dags',
        default_args={
            'retries': 2,
            'retry_delay': datetime.timedelta(minutes=5),
            'depends_on_past': False,
            'on_failure_callback': task_failure_call_this
        },
        max_active_tasks=3,
        schedule_interval='10 0 * * *',
        start_date=datetime.datetime(2023, 6, 28, 17),
        catchup=False,
        tags=['bigdata'],
        on_failure_callback=dag_failure_call_this,
        user_defined_macros={
            "target_datetime": target_datetime,
        },
) as dag:
    execute_hour_at_7 = DateTimeSensor(
        task_id='execute_hour_at_7',
        mode='reschedule',
        poke_interval=300,
        target_time="{{ target_datetime(logical_date,hour=7) }}",
    )
    execute_hour_at_8 = DateTimeSensor(
        task_id='execute_hour_at_8',
        mode='reschedule',
        poke_interval=300,
        target_time="{{ target_datetime(logical_date,hour=8) }}",
    )

    check_xxx_task_instance = PythonOperator(
        task_id='check_xxx_task_instance',
        python_callable=check_latest_task_instance_status,
        params={
            'monitored_task_ids': [
                {
                    'dag_id': 'custom_dag_name',
                    'task_ids': ('task1', 'task2')
                }
            ]
        }
    )
    execute_hour_at_7 >> check_xxx_task_instance

    check_xxx_dag_run = PythonOperator(
        task_id='check_xxx_dag_run',
        python_callable=check_latest_dag_run_status,
        op_kwargs={
            'monitored_dag_ids': ['dag_poke_day']
        },
    )
    execute_hour_at_8 >> check_xxx_dag_run
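The interval + 24 offset in target_datetime exists because a daily run's logical_date is one day behind the date on which the run actually executes. A quick check of the arithmetic, runnable in the same module as the listing above (the date is illustrative):

import datetime

# logical_date of the dag_run that actually executes on 2023-06-29 00:10 UTC
logical_date = datetime.datetime(2023, 6, 28, 0, 10)
print(target_datetime(logical_date, hour=7))  # -> '2023-06-29 07:00:00'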
  • airflow_dag_monitor is itself a DAG, so its own task failures must raise alerts too; that is why it defines the same task_failure_call_this and dag_failure_call_this callbacks.

  • check_xxx_task_instance checks the specified task_ids after 7:00 and alerts if any of them failed.
  • check_latest_task_instance_status calls the Airflow REST API to look up task instance states.

  • check_xxx_dag_run checks, after 8:00, the state of every dag_id in the monitored_dag_ids list and alerts if the latest run failed.
  • check_latest_dag_run_status calls the Airflow REST API to look up the latest dag_run state; a sample response is sketched below.
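For reference, GET /api/v1/dags/{dag_id}/dagRuns returns a JSON envelope shaped like the one below, abbreviated here to the fields the checks actually read (values are illustrative):

{
  "dag_runs": [
    {
      "dag_id": "dag_poke_day",
      "dag_run_id": "scheduled__2023-06-28T00:10:00+00:00",
      "logical_date": "2023-06-28T00:10:00+00:00",
      "state": "success"
    }
  ],
  "total_entries": 1
}

The taskInstances endpoint used by check_latest_task_instance_status returns the same kind of envelope: a task_instances list whose entries carry task_id and state.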
