Airflow集群原理与实战部署
一. 集群组件与工作流
当构建一个 Airflow 集群时,通常会包含以下几个核心组件,每个组件都承担了不同的角色和功能。
Database(数据库)
用于存储 Airflow 的TASK、DAG定义以及 TASK 的状态、依赖关系、调度时间等。常用的元数据数据库包括MySQL、PostgreSQL等。
Scheduler(调度器)
调度器是 Airflow 集群的核心组件之一,它负责定期读取并解析 DAG 文件、创建 DAG 对象并注册到 Database 中。负责根据预定义的调度规则和任务的依赖关系,决定任务的执行顺序和时间。它会定期查询Database(默认每秒一次),检查哪些任务需要执行,同时生成执行计划实例,将任务发送到消息队列中等待执行。
Executor(执行器)
执行器负责实际运行 Airflow 的任务。Airflow提供了多种 Executor 实现,如LocalExecutor(本地执行器)和 CeleryExecutor(基于Celery的执行器)。Executor 从消息队列中获取待执行的任务,并在可用的工作节点上启动相应的任务进程进行处理。
Worker(运行节点)
Worker 是执行任务的实际运行环境,在使用 CeleryExecutor 的集群中,Worker是在由 celery 启动一系列进程,这些进程负责在主机上启动执行任务。当执行器将任务发送到 Worker 时,运行节点会启动任务进程并提供必要的资源(如CPU、内存)来执行任务逻辑。
Web Server(Web服务器)
Web服务器提供了 Airflow 的用户界面,通过该界面可以查看和管理任务、调度规则、执行历史等。用户可以通过 Web 界面触发任务的手动运行、暂停或终止任务的执行。
Message Queue(消息队列)
消息队列用于在调度器和执行器之间传递任务信息。调度器将任务发送到消息队列,而执行器从消息队列中获取任务消息,并在工作节点上启动相应的任务进程进行处理,消息队列我们通常使用 Redis 。
Result Backend(结果后端)
result_backend 用于存储任务的执行结果或状态信息。当任务完成时,任务进程将结果或状态信息写入结果后端,以供 Scheduler 查询。常用的 result_backend 有 MySQL、PostgreSQL、Redis。
Airflow 集群工作流
- 首先 Scheduler 定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。
- Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 消息队列。
- Executor 从消息队列中获取任务消息,并指定 Worker 让其启动实际运行任务。
- Worker 执行完任务后把执行结果放入 result backend 中。
- Scheduler 定时去 result backend 获取任务的执行状态,并更新到数据库中。
- Webserver 负责展示DAG task 的各种状态信息、历史数据,以及对DAG 和 task 的run、clear等常用操作。
- 各个组件的交互逻辑,这里 Queue broker相当于 Redis 消息队列。
- Scheduler 与 Executor 与 Worker 的 执行任务的详细流程
二. 集群部署
2.1 环境准备
确定安装目录为 /opt/airflow
,所有相关安装都在该目录下,即AIRFLOW_HOME
,修改目录权限为普通用户。
sudo mkdir /opt/airflow
sudo chown work.work /opt/airflow
本文在两个节点上部署,操作系统 CentOS7, Python版本3.8, Airflow 版本 2.6.2,节点规划如下:
node1 | node2 | 备注 |
---|---|---|
airflow-webserver(master) | airflow-webserver(backup) | 均启动,通过 nginx 配置主备方式,nginx 检查到有问题时自动切换到 backup 上 |
airflow-scheduler(master) | airflow-scheduler(backup) | 同时只允许一个 airflow-scheduler 存活(防止重复生成任务),由 keepalived 控制启停 |
airflow-celery-worker | airflow-celery-worker | 均启动,执行 Executor 推送的任务 |
airflow-celery-flower | airflow-celery-flower | 均启动,celery 的可视化界面 |
keepalived(master) | keepalived(backup) | 均启动,用于管理 scheduler 的运行 |
MySQL | node1启动:用作后端数据库、celery 的 result backend | |
Redis | node1启动:用作celery的消息队列 |
Python环境
两个节点 node1 node2 安装Python环境,Airflow 2.6.2 已经不支持Python3.6了,需要安装Python3.8。由于 CentOS7 没有 Python3.8,所以需要额外安装,安装后配置 Python 虚拟环境。
# 安装 yum 仓库
$ sudo yum install centos-release-scl
# 安装 Python3.8
$ sudo yum install rh-python38 rh-python38-python-devel
# 创建 python3.8 命令软连接
$ sudo ln -s /opt/rh/rh-python38/root/usr/bin/python3.8 /usr/local/bin/python3.8
$ sudo ln -s /opt/rh/rh-python38/root/usr/bin/pip3 /usr/local/bin/pip3
$ python3.8 --version
Python 3.8.13
# 切换到 root 用户安装 virtualenv(需要root权限)
sudo su -
pip3 install virtualenv
ln -s /opt/rh/rh-python38/root/usr/local/bin/virtualenv /usr/local/bin/virtualenv
# 安装好 virtualenv 后切换到普通用户 配置虚拟环境
$ virtualenv -p /usr/local/bin/python3.8 /opt/airflow/airflow_env
$ source /opt/airflow/airflow_env/bin/activate
Redis准备
在 node1 上安装Redis,安装过程略,确保有一个 redis 即可。
(airflow_env) [work@node1 opt]$ redis-cli -h node1
node1:6379>
MySQL准备
在 node1 上安装MySQL,安装过程略,确保有一个 MySQL 即可,但是 python3-devel mysql-devel
,这两个包必须安装,不安装的话,后面没法安装 MySQL 的provider。
sudo yum install mysql-devel
创建数据库及权限。
mysql -uroot -hnode1 -p123456MySQL_
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
set global validate_password_policy=0;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
-- 确保能用新的用户名密码登录成功
mysql -uairflow_user -hnode1 -pairflow_pass
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MySQL connection id is 7456
Server version: 5.7.28 MySQL Community Server (GPL)
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MySQL [(none)]>
2.2 安装 Airflow
分别在两个节点 node1 node2 安装Airflow 以及相关的 Python包:
# 确保 Python 环境正确
source /opt/airflow/airflow_env/bin/activate
pip -V
# 注意:今后要安装 provider 的时候,需要在两个节点都安装,安装步骤也一样
export AIRFLOW_HOME=/opt/airflow
export AIRFLOW_VERSION=2.6.2
export PYTHON_VERSION=3.8
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.6.2/constraints-3.8.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install mysql-connector-python --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[mysql] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[redis] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[celery] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[statsd] --constraint "${CONSTRAINT_URL}"
pip install apache-airflow[datadog] --constraint "${CONSTRAINT_URL}"
# 运行如下命令会在 AIRFLOW_HOME 生成默认配置,这个命令报错也没关系,后续修改配置。
airflow --help
# 添加 AIRFLOW_HOME 到全配置
sudo vim /etc/profile
export AIRFLOW_HOME=/opt/airflow
source /etc/profile
2.3 配置 Airflow
注意:为了保证配置文件的一致性,修改的配置统一在 node1 上进行,包括后续的 dag 文件,也在 node1 上修改,然后同步 到node2 上。
- /opt/airflow/airflow.cfg
[core]
sql_alchemy_conn = mysql+mysqlconnector://airflow_user:airflow_pass@node1:3306/airflow_db
load_examples = False
executor = CeleryExecutor
default_timezone = Asia/Shanghai
parallelism = 64
[metrics]
statsd_on = True
statsd_port = 8125
statsd_datadog_enabled = True
[api]
auth_backends = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth
[webserver]
default_ui_timezone = Asia/Shanghai
web_server_port = 9099
base_url = http://localhost:9099
[celery]
worker_concurrency = 16
broker_url = redis://10.10.10.91:6379/1
result_backend = db+mysql://airflow_user:airflow_pass@node1:3306/airflow_db
flower_url_prefix = /flower
[cli]
endpoint_url = http://localhost:9099
[scheduler]
dag_dir_list_interval = 120
max_dagruns_to_create_per_loop = 1024
max_dagruns_per_loop_to_schedule = 1024
- 修改默认的 Webserver 端口为 9099 的目的是服务器上的 8080 已经被占用了
- statsd_on 的作用:用于将 airflow 的监控指标暴露出来,这里 Airflow 将 metrics 发送到 localhost:8125,然后 statsd_exporter 接收 localhost:8125 的 statsd 格式的数据,转换成 prometheus 的数据,然后 statsd_exporter 暴露出 9102 端口,供 prometheus 来抓取,最后一步 grafana 接入 prometheus 展示监控指标。
2.4 配置 Scheduler HA
一个 Airflow 集群的 Scheduler 只允许在一个节点上运行,为了保证高可用,我们利用 keepalived 的 master backup 选举机制,来确保同时只运行一个 Scheduler,更详细的内容见 https://yuchaoshui.com/bec06432c9755c3c821833caccd714b0/
- 两个节点 node1 node2 安装 keepalived,并创建管理脚本,注意:脚本名不要包含 scheduler 关键字,因为后面要通过 ps 命令判断 scheduler 是否存活。
mkdir /opt/airflow/airflow_ha
sudo yum install -y keepalived
sudo systemctl enable keepalived
# 管理脚本 1
vim /opt/airflow/airflow_ha/schdler_start.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow/
nohup airflow scheduler 1>>/opt/airflow/logs/schduler.log 2>&1 &
# 管理脚本 2
vim /opt/airflow/airflow_ha/schdler_manager.sh
#!/bin/bash
echo "$(date) current state is $1" >> /opt/airflow/airflow_ha/check.log
# $1 表示当前节点被选举的角色:MASTER BACKUP STOP
# 在启动服务时,确保没有已经在运行着的服务
if [[ "$1" == "MASTER" ]];then
num=$(ps aux | grep airflow | grep scheduler | grep -v grep | wc -l)
if [[ "$num" == "0" ]];then
bash /opt/airflow/airflow_ha/schdler_start.sh
fi
elif [[ "$1" == "BACKUP" || "$1" == "STOP" ]];then
ps aux | grep airflow | grep scheduler | awk '{print $2}' | xargs kill -9 2>/dev/null || true
else
echo "arg \$1 must be one of these: MASTER BACKUP STOP"
fi
keepalived 主备配置
- node1 配置 /etc/keepalived/keepalived.conf,记得修改 interface
global_defs {
script_user root # 运行检查脚本的linux用户
}
vrrp_script chk_script_health {
# 执行检查脚本: 查看日状态判断当前节点状态(MASTER、BACKUP)
script "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh $(journalctl -u keepalived | grep Entering | grep -v 'Unsafe permissions' | tail -1 | awk '{print $8}')"
interval 30 # 每隔多少秒进行检查一次
}
vrrp_instance VI_1 {
virtual_router_id 53 # 一个标识, MASTER 和 BACKUP 必须一致
advert_int 5 # 多久进行一次master选举,即健康检查时间间隔
interface ens33 # 必须是一个真实存储的网络接口
# 调用检查脚本
track_script {
chk_script_health
}
# 认证权限密码,防止非法节点进入
authentication {
auth_type PASS
auth_pass 1111
}
# 停止keeplived进程,则应用程序也要停止,避免两个节点同时运行你的程序
notify_stop "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh STOP"
# -----------分割线以上可以完全相同,分割线一下必须不同-----------
state MASTER # 表明当前节点类型: BACKUP 或者 MASTER
priority 101 # BACKUP 的值必须小于 MASTER 的值
# 用 vrrp 单播方式通告
unicast_src_ip 10.10.10.91
unicast_peer {
10.10.10.92
}
}
- node2 配置 /etc/keepalived/keepalived.conf,记得修改 interface
global_defs {
script_user root # 运行检查脚本的linux用户
}
vrrp_script chk_script_health {
# 执行检查脚本: 查看日状态判断当前节点状态(MASTER、BACKUP)
script "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh $(journalctl -u keepalived | grep Entering | grep -v 'Unsafe permissions' | tail -1 | awk '{print $8}')"
interval 30 # 每隔多少秒进行检查一次
}
vrrp_instance VI_1 {
virtual_router_id 53 # 一个标识, MASTER 和 BACKUP 必须一致
advert_int 5 # 多久进行一次master选举,即健康检查时间间隔
interface ens33 # 必须是一个真实存储的网络接口
# 调用检查脚本
track_script {
chk_script_health
}
# 认证权限密码,防止非法节点进入
authentication {
auth_type PASS
auth_pass 1111
}
# 停止keeplived进程,则应用程序也要停止,避免两个节点同时运行你的程序
notify_stop "/bin/bash /opt/airflow/airflow_ha/schdler_manager.sh STOP"
# -----------分割线以上可以完全相同,分割线一下必须不同-----------
state BACKUP # 表明当前节点类型: BACKUP 或者 MASTER
priority 100 # BACKUP 的值必须小于 MASTER 的值
# 用 vrrp 单播方式通告
unicast_src_ip 10.10.10.92
unicast_peer {
10.10.10.91
}
}
2.5 同步配置
# 每次修改配置后都同步一下
scp /opt/airflow/airflow.cfg work@node2:/opt/airflow/airflow.cfg
2.6 启动集群
初始集群
- 首次安装需要初始、创建用户,在 node1 上初始化即可。
source /opt/airflow/airflow_env/bin/activate
airflow db init
airflow users create \
--username flow1 \
--role Admin \
--password flow1 \
--firstname first_flow1 \
--lastname last_flow1 \
--email example@163.com
启动集群
# 每个节点启动 4 个服务,可以使用服务管理工具比如 supervisor 来统一管理
# 这里我会开 4 个终端分别前台启动,按照下面顺序启动。
airflow webserver
airflow celery worker
airflow celery flower
# 启动1 /opt/airflow/start1.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow webserver >>/opt/airflow/logs/webserver.log 2>&1 &
# 启动2 /opt/airflow/start2.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow celery worker >>/opt/airflow/logs/celery_worker.log 2>&1 &
# 启动3 /opt/airflow/start3.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
nohup airflow celery flower >>/opt/airflow/logs/celery_flower.log 2>&1 &
# 启动4 keepalived 会管理 Scheduler 的启动
sudo systemctl restart keepalived
sudo systemctl status keepalived
- 启动后 Scheduler 会报如下警告,那是因为还没有安装 statsd_exporter
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2023-06-29T23:02:35.763+0800] {executor_loader.py:114} INFO - Loaded executor: CeleryExecutor
[2023-06-29T23:02:35.766+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.782+0800] {scheduler_job_runner.py:788} INFO - Starting the scheduler
[2023-06-29T23:02:35.782+0800] {scheduler_job_runner.py:795} INFO - Processing each file at most -1 times
[2023-06-29T23:02:35.785+0800] {manager.py:165} INFO - Launched DagFileProcessorManager with pid: 50613
[2023-06-29T23:02:35.786+0800] {scheduler_job_runner.py:1553} INFO - Resetting orphaned tasks for active dag runs
[2023-06-29T23:02:35.789+0800] {settings.py:60} INFO - Configured default timezone Timezone('Asia/Shanghai')
[2023-06-29T23:02:35.824+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.922+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.930+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
[2023-06-29T23:02:35.931+0800] {base.py:885} WARNING - Error submitting packet: [Errno 111] Connection refused, dropping the packet and closing the socket
2.7 安装 statsd_exporter
- 在两个节点 node1 noede2 上安装,下面的命令启动了 8125 端口接收 Scheduler 发送的数据,处理收到的数据后,又开启了监听端口 9102,用于把数据开放给 prometheus 来抓取,最后可以 还可以配置 grafana 来展示 prometheus 的数据,这是一个完整的闭合链路(prometheus、grafana 的安装略)。
wget https://github.com/prometheus/statsd_exporter/releases/download/v0.22.5/statsd_exporter-0.22.5.linux-amd64.tar.gz
tar -zxf statsd_exporter-0.22.5.linux-amd64.tar.gz -C /opt/airflow/
# 启动脚本4 /opt/airflow/start4.sh
#!/bin/bash
cd /opt/airflow/statsd_exporter-0.22.5.linux-amd64/
nohup ./statsd_exporter --statsd.listen-udp localhost:8125 --web.listen-address=":9102" >>./statsd_exporter.log 2>&1 &
2.8 配置 NGINX
nginx 配置 node1 为master,只有 node1 故障以后,才会使用 node2 的 Webserver
/etc/nginx/conf.d/airflow.conf
upstream airflow_servers {
server node1:9099;
server node2:9099 backup;
}
server {
listen 80;
location / {
proxy_pass http://airflow_servers;
}
}
2.8 测试 DAG
vim /opt/airflow/dags/dag1.py
,在 node1 创建 dag。scp /opt/airflow/dags/dag1.py node2:/opt/airflow/dags/
,同步到 node2 上。
import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'retries': 1,
'retry_delay': datetime.timedelta(minutes=1),
'depends_on_past': False,
}
with DAG(
dag_id='dag1',
schedule_interval="* * * * *",
start_date=datetime.datetime(2023, 6, 28, 17),
catchup=False,
tags=['bigdata'],
default_args=default_args,
) as dag:
task1 = PythonOperator(task_id='task1', python_callable=lambda: print("task11111"))
三. 集群扩张
如果任务太多,celery的Worker计算不过了,可以随意的增加Worker的数量来横向扩展。集群扩展其实主要是扩展 celery,包括:airflow-celery-worker、airflow-celery-flower,在新的节点只需要启动这两个服务即可,大概步骤如下:
- 在新节点上,按本文的照步骤安装 airflow 相关的所有包,同本文前面 安装airflow 的步骤。
- 同步旧节点上的 /opt/airflow/airflow.cfg 文件到新节点 /opt/airflow/airflow.cfg
- 新节点启动 celery:airflow-celery-worker、 airflow-celery-flower
四. 常用接口
health
- http://node1:9099/health 它返回如下两个组件的状态:
- scheduler: 根据连接的数据库的最新运行的job来判断的,所以说,如果你的集群使用了同一个数据库,那么这个健康状态检查无论检查 node1,还是检查 node2 肯定都是健康的。
- metadatabase:只要检查状态的过程不出错,那么这个值就是healthy的,通常也是健康的。
celery
- celery flower1 http://node1:5555/flower/dashboard
- celery flower2 http://node2:5555/flower/dashboard
webserver
- webserver1 http://node1:9099/home 也可以用 nginx 提供的统一地址。
- webserver1 http://node2:9099/home 也可以用 nginx 提供的统一地址。