已复制
全屏展示
复制代码

Airflow单节点安装部署与架构


· 8 min read

一. 安装部署

Airflow是一种任务调度系统,可以认为它是crontab的升级版,crontab只能在指定时间去执行,它并不能解决多任务依赖的问题。Airflow灵活性非常高,功能非常强大,虽然新手在接触时有点懵,不理解为什么要这么设计,但是相信我,上手之后你会发现它的唯美之处。

通常情况下,单节点安装 Airflow 就能解决大部分的任务调度依赖了,如果任务太多,它也可以很方便的增加 worker 节点。

1.1 配置MySQL

MySQL的安装过程略,安装好MySQL后,登录MySQL创建Airflow数据库,本示例使用MySQL版本5.7.28。

$ mysql -uroot -p123456MySQL_
> CREATE DATABASE airflow_db_single CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
> set global validate_password_policy=0;
> CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
> GRANT ALL PRIVILEGES ON airflow_db_single.* TO 'airflow_user';

1.2 虚拟环境安装 Airflow

# 创建 /opt/airflow 为 AIRFLOW_HOME
sudo mkdir /opt/airflow
sudo chown work:work /opt/airflow
export AIRFLOW_HOME=/opt/airflow


# 创建虚拟环境,确保不污染其他Python环境
sudo pip3 install virtualenv
virtualenv -p /usr/local/bin/python3.8 /opt/airflow/airflow_env
source /opt/airflow/airflow_env/bin/activate
pip -V

# 安装指定 Airflow Python 版本, Python3.11 暂不支持
AIRFLOW_VERSION=2.6.2
PYTHON_VERSION=python3.8

# 指定下载链接
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# 安装 Airflow 及其依赖
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install mysql-connector-python


# 安装完成后运行 airflow --help 生成配置文件
# 生成的配置文件路径 $AIRFLOW_HOME/airflow.cfg
# airflow --help 命令会报错,不用理会,我们的目的是创建默认配置文件
# 运行之前,确保 AIRFLOW_HOME 环境变是正确的,因为配置文件会读取该变量
echo $AIRFLOW_HOME
airflow --help

1.3 配置 Airflow 并初始化

  • $AIRFLOW_HOME/airflow.cfg
# 修改主要配置文件,注意:时区一定要修改成Asia/Shanghai(假设使用场景在国内)
[core]
sql_alchemy_conn = mysql+mysqlconnector://airflow_user:airflow_pass@localhost:3306/airflow_db_single
load_examples = False
executor = LocalExecutor
default_timezone = Asia/Shanghai
default_ui_timezone = Asia/Shanghai

[api]
auth_backends = airflow.api.auth.backend.session,airflow.api.auth.backend.basic_auth

[scheduler]
dag_dir_list_interval = 60


# 初始化数据库 创建用户:用户名flow1 密码flow1
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow db init
airflow users create \
    --username flow1 \
    --role Admin \
    --password flow1 \
    --firstname first_flow1 \
    --lastname last_flow1 \
    --email example@163.com

# 如果初始化报如下的错,请修改 MySQL 配置 /etc/my.cnf,重启MySQL
# [mysqld]
# explicit_defaults_for_timestamp=1

sqlalchemy.exc.ProgrammingError: (mysql.connector.errors.ProgrammingError) 1067 (42000): 
Invalid default value for 'updated_at'

1.4 启动 Airflow 组件

单机版主要启动两个组件:webserver  scheduler,编写两个脚本分别在两个终端启动。

  • /opt/airflow/start1.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow webserver --port 8080
  • /opt/airflow/start2.sh
#!/bin/bash
source /opt/airflow/airflow_env/bin/activate
export AIRFLOW_HOME=/opt/airflow
airflow scheduler

浏览器登录airflow web界面 http://yourhostname:8080/  用户名flow1  密码flow1

1.5 重置 Airflow

