已复制
全屏展示
复制代码

Airflow集群原理与实战部署


· 14 min read

一. 集群组件与工作流

当构建一个 Airflow 集群时,通常会包含以下几个核心组件,每个组件都承担了不同的角色和功能。

Database(数据库)

用于存储 Airflow 的TASK、DAG定义以及 TASK 的状态、依赖关系、调度时间等。常用的元数据数据库包括MySQL、PostgreSQL等。

Scheduler(调度器)

调度器是 Airflow 集群的核心组件之一,它负责定期读取并解析 DAG 文件、创建 DAG 对象并注册到 Database 中。负责根据预定义的调度规则和任务的依赖关系,决定任务的执行顺序和时间。它会定期查询Database(默认每秒一次),检查哪些任务需要执行,同时生成执行计划实例,将任务发送到消息队列中等待执行。

Executor(执行器)

执行器负责实际运行 Airflow 的任务。Airflow提供了多种 Executor 实现,如LocalExecutor(本地执行器)和 CeleryExecutor(基于Celery的执行器)。Executor 从消息队列中获取待执行的任务,并在可用的工作节点上启动相应的任务进程进行处理。

Worker(运行节点)

Worker 是执行任务的实际运行环境,在使用 CeleryExecutor 的集群中,Worker是在由 celery 启动一系列进程,这些进程负责在主机上启动执行任务。当执行器将任务发送到 Worker 时,运行节点会启动任务进程并提供必要的资源(如CPU、内存)来执行任务逻辑。

Web Server(Web服务器)

Web服务器提供了 Airflow 的用户界面,通过该界面可以查看和管理任务、调度规则、执行历史等。用户可以通过 Web 界面触发任务的手动运行、暂停或终止任务的执行。

Message Queue(消息队列)

消息队列用于在调度器和执行器之间传递任务信息。调度器将任务发送到消息队列,而执行器从消息队列中获取任务消息,并在工作节点上启动相应的任务进程进行处理,消息队列我们通常使用 Redis 。

Result Backend(结果后端)

result_backend 用于存储任务的执行结果或状态信息。当任务完成时,任务进程将结果或状态信息写入结果后端,以供 Scheduler 查询。常用的 result_backend 有 MySQL、PostgreSQL、Redis。

Airflow 集群工作流

  • 首先 Scheduler 定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。
  • Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 消息队列。
  • Executor 从消息队列中获取任务消息,并指定 Worker 让其启动实际运行任务。
  • Worker 执行完任务后把执行结果放入 result backend 中。
  • Scheduler 定时去 result backend 获取任务的执行状态,并更新到数据库中。
  • Webserver 负责展示DAG task 的各种状态信息、历史数据,以及对DAG 和 task 的run、clear等常用操作。
  • 各个组件的交互逻辑,这里 Queue broker相当于 Redis 消息队列。
  • Scheduler 与 Executor 与 Worker 的 执行任务的详细流程

二. 集群部署

2.1 环境准备

确定安装目录为 /opt/airflow,所有相关安装都在该目录下,即AIRFLOW_HOME,修改目录权限为普通用户。

sudo mkdir /opt/airflow
sudo chown work.work /opt/airflow

本文在两个节点上部署,操作系统 CentOS7, Python版本3.8, Airflow 版本 2.6.2,节点规划如下:

node1 node2 备注
airflow-webserver(master) airflow-webserver(backup) 均启动,通过 nginx 配置主备方式,nginx 检查到有问题时自动切换到 backup 上
airflow-scheduler(master) airflow-scheduler(backup) 同时只允许一个 airflow-scheduler 存活(防止重复生成任务),由 keepalived 控制启停
airflow-celery-worker airflow-celery-worker 均启动,执行 Executor 推送的任务
airflow-celery-flower airflow-celery-flower 均启动,celery 的可视化界面
keepalived(master) keepalived(backup) 均启动,用于管理 scheduler 的运行
MySQL node1启动:用作后端数据库、celery 的 result backend
Redis node1启动:用作celery的消息队列
Python环境

两个节点 node1 node2 安装Python环境,Airflow 2.6.2 已经不支持Python3.6了,需要安装Python3.8。由于 CentOS7 没有 Python3.8,所以需要额外安装,安装后配置 Python 虚拟环境。

# 安装 yum 仓库
$ sudo yum install centos-release-scl

# 安装 Python3.8
$ sudo yum install rh-python38 rh-python38-python-devel

# 创建 python3.8 命令软连接
$ sudo ln -s /opt/rh/rh-python38/root/usr/bin/python3.8 /usr/local/bin/python3.8
$ sudo ln -s /opt/rh/rh-python38/root/usr/bin/pip3 /usr/local/bin/pip3

