Airflow Task and DAG Execution Monitoring in Practice
1. Monitoring tasks inside a DAG

- When a task finishes, if it failed, send an alert: configure `on_failure_callback` in `default_args`.
- When a DAG run finishes, if it failed, send an alert: configure `on_failure_callback` in the `DAG` arguments.
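Before the full example, here is a minimal sketch of just these two hook points, assuming Airflow 2.x (`notify_task_failure` and `notify_dag_failure` are placeholder names):

```python
import datetime
from airflow import DAG


def notify_task_failure(context):
    ...  # placeholder: alert on a single failed task instance


def notify_dag_failure(context):
    ...  # placeholder: alert on a failed DAG run


with DAG(
    dag_id='callbacks_sketch',
    start_date=datetime.datetime(2023, 6, 28),
    schedule_interval='@daily',
    # Task level: merged into every task; fires once per failed task instance.
    default_args={'on_failure_callback': notify_task_failure},
    # DAG level: fires once when the whole DAG run ends in the failed state.
    on_failure_callback=notify_dag_failure,
) as dag:
    ...
```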
A complete example:
- dag1.py
```python
import os
import datetime
from typing import Callable, Optional, List, Dict

from airflow import DAG
from airflow.operators.python import PythonOperator


class PythonOperatorNew(PythonOperator):
    """PythonOperator that refuses to be instantiated without an explicit owner."""

    def __init__(
        self,
        *,
        python_callable: Callable,
        op_args: Optional[List] = None,
        op_kwargs: Optional[Dict] = None,
        templates_dict: Optional[Dict] = None,
        templates_exts: Optional[List[str]] = None,
        **kwargs,
    ) -> None:
        assert 'owner' in kwargs, "you must specify owner like this: owner=xxx"
        super().__init__(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
            templates_dict=templates_dict,
            templates_exts=templates_exts,
            **kwargs,
        )


def send_alarm(message):
    # Replace this with your real notification channel (email, webhook, IM, ...).
    print("send alarm: " + message)
    os.system('echo "%s" >> /tmp/xxx.log' % message)


def task_failure_call_this(context):
    interval = 8  # hours offset from UTC to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']
    task_instance = context['task_instance']
    dag_id = dag.dag_id
    task_id = task_instance.task_id
    task_owner = task_instance.task.owner
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    message = 'airflow task failed => dag_id:%s, task_id:%s, task_owner:%s, logical_date:%s' % (
        dag_id, task_id, task_owner, logical_date.strftime('%Y-%m-%d %H:%M:%S')
    )
    print("task_failure_call_this message: " + message)
    send_alarm(message)


def dag_failure_call_this(context):
    interval = 8
    dag = context['dag']
    dag_run = context['dag_run']
    dag_id = dag.dag_id
    dag_owners = dag.owner  # the owners of all tasks in the DAG, joined together
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    dag_fail_reason = context['reason']
    message = 'airflow dag failed => dag_id:%s, dag_owners:[%s], logical_date:%s, reason:%s' % (
        dag_id, dag_owners, logical_date.strftime('%Y-%m-%d %H:%M:%S'), dag_fail_reason
    )
    print("dag_failure_call_this message: " + message)
    send_alarm(message)


def task_1(**context):
    # Fails on even minutes (deliberate, to exercise the failure callbacks).
    print("task_1 is running.")
    if context['dag_run'].logical_date.minute % 2 == 0:
        raise Exception("error from task_1")


def task_2(**context):
    # Fails on odd minutes.
    print("task_2 is running.")
    if context['dag_run'].logical_date.minute % 2 != 0:
        raise Exception("error from task_2")


def task_3(**context):
    # Always fails.
    print("task_3 is running.")
    raise Exception("error from task_3")


default_args = {
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'depends_on_past': False,
    'on_failure_callback': task_failure_call_this
}

with DAG(
    dag_id='dag1',
    schedule_interval="* * * * *",
    start_date=datetime.datetime(2023, 6, 28, 17),
    catchup=False,
    tags=['bigdata'],
    default_args=default_args,
    on_failure_callback=dag_failure_call_this,
) as dag:
    task1 = PythonOperatorNew(task_id='task1', python_callable=task_1, owner='yuziyue1')
    task2 = PythonOperatorNew(task_id='task2', python_callable=task_2, owner='yuziyue2')
    # owner is mandatory: without it the assert in PythonOperatorNew fires at parse time.
    task3 = PythonOperatorNew(task_id='task3', python_callable=task_3, owner='yuziyue3')
```
- First, we subclass `PythonOperator` as `PythonOperatorNew`, which forces the user to specify an `owner` when defining a task, so the right person can be notified when the task fails.
- `dag1` runs every minute.
- When a task fails, it is retried once after 1 minute; if the retry also fails, the task is marked failed and the configured callback is invoked (where you can send a notification).
- When a DAG run finishes, if any of its tasks failed, the DAG-level callback is invoked (again, a notification can be sent; a webhook-based `send_alarm` is sketched below).
- After a DAG or task failure, the callback context provides these variables: `dag_id`, `task_id`, `task_owner`, `dag_owners`, `logical_date`, `dag_fail_reason`.
- `dag_owners`: the set of owners of all tasks in the DAG.
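In both callbacks, `send_alarm` is the extension point: the example just echoes to a local file. Here is a minimal sketch of a webhook-based replacement, assuming a hypothetical endpoint `ALARM_WEBHOOK_URL` and a `{"text": ...}` payload (adjust both to your alerting system):

```python
import requests

# Hypothetical endpoint; replace with your alerting system's webhook URL.
ALARM_WEBHOOK_URL = "https://alert.example.com/webhook"


def send_alarm(message):
    """Post the alert text to a webhook instead of appending to a local file."""
    try:
        requests.post(ALARM_WEBHOOK_URL, json={"text": message}, timeout=10)
    except requests.RequestException as exc:
        # A broken alert channel should never crash the callback itself.
        print("send_alarm failed: %s" % exc)
```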
2. Monitoring DAG run status

The above handles failures inside a DAG. But if a DAG just keeps running without ever raising an error, how do we know what state it is in? The solution is to define one more DAG dedicated to monitoring the run status of all the others: after a given point in time, check the specified DAGs and tasks, and alert if their state is not success (e.g. failed, or still running). This covers two cases:
- At a given point in time, check whether a dag_run has completed.
- At a given point in time, check whether the specified tasks inside a dag_run have completed.
- airflow_dag_monitor.py
```python
import json
import os
import datetime

import requests

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.date_time import DateTimeSensor

AIRFLOW_HEADERS = {
    'Accept': 'application/json'
}
AIRFLOW_AUTH = ("username_xxx", "password_xxx")


def get_local_logical_date(lg, interval=8):
    # The REST API returns logical_date like "2023-06-28T00:10:00+00:00"; drop the
    # colon in the UTC offset so strptime's %z accepts it, then shift to UTC+8.
    time_string = lg[:-3] + lg[-2:]
    lg_date = datetime.datetime.strptime(time_string, '%Y-%m-%dT%H:%M:%S%z')
    return (lg_date + datetime.timedelta(hours=interval)).strftime('%Y-%m-%d %H:%M:%S')


def target_datetime(logical_date, hour, minute=0, second=0, interval=8):
    # For a daily schedule, logical_date is the previous day (in UTC), so adding
    # interval + 24 hours lands on "today" in local time (UTC+8).
    cst_ = logical_date + datetime.timedelta(hours=interval + 24)
    return cst_.strftime("%Y-%m-%d") + " {:02d}:{:02d}:{:02d}".format(hour, minute, second)


def send_alarm(message):
    # Replace this with your real notification channel.
    print("send alarm: " + message)
    os.system('echo "%s" >> /tmp/xxx.log' % message)


def task_failure_call_this(context):
    interval = 8  # hours offset from UTC to local time (UTC+8)
    dag = context['dag']
    dag_run = context['dag_run']
    task_instance = context['task_instance']
    dag_id = dag.dag_id
    task_id = task_instance.task_id
    task_owner = task_instance.task.owner
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    message = 'airflow task failed => dag_id:%s, task_id:%s, task_owner:%s, logical_date:%s' % (
        dag_id, task_id, task_owner, logical_date.strftime('%Y-%m-%d %H:%M:%S')
    )
    print("task_failure_call_this message: " + message)
    send_alarm(message)


def dag_failure_call_this(context):
    interval = 8
    dag = context['dag']
    dag_run = context['dag_run']
    dag_id = dag.dag_id
    dag_owners = dag.owner
    logical_date = dag_run.logical_date + datetime.timedelta(hours=interval)
    dag_fail_reason = context['reason']
    message = 'airflow dag failed => dag_id:%s, dag_owners:[%s], logical_date:%s, reason:%s' % (
        dag_id, dag_owners, logical_date.strftime('%Y-%m-%d %H:%M:%S'), dag_fail_reason
    )
    print("dag_failure_call_this message: " + message)
    send_alarm(message)


def check_latest_dag_run_status(**kwargs):
    dag_ids = kwargs['monitored_dag_ids']
    for dag_id in dag_ids:
        print("check_dag_status: " + dag_id + "...")
        url = 'http://xxx/api/v1/dags/%s/dagRuns?limit=1&order_by=-start_date' % dag_id
        r = requests.get(url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        dag_runs = r.json()
        print("dag_id latest dag run: " + json.dumps(dag_runs["dag_runs"][0]))
        state = dag_runs["dag_runs"][0]["state"]
        dag_id = dag_runs["dag_runs"][0]["dag_id"]
        utc_logical_date = dag_runs["dag_runs"][0]["logical_date"]
        local_logical_date = get_local_logical_date(utc_logical_date)
        if state != 'success':
            msg = "Detect dag_id:%s state is %s, logical_date: %s please check" % (
                dag_id, state, local_logical_date)
            send_alarm(msg)


def check_latest_task_instance_status(**kwargs):
    monitored_task_ids = kwargs['params']['monitored_task_ids']
    for dag_item in monitored_task_ids:
        dag_id = dag_item['dag_id']
        task_ids = dag_item['task_ids']
        latest_dag_run_url = 'http://xxx/api/v1/dags/%s/dagRuns?limit=1&order_by=-start_date' % dag_id
        r = requests.get(latest_dag_run_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        dag_runs = r.json()
        dag_run_id = dag_runs["dag_runs"][0]["dag_run_id"]
        utc_logical_date = dag_runs["dag_runs"][0]["logical_date"]
        local_logical_date = get_local_logical_date(utc_logical_date)
        error_ts_url = 'http://xxx/api/v1/dags/%s/dagRuns/%s/taskInstances?limit=1000&state=failed&state=upstream_failed' % (
            dag_id, dag_run_id)
        r = requests.get(error_ts_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        task_instances = r.json()
        task_instances_list = task_instances['task_instances']
        error_tasks = []
        for ts in task_instances_list:
            task_id = ts['task_id']
            if task_id in task_ids:
                error_tasks.append(task_id)
        if len(error_tasks) > 0:
            message = 'Detect These tasks failed: %s. logical_date: %s' % (
                ','.join(error_tasks), local_logical_date)
            send_alarm(message)


with DAG(
    dag_id='airflow_dag_monitor',
    description='airflow_dag_monitor for monitor other dags',
    default_args={
        'retries': 2,
        'retry_delay': datetime.timedelta(minutes=5),
        'depends_on_past': False,
        'on_failure_callback': task_failure_call_this
    },
    max_active_tasks=3,
    schedule_interval='10 0 * * *',
    start_date=datetime.datetime(2023, 6, 28, 17),
    catchup=False,
    tags=['bigdata'],
    on_failure_callback=dag_failure_call_this,
    user_defined_macros={
        # Exposed to Jinja templates such as target_time below.
        "target_datetime": target_datetime,
    },
) as dag:
    execute_hour_at_7 = DateTimeSensor(
        task_id='execute_hour_at_7',
        mode='reschedule',
        poke_interval=300,
        target_time="{{ target_datetime(logical_date, hour=7) }}",
    )
    execute_hour_at_8 = DateTimeSensor(
        task_id='execute_hour_at_8',
        mode='reschedule',
        poke_interval=300,
        target_time="{{ target_datetime(logical_date, hour=8) }}",
    )
    check_xxx_task_instance = PythonOperator(
        task_id='check_xxx_task_instance',
        python_callable=check_latest_task_instance_status,
        # In Airflow 2 the context is passed automatically; provide_context is obsolete.
        params={
            'monitored_task_ids': [
                {
                    'dag_id': 'custom_dag_name',
                    'task_ids': ('task1', 'task2')
                }
            ]
        }
    )
    execute_hour_at_7 >> check_xxx_task_instance

    check_xxx_dag_run = PythonOperator(
        task_id='check_xxx_dag_run',
        python_callable=check_latest_dag_run_status,
        op_kwargs={
            'monitored_dag_ids': ['dag_poke_day']
        },
    )
    execute_hour_at_8 >> check_xxx_dag_run
```
- airflow_dag_monitor is itself a DAG, so alerts must also fire when its own tasks fail; that is why it defines its own `task_failure_call_this` and `dag_failure_call_this`.
- `check_xxx_task_instance` checks the specified task_ids after 7:00 and alerts if any of them failed. `check_latest_task_instance_status` calls the Airflow REST API to inspect task-instance states.
- `check_xxx_dag_run` checks, after 8:00, the state of every dag_id in the `monitored_dag_ids` list and alerts on failure. `check_latest_dag_run_status` calls the Airflow REST API to inspect the latest dag_run; a more defensive variant is sketched below.
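One caveat worth noting: both check functions index `dag_runs["dag_runs"][0]` directly, which raises an `IndexError` for a DAG that has never run, and the `requests` calls have no timeout. Below is a slightly more defensive, drop-in sketch of the dag_run check, reusing `AIRFLOW_AUTH`, `AIRFLOW_HEADERS`, `send_alarm` and `get_local_logical_date` from the file above (the `xxx` host is still a placeholder):

```python
import requests


def get_latest_dag_run(dag_id):
    """Return the latest dag_run dict for dag_id, or None if it has never run."""
    url = 'http://xxx/api/v1/dags/%s/dagRuns?limit=1&order_by=-start_date' % dag_id
    r = requests.get(url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS, timeout=30)
    r.raise_for_status()  # surface HTTP errors instead of failing on a missing key
    runs = r.json().get('dag_runs', [])
    return runs[0] if runs else None


def check_latest_dag_run_status(**kwargs):
    for dag_id in kwargs['monitored_dag_ids']:
        latest = get_latest_dag_run(dag_id)
        if latest is None:
            send_alarm("Detect dag_id:%s has no dag_run at all, please check" % dag_id)
        elif latest['state'] != 'success':
            send_alarm("Detect dag_id:%s state is %s, logical_date: %s please check" % (
                dag_id, latest['state'], get_local_logical_date(latest['logical_date'])))
```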