如果在测试过程中出现问题,可以重置以后再来一次。

  • 停止 webserver 和 scheduler
  • 删除 MySQL 数据 airflow_db_single、删除 airflow 目录下的 logs 和 dags 目录内的所有文件
  • 然后重新初始化 Airflow
mysql -uroot -p123456MySQL_
> drop database airflow_db_single;
> CREATE DATABASE airflow_db_single CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;


rm -rf /opt/airflow/dags/*
rm -rf /opt/airflow/logs/*

二. 基础架构

2.1 基础概念

一个Airflow安装实例主要包含了下面这些组件:

  • dag:  将所有需要运行的tasks按照依赖关系组织起来,描述了所有tasks执行的顺序。
  • dag run:   一个dag的运行实例,根据 scheduler interval来生成
  • scheduler:  定时触发workflow即dag的运行,以及提交tasks到executor中执行。
  • executor:  它处理scheduler提交的tasks,多数情况下在scheduler中的Exector会把tasks推送到workers中执行。
  • webserver:  用户界面可以实现一些基本操作,比如开启、关闭dag运行等。
  • worker: 最终运行 task 的节点。
  • dags folder:  用于存放所有的dag files,scheduler 和 executor 从该目录中读取。
  • metadata database: 用于存储scheduler、executor、webserver、worker产生的数据。
  • task: task 是 Operator的一个实例,也就是 DAG 中的一个node。
  • task instance: task的一次运行。task instance 有自己的状态,包括running, success, failed, skipped, up for retry等。
  • task Relationships: DAG中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖TaskA。

下面是Airflow的一个架构图。

2.2 task分类

Operators

预定义task,可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求等等,同时用户可以自定义Operator。

Sensors

它是 Operators 的一个特殊类型,它被设计用于等待某些事件的发生,可以基于时间、等待某个文件生成、一个外部事件等,他们能做的就是等待,当它成功以后,下游的task就可以执行了。

自定义task

自定义 task 使用 @task 装饰的自定义 task,实际上很少用到,预定义的 Operators和 Sensors 基本能解决大部分场景下的问题了。

三. 添加任务

Airflow的所有任务都在 $AIRFLOW_HOME/dags 目录下面,都是 .py 文件,一个文件通常只写一个dag,在该目录下添加dag Python文件,Airflow scheduler 会自动去加载文件内容。

在安装operator或者sensor后,需要重启webserver、scheduler后才能在web界面查看到对应的类型。

3.1 python任务

/opt/airflow/dags/python_dag.py

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def python_custom(ds, **kwargs):
    print('ds:', ds)
    print('kwargs:', kwargs)


with DAG(
        dag_id='python_dag',
        default_args=default_args,
        description='A simple haha DAG',
        schedule_interval='* * * * *',
        start_date=datetime(2023, 6, 20, 11, 35),
        catchup=False,
        tags=['new_example'],
) as dag:
    python_task1 = PythonOperator(
        task_id='python_task1',
        provide_context=True,
        python_callable=python_custom,
    )

3.2 bash任务

  • 安装依赖
# 安装 provider
pip install apache-airflow-providers-ssh
  • /opt/airflow/dags/bash_dag.py
from airflow import DAG
from textwrap import dedent
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='bash_dag',
        default_args=default_args,
        description='A simple haha DAG',
        schedule_interval='* * * * *',
        start_date=datetime(2023, 06, 12),
        catchup=False,
        tags=['new_example'],
) as dag:
    dag.doc_md = """
    dag documentation
    """

    t1 = BashOperator(
        task_id='t1',
        bash_command='date >> /tmp/dagBashOperator',
    )
    t1.doc_md = dedent(
        """
        t1 task Documentation, You can document here
        """
    )

3.3 hive任务

  • 安装 operator
# 安装依赖
sudo yum install python3-devel cyrus-sasl-devel
source /opt/airflow/airflow_env/bin/activate
pip install apache-airflow-providers-apache-hive
  • 在web界面配置 Hive Server 2 Thrift,连接名为 hiveserver2_default(Admin => Connections  => 新建连接)
# 连接类型
Hive Server 2 Thrift

# 选择安装 Hive Server 的节点

# 端口默认
10000
  • /opt/airflow/dags/hive_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.operators.hive import HiveOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='hive_dag',
        default_args=default_args,
        description='A simple haha DAG',
        schedule_interval='*/5 * * * *',
        start_date=datetime(2023, 06, 12),
        catchup=False,
        tags=['new_example'],
) as dag:
    t1 = HiveOperator(
        task_id="t1",
        hive_cli_conn_id="hiveserver2_default",
        hql="select name,city,age,salary from test.person"
    )

