Deleting Task Instances in Airflow

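After Airflow has been running for a long time (or if you have DAGs that run every minute), the Task Instances page may stop loading because there are too many task instances. The fix is to delete old task instances you no longer need. Airflow's REST API supports this, so put the cleanup in a scheduled job that runs once a day. Avoid editing the metadata database by hand.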

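Two endpoints are used:

  • Endpoint 1: fetch a batch of dag_runs, passing the start_date_lte parameter so that only runs whose start_date is at or before that time (and can therefore be deleted) are returned
  • Endpoint 2: delete that batch of dag_runs (their task instances are removed along with them)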
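The two endpoints are GET /api/v1/dags/{dag_id}/dagRuns and DELETE /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}. The sketch below only builds the request URLs, without contacting a server, to show why the values should be URL-encoded: the "+" in a "+00:00" offset would otherwise be decoded as a space in the query string. The helper names build_list_url and build_delete_url are hypothetical, and http://domain is the same placeholder host used in the script.

```python
from urllib.parse import quote, urlencode

BASE = "http://domain/api/v1/dags"  # placeholder host, as in the script


def build_list_url(dag_id: str, cutoff: str, limit: int = 100) -> str:
    """Endpoint 1: list up to `limit` runs that started at or before `cutoff`, oldest first."""
    # urlencode escapes ':' as %3A and '+' as %2B, so the offset survives intact.
    query = urlencode({"limit": limit, "order_by": "start_date", "start_date_lte": cutoff})
    return f"{BASE}/{quote(dag_id, safe='')}/dagRuns?{query}"


def build_delete_url(dag_id: str, dag_run_id: str) -> str:
    """Endpoint 2: delete a single run; run IDs contain ':' and '+', so escape them."""
    return f"{BASE}/{quote(dag_id, safe='')}/dagRuns/{quote(dag_run_id, safe='')}"


if __name__ == "__main__":
    print(build_list_url("your_dag_id1", "2024-01-06T00:00:00+00:00"))
    print(build_delete_url("your_dag_id1", "scheduled__2024-01-01T00:00:00+00:00"))
```

Passing a params dict to requests.get produces the same query-string encoding, which avoids building the string by hand.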

Here is the complete script:

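```python
# This script deletes old DAG runs, and with them their task instances, via the Airflow stable REST API.
import datetime
from urllib.parse import quote

import requests

airflow_address = 'http://domain/api/v1/dags'

# Example: fetch the two oldest runs of a DAG with curl
# curl -X GET 'http://domain/api/v1/dags/your_dag_id/dagRuns?limit=2&order_by=start_date' --user "admin:xxx"

AIRFLOW_HEADERS = {'Accept': 'application/json'}
AIRFLOW_AUTH = ("admin", "xxx")  # username and password
TIMEOUT = 30  # seconds per HTTP request


def delete_dag_runs(dag_id, dag_run_ids):
    """Delete the given DAG runs one by one and return how many deletions succeeded."""
    print(f"dag runs ready to be deleted: {len(dag_run_ids)}")
    deleted = 0
    for dag_run_id in dag_run_ids:
        print(f"deleting {dag_run_id} of {dag_id}")
        # Run IDs such as "scheduled__2024-01-01T00:00:00+00:00" contain ':' and '+', so escape them.
        dag_delete_url = f'{airflow_address}/{dag_id}/dagRuns/{quote(dag_run_id, safe="")}'
        r = requests.delete(dag_delete_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS, timeout=TIMEOUT)
        if r.ok:
            deleted += 1
        else:
            print(f"failed to delete {dag_run_id}: {r.status_code} {r.text}")
    return deleted


# dag_id: the DAG to clean up
# days: keep the most recent N days of dag_runs (today counts as one of them).
def get_dag_runs(dag_id: str, days=15):
    limit = 100  # fetch at most 100 runs per request
    # Compute the cutoff in UTC, since it is sent with a +00:00 offset.
    cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days - 1)
    max_start_date = cutoff.strftime("%Y-%m-%dT00:00:00+00:00")
    print(f"max_start_date: {max_start_date}")
    dag_runs_url = f'{airflow_address}/{dag_id}/dagRuns'
    # Let requests encode the query string: a raw '+' would be decoded by the server as a space.
    params = {'limit': limit, 'order_by': 'start_date', 'start_date_lte': max_start_date}
    while True:
        r = requests.get(dag_runs_url, params=params, auth=AIRFLOW_AUTH,
                         headers=AIRFLOW_HEADERS, timeout=TIMEOUT)
        if r.status_code != 200:
            print(f"failed to list dag runs of {dag_id}: {r.status_code} {r.text}")
            break
        dag_run_ids = [dag_run["dag_run_id"] for dag_run in r.json()["dag_runs"]]
        deleted = delete_dag_runs(dag_id, dag_run_ids)
        # Deleted runs drop out of the next query, so no offset is needed. Stop on a short page,
        # or when nothing could be deleted (otherwise the same page would be fetched forever).
        if len(dag_run_ids) < limit or deleted == 0:
            break


# (dag_id, number of days to keep)
waited_to_be_deleted_list = [
    ('your_dag_id1', 15),
    ('your_dag_id2', 15),
]

if __name__ == "__main__":
    for dag_id, days in waited_to_be_deleted_list:
        get_dag_runs(dag_id, days=days)
```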
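The loop in get_dag_runs never passes an offset: it relies on deleted runs disappearing from the next listing. The offline simulation below is a minimal sketch for reasoning about that batching without a real Airflow server. FakeServer is a hypothetical in-memory stand-in for the two endpoints, cleanup mirrors the list-then-delete loop, and cutoff_for reproduces the script's retention rule (midnight UTC, days - 1 days ago).

```python
import datetime as dt


def cutoff_for(now: dt.datetime, days: int) -> dt.datetime:
    """Midnight UTC of the day `days - 1` days before `now`, as the script computes it."""
    day = (now.astimezone(dt.timezone.utc) - dt.timedelta(days=days - 1)).date()
    return dt.datetime(day.year, day.month, day.day, tzinfo=dt.timezone.utc)


class FakeServer:
    """Hypothetical stand-in for the dagRuns endpoints, holding (run_id, start_date) pairs."""

    def __init__(self, runs, undeletable=()):
        self.runs = sorted(runs, key=lambda r: r[1])  # like order_by=start_date
        self.undeletable = set(undeletable)  # simulate runs whose DELETE fails

    def list_runs(self, start_date_lte: dt.datetime, limit: int):
        return [rid for rid, start in self.runs if start <= start_date_lte][:limit]

    def delete_run(self, run_id: str) -> bool:
        if run_id in self.undeletable:
            return False
        self.runs = [r for r in self.runs if r[0] != run_id]
        return True


def cleanup(server: FakeServer, start_date_lte: dt.datetime, limit: int = 100) -> int:
    """List-then-delete until a short page is returned or nothing could be deleted."""
    total = 0
    while True:
        batch = server.list_runs(start_date_lte, limit)
        deleted = sum(server.delete_run(rid) for rid in batch)
        total += deleted
        if len(batch) < limit or deleted == 0:
            return total


if __name__ == "__main__":
    now = dt.datetime(2024, 1, 20, 15, 30, tzinfo=dt.timezone.utc)
    cutoff = cutoff_for(now, 15)
    print(cutoff.isoformat())  # 2024-01-06T00:00:00+00:00
    runs = [(f"run_{i}", cutoff - dt.timedelta(minutes=i)) for i in range(250)]
    runs.append(("recent", now))
    print(cleanup(FakeServer(runs), cutoff))  # 250
```

Without the deleted == 0 check, a page made up entirely of runs that cannot be deleted would be returned again on every iteration and the loop would never end.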