$ python3.8 --version
Python 3.8.13

# 切换到 root 用户安装 virtualenv(需要root权限)
sudo su -
pip3 install virtualenv
ln -s /opt/rh/rh-python38/root/usr/local/bin/virtualenv /usr/local/bin/virtualenv

# 安装好 virtualenv 后切换到普通用户 配置虚拟环境
$ virtualenv -p /usr/local/bin/python3.8 /opt/airflow/airflow_env
$ source /opt/airflow/airflow_env/bin/activate
Redis准备

在 node1 上安装Redis,安装过程略,确保有一个 redis 即可。

(airflow_env) [work@node1 opt]$ redis-cli -h node1
node1:6379> 
MySQL准备

在 node1 上安装MySQL,安装过程略,确保有一个 MySQL 即可,但是 python3-devel mysql-devel,这两个包必须安装,不安装的话,后面没法安装 MySQL 的provider。

sudo yum install mysql-devel

创建数据库及权限。

mysql -uroot -hnode1 -p123456MySQL_

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
set global validate_password_policy=0;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';


-- 确保能用新的用户名密码登录成功
mysql -uairflow_user -hnode1 -pairflow_pass
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MySQL connection id is 7456
Server version: 5.7.28 MySQL Community Server (GPL)
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MySQL [(none)]>

2.2 安装 Airflow

分别在两个节点 node1 node2 安装Airflow 以及相关的 Python包:

# 确保 Python 环境正确
source /opt/airflow/airflow_env/bin/activate
pip -V


# 注意:今后要安装 provider 的时候,需要在两个节点都安装,安装步骤也一样
export AIRFLOW_HOME=/opt/airflow
export AIRFLOW_VERSION=2.6.2
export PYTHON_VERSION=3.8
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.6.2/constraints-3.8.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

pip install mysql-connector-python --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[mysql] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[redis] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[celery] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[statsd] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[datadog] --constraint "${CONSTRAINT_URL}"


# 运行如下命令会在 AIRFLOW_HOME 生成默认配置,这个命令报错也没关系,后续修改配置。
airflow --help


# 添加 AIRFLOW_HOME 到全配置
sudo vim /etc/profile
export AIRFLOW_HOME=/opt/airflow

source /etc/profile

2.3 配置 Airflow

注意:为了保证配置文件的一致性,修改的配置统一在 node1 上进行,包括后续的 dag 文件,也在 node1 上修改,然后同步 到node2 上。

  • /opt/airflow/airflow.cfg
[core]
sql_alchemy_conn = mysql+mysqlconnector://airflow_user:airflow_pass@node1:3306/airflow_db
load_examples = False
executor = CeleryExecutor
default_timezone = Asia/Shanghai
parallelism = 64


[metrics]
statsd_on = True
statsd_port = 8125
statsd_datadog_enabled = True


[api]
auth_backends = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth


[webserver]
default_ui_timezone = Asia/Shanghai
web_server_port = 9099
base_url = http://localhost:9099


[celery]
worker_concurrency = 16
broker_url = redis://10.10.10.91:6379/1
result_backend = db+mysql://airflow_user:airflow_pass@node1:3306/airflow_db
flower_url_prefix = /flower


[cli]
endpoint_url = http://localhost:9099


[scheduler]
dag_dir_list_interval = 120
max_dagruns_to_create_per_loop = 1024
max_dagruns_per_loop_to_schedule = 1024
  • 修改默认的 Webserver 端口为 9099 的目的是服务器上的 8080 已经被占用了
  • statsd_on 的作用:用于将 airflow 的监控指标暴露出来,这里 Airflow 将 metrics 发送到 localhost:8125,然后 statsd_exporter 接收 localhost:8125 的 statsd 格式的数据,转换成 prometheus 的数据,然后 statsd_exporter 暴露出 9102 端口,供 prometheus 来抓取,最后一步 grafana 接入 prometheus  展示监控指标。

2.4 配置 Scheduler HA

一个 Airflow 集群的 Scheduler 只允许在一个节点上运行,为了保证高可用,我们利用 keepalived 的 master backup 选举机制,来确保同时只运行一个 Scheduler,更详细的内容见 https://yuchaoshui.com/bec06432c9755c3c821833caccd714b0/

  • 两个节点 node1 node2 安装 keepalived,并创建管理脚本,注意:脚本名不要包含 scheduler 关键字,因为后面要通过 ps 命令判断 scheduler 是否存活。
mkdir /opt/airflow/airflow_ha
sudo yum install -y keepalived
sudo systemctl enable keepalived


