After Airflow has been running for a long time (or if you have DAGs that run every few minutes), you may find that the task instances page no longer loads: there are simply too many task instances. The old, useless ones have to be deleted regularly. Airflow's REST API can do this, so put the cleanup script below into a scheduled job that runs daily (a sample crontab entry is given at the end), and above all do not manipulate the metadata database by hand.
Two API endpoints are used (rough curl equivalents are sketched right after this list):
- Endpoint 1: list a batch of dag_runs, using the start_date_lte parameter so that only runs whose start_date is earlier than the given time, i.e. the ones that are safe to delete, are returned
- Endpoint 2: delete each dag_run in that batch
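For a quick manual check, the two calls look roughly like this with curl; the host, dag id, run id, cutoff date and the admin:xxx credentials are placeholders, matching the script below:

# Endpoint 1: list old runs of a dag (--data-urlencode keeps the "+" in the offset intact)
curl -G 'http://domain/api/v1/dags/your_dag_id/dagRuns' \
     --data-urlencode 'limit=100' \
     --data-urlencode 'order_by=start_date' \
     --data-urlencode 'start_date_lte=2024-01-01T00:00:00+00:00' \
     --user "admin:xxx"
# Endpoint 2: delete one dag_run by its dag_run_id
curl -X DELETE 'http://domain/api/v1/dags/your_dag_id/dagRuns/your_dag_run_id' --user "admin:xxx"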
The complete script is below:
# This script deletes old dag_runs (and with them their task instances) through the Airflow REST API.
import datetime

import requests

airflow_address = 'http://domain/api/v1/dags'
# To inspect the oldest runs of a dag manually:
# curl -X GET 'http://domain/api/v1/dags/your_dag_id/dagRuns?limit=2&order_by=start_date' --user "admin:xxx"
AIRFLOW_HEADERS = {'Accept': 'application/json'}
AIRFLOW_AUTH = ("admin", "xxx")  # username and password


def delete_dag_runs(dag_id, dag_run_ids):
    print("number of dag runs to be deleted: " + str(len(dag_run_ids)))
    for dag_run_id in dag_run_ids:
        print("deleting " + dag_run_id + " of " + dag_id)
        dag_delete_url = f'{airflow_address}/{dag_id}/dagRuns/{dag_run_id}'
        requests.delete(dag_delete_url, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)


# dag_id: the dag whose old runs should be deleted
# days: keep the most recent N days of dag_runs
def get_dag_runs(dag_id: str, days=15):
    limit = 100  # fetch (and delete) at most 100 runs per request
    # Runs whose start_date is earlier than this timestamp will be deleted.
    max_start_date = (datetime.datetime.now() - datetime.timedelta(days=days - 1)).strftime("%Y-%m-%dT00:00:00+00:00")
    print("max_start_date: " + max_start_date)
    dag_runs_url = f'{airflow_address}/{dag_id}/dagRuns'
    # Pass the query as params so requests URL-encodes the "+" in the timezone offset.
    params = {'limit': limit, 'order_by': 'start_date', 'start_date_lte': max_start_date}
    continue_flag = True
    while continue_flag:
        r = requests.get(dag_runs_url, params=params, auth=AIRFLOW_AUTH, headers=AIRFLOW_HEADERS)
        dag_runs_id_list = []
        if r.status_code == 200:
            for dag_run in r.json()["dag_runs"]:
                dag_runs_id_list.append(dag_run["dag_run_id"])
            # Fewer results than the page size means this was the last batch.
            if len(dag_runs_id_list) < limit:
                continue_flag = False
            delete_dag_runs(dag_id, dag_runs_id_list)
        else:
            # Stop instead of looping forever on an unexpected response.
            print("unexpected status code " + str(r.status_code) + ": " + r.text)
            continue_flag = False


# (dag_id, number of most recent days to keep)
waited_to_be_deleted_list = [
    ('your_dag_id1', 15),
    ('your_dag_id2', 15),
]
for dag_id, days in waited_to_be_deleted_list:
    get_dag_runs(dag_id, days=days)
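To run the cleanup daily, as suggested at the top, a crontab entry along these lines is enough; the script path, log path and schedule below are placeholders:

# Run the cleanup every day at 03:00
0 3 * * * /usr/bin/python3 /path/to/delete_old_dag_runs.py >> /var/log/delete_old_dag_runs.log 2>&1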