Scenario Overview
Normally we do not have Airflow keep the logs of the task that actually runs. Instead, we use an SSHOperator to log in to a remote machine, run the code there, and redirect the output to a log file on that host, as in the example below:
task1 = SSHOperator(
    ssh_conn_id='datanode01',
    task_id='task1',
    depends_on_past=False,
    command="bash /path/program.sh >> /path/program.log 2>&1"
)
In that case the only logs Airflow produces are its own. Even so, they add up quickly: take a minute-level DAG with 10 tasks. It creates 10 log directories every minute, 600 every hour, and 600 * 24 = 14,400 every day. That is already a lot of directories, and with more tasks it gets even worse.
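For context, each of those directories is a per-run log folder that Airflow creates under base_log_folder. Assuming the older timestamp-style log layout (Airflow 2.3+ defaults to a dag_id=.../run_id=.../task_id=... layout instead), the tree for a hypothetical minute-level DAG looks roughly like this; it is exactly these execution-date directories that the cleanup script below matches and removes:

/opt/airflow/logs/my_minutely_dag/task1/2024-01-01T00:01:00+00:00/1.log
/opt/airflow/logs/my_minutely_dag/task1/2024-01-01T00:02:00+00:00/1.log
/opt/airflow/logs/my_minutely_dag/task2/2024-01-01T00:01:00+00:00/1.log
...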
Cleanup Script
Use the Python script below to clean up the logs under base_log_folder = '/opt/airflow/logs'. You can schedule it with a crontab job or create a dedicated cleanup DAG; sketches of both are given after the script.
- clean_airflow_logs.py: the Python cleanup script
import datetime
import os
import re
import shutil
import sys


def traverse_and_unlink(file_or_dir, retain_seconds, rm):
    """Walk the log folder and delete (or just report) entries older than retain_seconds."""
    for entry in os.scandir(file_or_dir):
        new_file_or_dir = os.path.join(file_or_dir, entry.name)
        last_ts = os.stat(new_file_or_dir).st_mtime
        last_str = datetime.datetime.fromtimestamp(last_ts).strftime("%Y-%m-%d %H:%M:%S")
        delta_seconds = datetime.datetime.now().timestamp() - last_ts
        if os.path.isfile(new_file_or_dir):
            if delta_seconds > retain_seconds:
                if rm:
                    print(f"remove file {new_file_or_dir}")
                    os.remove(new_file_or_dir)
                else:
                    print(f"last modified time is {last_str} for file {new_file_or_dir}")
        elif os.path.isdir(new_file_or_dir):
            # A directory whose name ends with an execution date such as
            # 2024-01-01T00:01:00+00:00 (with or without microseconds) is a
            # task-instance log directory: remove it as a whole instead of
            # descending into it.
            pattern1 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$'
            pattern2 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$'
            if re.search(pattern1, new_file_or_dir) or re.search(pattern2, new_file_or_dir):
                if delta_seconds > retain_seconds:
                    if rm:
                        print(f"remove dir {new_file_or_dir}")
                        shutil.rmtree(new_file_or_dir)
                    else:
                        print(f"last modified time is {last_str} for dir {new_file_or_dir}")
            else:
                traverse_and_unlink(new_file_or_dir, retain_seconds, rm)


def clean_airflow_logs(rm):
    retain_seconds = 3600 * 24 * 30  # keep the last 30 days of logs
    base_log_folder = '/opt/airflow/logs'
    traverse_and_unlink(base_log_folder, retain_seconds, rm)


if __name__ == '__main__':
    # Dry run by default; pass the single argument "remove" to actually delete.
    remove = False
    if len(sys.argv) == 2 and sys.argv[1] == 'remove':
        remove = True
    clean_airflow_logs(remove)
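To try it out, run the script with no arguments first: that is a dry run that only prints what would be deleted. Pass the single argument remove to actually delete:

python clean_airflow_logs.py          # dry run: only prints old files/dirs
python clean_airflow_logs.py remove   # actually deletes them

For scheduling, either option mentioned above works. A crontab sketch (the path /opt/airflow/clean_airflow_logs.py is an assumption about where you saved the script):

0 3 * * * python /opt/airflow/clean_airflow_logs.py remove >> /opt/airflow/logs_cleanup.cron.log 2>&1

Alternatively, a minimal sketch of a dedicated cleanup DAG, assuming Airflow 2.x and a BashOperator; adjust the path and schedule to your environment:

import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='clean_airflow_logs',
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval='0 3 * * *',  # once a day at 03:00
    catchup=False,
) as dag:
    clean_logs = BashOperator(
        task_id='clean_logs',
        # path to wherever you saved clean_airflow_logs.py (an assumption)
        bash_command='python /opt/airflow/clean_airflow_logs.py remove',
    )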