# 管理脚本 1
vim /opt/airflow/airflow_ha/schdler_start.sh

#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow/
nohup airflow scheduler 1>>/opt/airflow/logs/schduler.log 2>&1 &


# 管理脚本 2
vim /opt/airflow/airflow_ha/schdler_manager.sh

#!/bin/bash
echo "$(date) current state is $1" >> /opt/airflow/airflow_ha/check.log
# $1 表示当前节点被选举的角色:MASTER BACKUP STOP
# 在启动服务时,确保没有已经在运行着的服务
if [[ "$1" == "MASTER" ]];then
    num=$(ps aux | grep airflow | grep scheduler | grep -v grep | wc -l)
    if [[ "$num" == "0" ]];then
        bash /opt/airflow/airflow_ha/schdler_start.sh
    fi
elif [[ "$1" == "BACKUP" || "$1" == "STOP" ]];then
    ps aux | grep airflow | grep scheduler | awk '{print $2}' | xargs kill -9 2>/dev/null || true
else
    echo "arg \$1 must be one of these: MASTER BACKUP STOP"
fi
keepalived 主备配置
  • node1 配置 /etc/keepalived/keepalived.conf,记得修改 interface
global_defs {
    script_user root # 运行检查脚本的linux用户
}

vrrp_script chk_script_health {
    # 执行检查脚本: 查看日状态判断当前节点状态(MASTER、BACKUP)
    script "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh $(journalctl -u keepalived | grep Entering | grep -v 'Unsafe permissions' | tail -1 | awk '{print $8}')"
    interval 30 # 每隔多少秒进行检查一次
}

vrrp_instance VI_1 {
    virtual_router_id 53 # 一个标识, MASTER 和 BACKUP 必须一致
    advert_int 5 # 多久进行一次master选举,即健康检查时间间隔
    interface ens33 # 必须是一个真实存储的网络接口

    # 调用检查脚本
    track_script {
      chk_script_health
    }

    # 认证权限密码,防止非法节点进入
    authentication {
        auth_type PASS
        auth_pass 1111
    }

    # 停止keeplived进程,则应用程序也要停止,避免两个节点同时运行你的程序
    notify_stop "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh STOP"

    # -----------分割线以上可以完全相同,分割线一下必须不同-----------

    state MASTER # 表明当前节点类型: BACKUP 或者 MASTER
    priority 101 # BACKUP 的值必须小于 MASTER 的值

    # 用 vrrp 单播方式通告
    unicast_src_ip 10.10.10.91
    unicast_peer {
        10.10.10.92
    }
}
  • node2 配置 /etc/keepalived/keepalived.conf,记得修改 interface
global_defs {
    script_user root # 运行检查脚本的linux用户
}

vrrp_script chk_script_health {
    # 执行检查脚本: 查看日状态判断当前节点状态(MASTER、BACKUP)
    script "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh $(journalctl -u keepalived | grep Entering | grep -v 'Unsafe permissions' | tail -1 | awk '{print $8}')"
    interval 30 # 每隔多少秒进行检查一次
}

vrrp_instance VI_1 {
    virtual_router_id 53 # 一个标识, MASTER 和 BACKUP 必须一致
    advert_int 5 # 多久进行一次master选举,即健康检查时间间隔
    interface ens33 # 必须是一个真实存储的网络接口

    # 调用检查脚本
    track_script {
      chk_script_health
    }

    # 认证权限密码,防止非法节点进入
    authentication {
        auth_type PASS
        auth_pass 1111
    }

    # 停止keeplived进程,则应用程序也要停止,避免两个节点同时运行你的程序
    notify_stop "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh STOP"

    # -----------分割线以上可以完全相同,分割线一下必须不同-----------

    state BACKUP # 表明当前节点类型: BACKUP 或者 MASTER
    priority 100 # BACKUP 的值必须小于 MASTER 的值

    # 用 vrrp 单播方式通告
    unicast_src_ip 10.10.10.92
    unicast_peer {
        10.10.10.91
    }
}

2.5 同步配置

# 每次修改配置后都同步一下
scp /opt/airflow/airflow.cfg work@node2:/opt/airflow/airflow.cfg

2.6 启动集群

初始集群
  • 首次安装需要初始、创建用户,在 node1 上初始化即可。
source /opt/airflow/airflow_env/bin/activate
airflow db init

airflow users create \
    --username flow1 \
    --role Admin \
    --password flow1 \
    --firstname first_flow1 \
    --lastname last_flow1 \
    --email example@163.com

