
Cleaning Up Airflow Logs

· 2 min read

Scenario Overview

Normally we don't let Airflow keep the logs of the actual workload. Instead, we use an SSHOperator to log in to a remote machine and run the code there, redirecting the output to a log file on that machine, as in the example below:

task1 = SSHOperator(
    ssh_conn_id='datanode01',
    task_id='task1',
    depends_on_past=False,
    command="bash /path/program.sh >>/path/program.log 2>&1 "
)

With this setup, the only logs left on the Airflow host are Airflow's own, but even those add up quickly. Suppose a DAG runs every minute and contains 10 tasks: that DAG alone produces 10 log directories per minute, 600 per hour, and 600 * 24 = 14400 per day. That is already a lot of directories, and if you have many tasks it only gets worse.
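Those per-run directories are easy to target because, under the classic Airflow log layout `{dag_id}/{task_id}/{execution_date}/`, each one is named after the run's execution date. A quick check of the two naming variants, with and without microseconds (the paths here are made up for illustration):

```python
import re

# Execution-date directory names, e.g. "2024-01-01T00:00:00+00:00"
# or "2024-01-01T00:00:00.123456+00:00" when microseconds are present.
pattern1 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$'
pattern2 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$'

print(bool(re.search(pattern1, '/opt/airflow/logs/my_dag/task1/2024-01-01T00:00:00+00:00')))         # True
print(bool(re.search(pattern2, '/opt/airflow/logs/my_dag/task1/2024-01-01T00:00:00.123456+00:00')))  # True
```

These are the same two patterns the cleanup script below uses to decide that a directory holds a single task run's logs and can be removed whole.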

Cleanup Script

Use a Python script to clean up the logs under base_log_folder = '/opt/airflow/logs'. You can schedule it with a crontab entry, or create a dedicated cleanup DAG.
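For the crontab route, a nightly entry might look like the following sketch (the script path, Python binary, and schedule are assumptions; adjust them to your install):

```shell
# Hypothetical crontab entry: delete expired Airflow logs every day at 03:00,
# appending the cleanup script's own output to a log of its own.
0 3 * * * python3 /opt/airflow/clean_airflow_logs.py remove >> /tmp/clean_airflow_logs.out 2>&1
```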

  • clean_airflow_logs.py — the Python cleanup script
import datetime
import os
import re
import shutil
import sys


def traverse_and_unlink(file_or_dir, retain_seconds, rm):
    """Recursively walk the log folder, deleting (or, in dry-run mode, just
    reporting) entries whose mtime is older than retain_seconds."""
    for entry in os.scandir(file_or_dir):
        # entry.path is already the joined path; passing the DirEntry to
        # os.path.join only works by accident when the base path is absolute.
        new_file_or_dir = entry.path
        last_ts = os.stat(new_file_or_dir).st_mtime
        last_str = datetime.datetime.fromtimestamp(last_ts).strftime("%Y-%m-%d %H:%M:%S")
        delta_seconds = datetime.datetime.now().timestamp() - last_ts
        if os.path.isfile(new_file_or_dir):
            if delta_seconds > retain_seconds:
                if rm:
                    print(f"remove file {new_file_or_dir}")
                    os.remove(new_file_or_dir)
                else:
                    print(f"last modified time is {last_str} for file {new_file_or_dir}")
        elif os.path.isdir(new_file_or_dir):
            # Directories named after an execution date (with or without
            # microseconds) hold one task run's logs; remove them whole.
            pattern1 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}$'
            pattern2 = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+\+\d{2}:\d{2}$'
            if re.search(pattern1, new_file_or_dir) or re.search(pattern2, new_file_or_dir):
                if delta_seconds > retain_seconds:
                    if rm:
                        print(f"remove dir {new_file_or_dir}")
                        shutil.rmtree(new_file_or_dir)
                    else:
                        print(f"last modified time is {last_str} for dir {new_file_or_dir}")
            else:
                traverse_and_unlink(new_file_or_dir, retain_seconds, rm)


def clean_airflow_logs(rm):
    retain_seconds = 3600 * 24 * 30  # keep 30 days of logs
    base_log_folder = '/opt/airflow/logs'
    traverse_and_unlink(base_log_folder, retain_seconds, rm)


if __name__ == '__main__':
    # Dry run by default: only report candidates. Pass "remove" to delete.
    remove = False
    if len(sys.argv) == 2 and sys.argv[1] == 'remove':
        remove = True
    clean_airflow_logs(remove)
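To sanity-check the retention logic end to end, here is a self-contained dry run against a throwaway directory tree that mimics the Airflow layout (it recreates the age check and a combined execution-date pattern inline, so nothing from the script needs to be importable):

```python
import datetime
import os
import re
import shutil
import tempfile

# Build a throwaway tree mimicking {dag_id}/{task_id}/{execution_date}/.
root = tempfile.mkdtemp()
ti_dir = os.path.join(root, 'my_dag', 'task1', '2024-01-01T00:00:00+00:00')
os.makedirs(ti_dir)
open(os.path.join(ti_dir, '1.log'), 'w').close()

# Backdate the directory's mtime by 60 days so it exceeds 30-day retention.
old = (datetime.datetime.now() - datetime.timedelta(days=60)).timestamp()
os.utime(ti_dir, (old, old))

# Both naming variants from the script, merged into one pattern.
pattern = r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?\+\d{2}:\d{2}$'
retain_seconds = 3600 * 24 * 30
expired = datetime.datetime.now().timestamp() - os.stat(ti_dir).st_mtime > retain_seconds
if re.search(pattern, ti_dir) and expired:
    shutil.rmtree(ti_dir)

print(os.path.exists(ti_dir))  # False: the expired run directory was removed
shutil.rmtree(root)
```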
    