3.4 hive sensor任务

  • 安装sensor,用于检查hive表指定分区是否存在
pip install apache-airflow-providers-apache-hive
  • 同样要先设置 connections,注意是 Hive metastore Thrift
# 连接类型
Hive metastore Thrift

# 选择安装 Hive metastore 的节点

# 端口默认
9083
  • /opt/airflow/dags/hive_partition_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.apache.hive.sensors.hive_partition import HivePartitionSensor

def get_dt_partition(logical_date, interval=8):
    new_date = (logical_date + timedelta(hours=interval))
    dt = new_date.strftime("%Y-%m-%d")
    partition = "dt=%s" % (dt)
    return partition

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='hive_partition_dag',
        default_args=default_args,
        description='A simple DAG',
        schedule_interval='5 20 * * *',
        start_date=datetime(2023, 6, 24, 21, 15),
        catchup=False,
        tags=['new_example'],
        user_defined_macros={
            "get_dt_partition": get_dt_partition,
        }
) as dag:
    profile_new = HivePartitionSensor(
        task_id='profile_new',
        
        # hive的 hive_metastore 连接,通过ui界面的 Connections 进行配置
        metastore_conn_id='metastore_default',
        
        # 需要检查的hive表名。注意:需要加上数据库名
        table='userprofile.profile_new',
        
        # reschedule: 该模式在休眠期间不会占用slot,只有在执行时才会占用
        mode='reschedule',
        
        # 两次检查的间隔时间,单位秒。使用reschedule模式时,建议该值不小于60。
        poke_interval=60,
        
        # 需要检测的分区。这里是检查昨天的分区
        partition='{{ get_dt_partition(logical_date) }}',
    )
    
    t1 = HiveOperator(
        hive_cli_conn_id="t1",
        task_id="selectHiveTask",
        hql="select name,city,age,salary from test.person"
    )
    
    # t1 任务必须等到 profile_new 昨天的分区生成以后才执行
    profile_new >> t1

3.5 mysql 任务

  • operator准备
sudo yum install -y mysql-devel
pip install mysql apache-airflow-providers-mysql
  • 在web界面配置 mysql 连接,连接id为 custom_mysql_id
  • /opt/airflow/dags/mysql_dag.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.mysql.operators.mysql import MySqlOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='mysql_dag',
        default_args=default_args,
        description='A simple haha DAG',
        schedule_interval='*/5 * * * *',
        start_date=datetime(2023, 06, 12),
        catchup=False,
        tags=['new_example'],
) as dag:
    t1 = MySqlOperator(
        task_id="t1",
        mysql_conn_id="custom_mysql_id",
        sql="select * from mysql.user"
    )

3.6 ssh任务

SSHOperator使用场景:当需要登录到其他机器执行脚本时使用。

  • 安装provider
# 安装依赖
pip install apache-airflow-providers-ssh
  • 在 web 界面创建 ssh 的 connection: ssh_node2
  • /opt/airflow/dags/ssh_dag.py
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
        dag_id='ssh_dag',
        default_args=default_args,
        description='for spark',
        schedule_interval='* * * * *',
        start_date=datetime(2023, 6, 25),
        catchup=False,
        tags=['big_data']
) as dag:
    test_ssh = SSHOperator(
        ssh_conn_id='ssh_node2',
        task_id='test_ssh',
        depends_on_past=False,
        command="hostname; echo 999 > /tmp/test_ssh",
    )
    

参考资料:


文章推荐