启动集群
# 每个节点启动 4 个服务,可以使用服务管理工具比如 supervisor 来统一管理
# 这里我会开 4 个终端分别前台启动,按照下面顺序启动。
airflow webserver
airflow celery worker
airflow celery flower


# 启动1 /opt/airflow/start1.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow webserver >>/opt/airflow/logs/webserver.log 2>&1 &


# 启动2 /opt/airflow/start2.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow celery worker >>/opt/airflow/logs/celery_worker.log 2>&1 &


# 启动3 /opt/airflow/start3.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow celery flower >>/opt/airflow/logs/celery_flower.log 2>&1 &


# 启动4 keepalived 会管理 Scheduler 的启动
sudo systemctl restart keepalived
sudo systemctl status keepalived
  • 启动后 Scheduler 会报如下警告,那是因为还没有安装 statsd_exporter
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2023-06-29T23:02:35.763+0800] {executor_loader.py:114} INFO - Loaded executor: CeleryExecutor
[2023-06-29T23:02:35.766+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.782+0800] {scheduler_job_runner.py:788} INFO - Starting the scheduler
[2023-06-29T23:02:35.782+0800] {scheduler_job_runner.py:795} INFO - Processing each file at most -1 times
[2023-06-29T23:02:35.785+0800] {manager.py:165} INFO - Launched DagFileProcessorManager with pid: 50613
[2023-06-29T23:02:35.786+0800] {scheduler_job_runner.py:1553} INFO - Resetting orphaned tasks for active dag runs
[2023-06-29T23:02:35.789+0800] {settings.py:60} INFO - Configured default timezone Timezone('Asia/Shanghai')
[2023-06-29T23:02:35.824+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.922+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.930+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.931+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket

2.7 安装 statsd_exporter

  • 在两个节点 node1 noede2 上安装,下面的命令启动了 8125 端口接收 Scheduler 发送的数据,处理收到的数据后,又开启了监听端口 9102,用于把数据开放给 prometheus 来抓取,最后可以 还可以配置 grafana 来展示 prometheus 的数据,这是一个完整的闭合链路(prometheus、grafana 的安装略)。
wget https://github.com/prometheus/statsd_exporter/releases/download/v0.22.5/statsd_exporter-0.22.5.linux-amd64.tar.gz
tar -zxf statsd_exporter-0.22.5.linux-amd64.tar.gz -C /opt/airflow/


# 启动脚本4 /opt/airflow/start4.sh
#!/bin/bash
cd /opt/airflow/statsd_exporter-0.22.5.linux-amd64/
nohup ./statsd_exporter --statsd.listen-udp localhost:8125 --web.listen-address=":9102" >>./statsd_exporter.log 2>&1 &

2.8 配置 NGINX

nginx 配置 node1 为master,只有 node1 故障以后,才会使用 node2 的 Webserver

  • /etc/nginx/conf.d/airflow.conf
upstream airflow_servers {
  server node1:9099;
  server node2:9099 backup;
}

server {
    listen 80;
    location / {
        proxy_pass http://airflow_servers;
    }
}

2.8 测试 DAG

  • vim /opt/airflow/dags/dag1.py,在 node1 创建 dag。
  • scp /opt/airflow/dags/dag1.py node2:/opt/airflow/dags/,同步到 node2 上。
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'depends_on_past': False,
}

with DAG(
        dag_id='dag1',
        schedule_interval="* * * * *",
        start_date=datetime.datetime(2023, 6, 28, 17),
        catchup=False,
        tags=['bigdata'],
        default_args=default_args,
) as dag:
    task1 = PythonOperator(task_id='task1', python_callable=lambda: print("task11111"))

三. 集群扩张

如果任务太多,celery的Worker计算不过了,可以随意的增加Worker的数量来横向扩展。集群扩展其实主要是扩展 celery,包括:airflow-celery-worker、airflow-celery-flower,在新的节点只需要启动这两个服务即可,大概步骤如下:

  • 在新节点上,按本文的照步骤安装 airflow 相关的所有包,同本文前面 安装airflow 的步骤。
  • 同步旧节点上的 /opt/airflow/airflow.cfg 文件到新节点 /opt/airflow/airflow.cfg
  • 新节点启动 celery:airflow-celery-worker、 airflow-celery-flower

四. 常用接口

health

  • http://node1:9099/health 它返回如下两个组件的状态:
  • scheduler: 根据连接的数据库的最新运行的job来判断的,所以说,如果你的集群使用了同一个数据库,那么这个健康状态检查无论检查 node1,还是检查 node2 肯定都是健康的。
  • metadatabase:只要检查状态的过程不出错,那么这个值就是healthy的,通常也是健康的。

celery

webserver

🔗

文章推荐