Airflow单节点安装部署与架构
一. 安装部署
Airflow是一种任务调度系统,可以认为它是crontab的升级版,crontab只能在指定时间去执行,它并不能解决多任务依赖的问题。Airflow灵活性非常高,功能非常强大,虽然新手在接触时有点懵,不理解为什么要这么设计,但是相信我,上手之后你会发现它的唯美之处。
通常情况下,单节点安装 Airflow 就能解决大部分的任务调度依赖了,如果任务太多,它也可以很方便的增加 worker 节点。
1.1 配置MySQL
MySQL的安装过程略,安装好MySQL后,登录MySQL创建Airflow数据库,本示例使用MySQL版本5.7.28。
$ mysql -uroot -p123456MySQL_
> CREATE DATABASE airflow_db_single CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
> set global validate_password_policy=0;
> CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
> GRANT ALL PRIVILEGES ON airflow_db_single.* TO 'airflow_user';
1.2 虚拟环境安装 Airflow
# 创建 /opt/airflow 为 AIRFLOW_HOME
sudo mkdir /opt/airflow
sudo chown work:work /opt/airflow
export AIRFLOW_HOME=/opt/airflow
# 创建虚拟环境,确保不污染其他Python环境
sudo pip3 install virtualenv
virtualenv -p /usr/local/bin/python3.8 /opt/airflow/airflow_env
source /opt/airflow/airflow_env/bin/activate
pip -V
# 安装指定 Airflow Python 版本, Python3.11 暂不支持
AIRFLOW_VERSION=2.6.2
PYTHON_VERSION=python3.8
# 指定下载链接
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# 安装 Airflow 及其依赖
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install mysql-connector-python
# 安装完成后运行 airflow --help 生成配置文件
# 生成的配置文件路径 $AIRFLOW_HOME/airflow.cfg
# airflow --help 命令会报错,不用理会,我们的目的是创建默认配置文件
# 运行之前,确保 AIRFLOW_HOME 环境变是正确的,因为配置文件会读取该变量
echo $AIRFLOW_HOME
airflow --help
1.3 配置 Airflow 并初始化
- $AIRFLOW_HOME/airflow.cfg
# 修改主要配置文件,注意:时区一定要修改成Asia/Shanghai(假设使用场景在国内)
[core]
sql_alchemy_conn = mysql+mysqlconnector://airflow_user:airflow_pass@localhost:3306/airflow_db_single
load_examples = False
executor = LocalExecutor
default_timezone = Asia/Shanghai
default_ui_timezone = Asia/Shanghai
[api]
auth_backends = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth
[scheduler]
dag_dir_list_interval = 60
# 初始化数据库 创建用户:用户名flow1 密码flow1
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow db init
airflow users create \
--username flow1 \
--role Admin \
--password flow1 \
--firstname first_flow1 \
--lastname last_flow1 \
--email example@163.com
# 如果初始化报如下的错,请修改 MySQL 配置 /etc/my.cnf,重启MySQL
# [mysqld]
# explicit_defaults_for_timestamp=1
sqlalchemy.exc.ProgrammingError: (mysql.connector.errors.ProgrammingError) 1067 (42000):
Invalid default value for 'updated_at'
1.4 启动 Airflow 组件
单机版主要启动两个组件:webserver scheduler,编写两个脚本分别在两个终端启动。
- /opt/airflow/start1.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow webserver --port 8080
- /opt/airflow/start2.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow scheduler
浏览器登录airflow web界面 http://yourhostname:8080/ 用户名flow1 密码flow1
1.5 重置 Airflow
如果在测试过程中出现问题,可以重置以后再来一次。
- 停止 webserver 和 scheduler
- 删除 MySQL 数据 airflow_db_single、删除 airflow 目录下的 logs 和 dags 目录内的所有文件
- 然后重新初始化 Airflow
mysql -uroot -p123456MySQL_
> drop database airflow_db_single;
> CREATE DATABASE airflow_db_single CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
rm -rf /opt/airflow/dags/*
rm -rf /opt/airflow/logs/*
二. 基础架构
2.1 基础概念
一个Airflow安装实例主要包含了下面这些组件:
- dag: 将所有需要运行的tasks按照依赖关系组织起来,描述了所有tasks执行的顺序。
- dag run: 一个dag的运行实例,根据 scheduler interval来生成
- scheduler: 定时触发workflow即dag的运行,以及提交tasks到executor中执行。
- executor: 它处理scheduler提交的tasks,多数情况下在scheduler中的Exector会把tasks推送到workers中执行。
- webserver: 用户界面可以实现一些基本操作,比如开启、关闭dag运行等。
- worker: 最终运行 task 的节点。
- dags folder: 用于存放所有的dag files,scheduler 和 executor 从该目录中读取。
- metadata database: 用于存储scheduler、executor、webserver、worker产生的数据。
- task: task 是 Operator的一个实例,也就是 DAG 中的一个node。
- task instance: task的一次运行。task instance 有自己的状态,包括running, success, failed, skipped, up for retry等。
- task Relationships: DAG中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖TaskA。
下面是Airflow的一个架构图。
2.2 task分类
Operators
预定义task,可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求等等,同时用户可以自定义Operator。
Sensors
它是 Operators 的一个特殊类型,它被设计用于等待某些事件的发生,可以基于时间、等待某个文件生成、一个外部事件等,他们能做的就是等待,当它成功以后,下游的task就可以执行了。
自定义task
自定义 task 使用 @task 装饰的自定义 task,实际上很少用到,预定义的 Operators和 Sensors 基本能解决大部分场景下的问题了。
三. 添加任务
Airflow的所有任务都在 $AIRFLOW_HOME/dags 目录下面,都是 .py 文件,一个文件通常只写一个dag,在该目录下添加dag Python文件,Airflow scheduler 会自动去加载文件内容。
在安装operator或者sensor后,需要重启webserver、scheduler后才能在web界面查看到对应的类型。
3.1 python任务
/opt/airflow/dags/python_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def python_custom(ds, **kwargs):
print('ds:', ds)
print('kwargs:', kwargs)
with DAG(
dag_id='python_dag',
default_args=default_args,
description='A simple haha DAG',
schedule_interval='* * * * *',
start_date=datetime(2023, 6, 20, 11, 35),
catchup=False,
tags=['new_example'],
) as dag:
python_task1 = PythonOperator(
task_id='python_task1',
provide_context=True,
python_callable=python_custom,
)
3.2 bash任务
- 安装依赖
# 安装 provider
pip install apache-airflow-providers-ssh
- /opt/airflow/dags/bash_dag.py
from airflow import DAG
from textwrap import dedent
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='bash_dag',
default_args=default_args,
description='A simple haha DAG',
schedule_interval='* * * * *',
start_date=datetime(2023, 06, 12),
catchup=False,
tags=['new_example'],
) as dag:
dag.doc_md = """
dag documentation
"""
t1 = BashOperator(
task_id='t1',
bash_command='date >> /tmp/dagBashOperator',
)
t1.doc_md = dedent(
"""
t1 task Documentation, You can document here
"""
)
3.3 hive任务
- 安装 operator
# 安装依赖
sudo yum install python3-devel cyrus-sasl-devel
source /opt/airflow/airflow_env/bin/activate
pip install apache-airflow-providers-apache-hive
- 在web界面配置 Hive Server 2 Thrift,连接名为 hiveserver2_default(Admin => Connections => 新建连接)
# 连接类型
Hive Server 2 Thrift
# 选择安装 Hive Server 的节点
# 端口默认
10000
- /opt/airflow/dags/hive_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.operators.hive import HiveOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hive_dag',
default_args=default_args,
description='A simple haha DAG',
schedule_interval='*/5 * * * *',
start_date=datetime(2023, 06, 12),
catchup=False,
tags=['new_example'],
) as dag:
t1 = HiveOperator(
task_id="t1",
hive_cli_conn_id="hiveserver2_default",
hql="select name,city,age,salary from test.person"
)
3.4 hive sensor任务
- 安装sensor,用于检查hive表指定分区是否存在
pip install apache-airflow-providers-apache-hive
- 同样要先设置 connections,注意是 Hive metastore Thrift
# 连接类型
Hive metastore Thrift
# 选择安装 Hive metastore 的节点
# 端口默认
9083
- /opt/airflow/dags/hive_partition_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor
def get_dt_partition(logical_date, interval=8):
new_date = (logical_date + timedelta(hours=interval))
dt = new_date.strftime("%Y-%m-%d")
partition = "dt=%s" % (dt)
return partition
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='hive_partition_dag',
default_args=default_args,
description='A simple DAG',
schedule_interval='5 20 * * *',
start_date=datetime(2023, 6, 24, 21, 15),
catchup=False,
tags=['new_example'],
user_defined_macros={
"get_dt_partition": get_dt_partition,
}
) as dag:
profile_new = HivePartitionSensor(
task_id='profile_new',
# hive的 hive_metastore 连接,通过ui界面的 Connections 进行配置
metastore_conn_id='metastore_default',
# 需要检查的hive表名。注意:需要加上数据库名
table='userprofile.profile_new',
# reschedule: 该模式在休眠期间不会占用slot,只有在执行时才会占用
mode='reschedule',
# 两次检查的间隔时间,单位秒。使用reschedule模式时,建议该值不小于60。
poke_interval=60,
# 需要检测的分区。这里是检查昨天的分区
partition='{{ get_dt_partition(logical_date) }}',
)
t1 = HiveOperator(
hive_cli_conn_id="t1",
task_id="selectHiveTask",
hql="select name,city,age,salary from test.person"
)
# t1 任务必须等到 profile_new 昨天的分区生成以后才执行
profile_new >> t1
3.5 mysql 任务
- operator准备
sudo yum install -y mysql-devel
pip install mysql apache-airflow-providers-mysql
- 在web界面配置 mysql 连接,连接id为 custom_mysql_id
- /opt/airflow/dags/mysql_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.mysql.operators.mysql import MySqlOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='mysql_dag',
default_args=default_args,
description='A simple haha DAG',
schedule_interval='*/5 * * * *',
start_date=datetime(2023, 06, 12),
catchup=False,
tags=['new_example'],
) as dag:
t1 = MySqlOperator(
task_id="t1",
mysql_conn_id="custom_mysql_id",
sql="select * from mysql.user"
)
3.6 ssh任务
SSHOperator使用场景:当需要登录到其他机器执行脚本时使用。
- 安装provider
# 安装依赖
pip install apache-airflow-providers-ssh
- 在 web 界面创建 ssh 的 connection: ssh_node2
- /opt/airflow/dags/ssh_dag.py
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.ssh.operators.ssh import SSHOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='ssh_dag',
default_args=default_args,
description='for spark',
schedule_interval='* * * * *',
start_date=datetime(2023, 6, 25),
catchup=False,
tags=['big_data']
) as dag:
test_ssh = SSHOperator(
ssh_conn_id='ssh_node2',
task_id='test_ssh',
depends_on_past=False,
command="hostname; echo 999 > /tmp/test_ssh",
)
参考资料: