Airflow Scheduler Source Code Walkthrough
1. Execution Flow Overview
- First, the Scheduler's DagFileProcessorAgent process periodically (every dag_dir_list_interval seconds, as configured) scans the .py files under the dags directory, parses them, and writes the resulting DAG and task objects to the database (a minimal example of such a DAG file follows this list).
- Every second the Scheduler reads the DAG and task definitions, states, and scheduling rules from the database and decides what should run; for tasks that should run, it creates task instances and sends them to the Executor's message queue.
- Workers pick up task messages from the queue and actually run the tasks.
- After a task finishes, the Worker writes the result to the result backend.
- The Scheduler periodically reads task states from the result backend and updates them in the database.
- The Webserver displays DAG and task state information and history, and provides the common operations on DAGs and tasks such as run and clear.
- In the component interaction diagram, the queue broker corresponds to the Redis message queue.
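For reference, the files that the DagFileProcessorAgent parses are ordinary Python modules defining DAG objects. A minimal, purely illustrative example (the dag_id, schedule, and task are made up):
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A DAG file dropped into the dags directory; the scheduler's parser
# imports this module and registers the DAG and its tasks in the database.
with DAG(
    dag_id="example_hello",          # hypothetical dag_id
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    BashOperator(task_id="say_hello", bash_command="echo hello")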
2. Execution Flow Source Code
The source version used here is Airflow 2.6.2 (https://github.com/apache/airflow.git). This article walks through the flow with CeleryExecutor as the example.
Entry point
- airflow/cli/cli_config.py defines all of the Airflow CLI arguments.
- airflow/cli/commands/scheduler_command.py is the entry file; the command line invokes its scheduler() function to start the Scheduler service.
@cli_utils.action_cli
def scheduler(args):
"""Starts Airflow Scheduler."""
print(settings.HEADER)
job_runner = SchedulerJobRunner(
job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
)
ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)
if args.daemon:
pid, stdout, stderr, log_file = setup_locations(
"scheduler", args.pid, args.stdout, args.stderr, args.log_file
)
handle = setup_logging(log_file)
with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
stdout_handle.truncate(0)
stderr_handle.truncate(0)
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(pid, -1),
files_preserve=[handle],
stdout=stdout_handle,
stderr=stderr_handle,
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
_run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
_run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
- Create a job_runner: it runs at the configured interval and schedules the tasks that are ready to run.
- Create the Executor. The executor class is read from the config file via conf.get_mandatory_value("core", "executor"), and compatibility between the Executor and the database is validated; for example, SQLite only supports SequentialExecutor. Here this creates a CeleryExecutor instance, which uses a BulkStateFetcher to fetch Celery task states in bulk (a small inspection sketch follows this list).
- CeleryExecutor lives at airflow.executors.celery_executor.CeleryExecutor.
- Initialize logging: pid file, stdout, stderr, and log file.
- Once everything is ready, _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs) is called, which goes through run_job() to start the service; this ultimately executes job_runner._execute(), so the focus shifts back to the job_runner.
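To see which executor will be loaded, the same lookups the entry point performs can be run interactively (a small inspection sketch, assuming an initialized Airflow installation; the printed values depend on your airflow.cfg):
from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader

# The same config lookup the scheduler command relies on.
print(conf.get_mandatory_value("core", "executor"))

# Import the configured executor class, as _execute() does.
executor_cls, source = ExecutorLoader.import_default_executor_cls()
print(executor_cls.__name__, source)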
The job_runner instance: _execute()
def _execute(self) -> int | None:
from airflow.dag_processing.manager import DagFileProcessorAgent
self.log.info("Starting the scheduler")
executor_class, _ = ExecutorLoader.import_default_executor_cls()
# DAGs can be pickled for easier remote execution by some executors
pickle_dags = self.do_pickle and executor_class.supports_pickling
self.log.info("Processing each file at most %s times", self.num_times_parse_dags)
if not self._standalone_dag_processor:
self.processor_agent = DagFileProcessorAgent(
dag_directory=Path(self.subdir),
max_runs=self.num_times_parse_dags,
processor_timeout=processor_timeout,
dag_ids=[],
pickle_dags=pickle_dags,
async_mode=async_mode,
)
self.job.executor.start()
if self.processor_agent:
self.processor_agent.start()
- The job_runner lives in airflow/jobs/scheduler_job_runner.py.
- Start the Executor, here CeleryExecutor. Its start() does not actually spawn a process; it only logs a message.
- Start the DagFileProcessorManager child process, which is dedicated to parsing DAG files. It calls setproctitle("airflow scheduler -- DagFileProcessorManager"), which is the name you see with ps | grep; the child process then enters an endless while True: loop in which it parses the DAG files (a minimal sketch of this pattern follows this list).
- Run self._run_scheduler_loop(). This is the real main loop of the Scheduler; once entered, it is not left unless an exception is raised or the process is stopped.
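The parent/child process pattern used here is easy to reproduce in isolation. A minimal toy sketch (my own example, not Airflow code), assuming the setproctitle package that Airflow itself depends on is installed:
import time
from multiprocessing import Process

from setproctitle import setproctitle

def manager_loop() -> None:
    # This call is what makes the child show up under its own name in ps output.
    setproctitle("demo scheduler -- DagFileProcessorManager")
    while True:
        # ... a real manager would parse DAG files here ...
        time.sleep(1)

if __name__ == "__main__":
    p = Process(target=manager_loop)
    p.start()
    time.sleep(5)  # observe the renamed child, e.g. with: ps aux | grep DagFileProcessorManager
    p.terminate()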
The _run_scheduler_loop function
It first registers a set of periodic housekeeping timers and then enters the main loop:
def _run_scheduler_loop(self) -> None:
"""
The actual scheduler loop.
The main steps in the loop are:
#. Harvest DAG parsing results through DagFileProcessorAgent
#. Find and queue executable tasks
#. Change task instance state in DB
#. Queue tasks in executor
#. Heartbeat executor
#. Execute queued tasks in executor asynchronously
#. Sync on the states of running tasks
Following is a graphic representation of these steps.
.. image:: ../docs/apache-airflow/img/scheduler_loop.jpg
"""
if not self.processor_agent and not self._standalone_dag_processor:
raise ValueError("Processor agent is not started.")
is_unit_test: bool = conf.getboolean("core", "unit_test_mode")
timers = EventScheduler()
# Check on start up, then every configured interval
self.adopt_or_reset_orphaned_tasks()
timers.call_regular_interval(
conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0),
self.adopt_or_reset_orphaned_tasks,
)
- First an EventScheduler instance is created, and several periodic checks are registered on it (a conceptual sketch of this timer pattern follows this list):
- every 5 minutes, check for orphaned tasks;
- every 15 seconds, check tasks in the "deferred" state whose timeout has expired and mark them as failed;
- every 5 seconds, emit the pool_metrics;
- every 10 seconds, check for zombie task instances;
- every 60 seconds, update the dag_run state of paused DAGs;
- every 120 seconds, check whether queued tasks have timed out; the default task_queued_timeout is 600 seconds;
- every 60 seconds, check for stale DAGs, i.e. DAGs whose files have been deleted; these DAGs should be deactivated.
- An endless loop driven by itertools.count(start=1) then calls _do_scheduling(), the main function that decides whether a task is put into the queue: it creates dag_runs when appropriate, updates dag_run states, updates the states of the task instances within each dag_run, and creates task instances and hands them to the Executor.
- self.job.executor.heartbeat() decides, based on the number of available open_slots, whether new tasks can run.
- self._process_executor_events(session=session) checks the tasks that have finished running.
- self.processor_agent.heartbeat() processes the processor_agent's messages.
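Conceptually, EventScheduler is a thin wrapper around the standard library's sched.scheduler that keeps re-arming a callback. A self-contained sketch of that idea (my simplification, not Airflow's actual class; only the call_regular_interval name is borrowed from the code above):
import sched
import time

class SimpleEventScheduler(sched.scheduler):
    """Re-arms a callback every `delay` seconds, similar in spirit to
    EventScheduler.call_regular_interval used by the scheduler loop."""

    def call_regular_interval(self, delay: float, action, *args, **kwargs) -> None:
        def repeat():
            action(*args, **kwargs)
            self.enter(delay, 1, repeat)  # re-register for the next interval

        self.enter(delay, 1, repeat)

if __name__ == "__main__":
    timers = SimpleEventScheduler(time.monotonic, time.sleep)
    timers.call_regular_interval(2.0, lambda: print("check orphaned tasks"))
    # The real scheduler calls timers.run(blocking=False) on every loop iteration.
    for _ in range(3):
        timers.run(blocking=False)
        time.sleep(2.1)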
3. How a task enters the SCHEDULED state
In scheduler_job_runner.py, the call chain is:
- _do_scheduling(self, session: Session) =>
- self._schedule_all_dag_runs(guard, dag_runs, session) =>
- dag_run.schedule_tis(schedulable_tis, session)
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
# Check if DAG not scheduled then skip interval calculation to same scheduler runtime
if dag_run.state in State.finished:
# Work out if we should allow creating a new DagRun now?
if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
# IDs in a single query, but it turns out that can be _very very slow_
# see #11147/commit ee90807ac for more details
dag_run.schedule_tis(schedulable_tis, session)
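To observe the effect of schedule_tis(), one quick way is to query the metadata database for task instances in the SCHEDULED state (an inspection sketch, assuming a configured Airflow environment with database access; run it against a non-production setup):
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
scheduled_tis = (
    session.query(TaskInstance)
    .filter(TaskInstance.state == State.SCHEDULED)
    .all()
)
for ti in scheduled_tis:
    print(ti.dag_id, ti.task_id, ti.run_id, ti.state)
session.close()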
4. How a task enters the queue
With CeleryExecutor
How a SCHEDULED task instance enters the queue
Moving SCHEDULED task instances into the queue takes three steps (note that the database query locks the rows, ensuring that only one scheduler succeeds in this critical section at a time):
- select the runnable task instances based on task priority weight, max_active_runs, pool limits, and similar constraints;
- update the selected task instances to the queued state in the database;
- turn these task instances into executable commands and put them into CeleryExecutor's ordered dictionary
self.queued_tasks: OrderedDict[TaskInstanceKey, QueuedTaskInstanceType] = OrderedDict()
, i.e. hand them over to the CeleryExecutor. Inside the Scheduler, self.job.executor.heartbeat() then triggers the Executor to send the task commands to the Redis broker (a conceptual sketch of this hand-off follows the code below).
def _critical_section_enqueue_task_instances(self, session: Session) -> int:
"""
Enqueues TaskInstances for execution.
There are three steps:
1. Pick TIs by priority with the constraint that they are in the expected states
and that we do exceed max_active_runs or pool limits.
2. Change the state for the TIs above atomically.
3. Enqueue the TIs in the executor.
HA note: This function is a "critical section" meaning that only a single executor process can execute
this function at the same time. This is achieved by doing ``SELECT ... from pool FOR UPDATE``. For DBs
that support NOWAIT, a "blocked" scheduler will skip this and continue on with other tasks (creating
new DAG runs, progressing TIs from None to SCHEDULED etc.); DBs that don't support this (such as
MariaDB or MySQL 5.x) the other schedulers will wait for the lock before continuing.
:param session:
:return: Number of task instance with state changed.
"""
if self.job.max_tis_per_query == 0:
max_tis = self.job.executor.slots_available
else:
max_tis = min(self.job.max_tis_per_query, self.job.executor.slots_available)
queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
self._enqueue_task_instances_with_queued_state(queued_tis, session=session)
return len(queued_tis)
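On the executor side, heartbeat() is what drains queued_tasks. The hand-off can be sketched roughly as follows (a simplified toy model of the idea, not Airflow's BaseExecutor; the class and method names here are made up):
from collections import OrderedDict

class MiniExecutor:
    """Toy model of the queued_tasks -> broker hand-off."""

    def __init__(self, parallelism: int = 4):
        self.parallelism = parallelism
        self.queued_tasks: OrderedDict[str, tuple] = OrderedDict()
        self.running: set[str] = set()

    def queue_command(self, key: str, command: list[str], priority: int = 1) -> None:
        # The scheduler's critical section ends up here: the command is parked in memory.
        self.queued_tasks[key] = (command, priority)

    def heartbeat(self) -> None:
        open_slots = self.parallelism - len(self.running)
        # Highest priority first, mirroring how queued tasks are ordered.
        ordered = sorted(self.queued_tasks.items(), key=lambda kv: kv[1][1], reverse=True)
        for key, (command, _prio) in ordered[:open_slots]:
            del self.queued_tasks[key]
            self.running.add(key)
            self.send(key, command)

    def send(self, key: str, command: list[str]) -> None:
        # CeleryExecutor would call apply_async() here to publish to the broker.
        print("sending", key, command)

if __name__ == "__main__":
    ex = MiniExecutor(parallelism=2)
    ex.queue_command("ti-1", ["echo", "a"], priority=1)
    ex.queue_command("ti-2", ["echo", "b"], priority=5)
    ex.queue_command("ti-3", ["echo", "c"], priority=3)
    ex.heartbeat()  # sends ti-2 and ti-3; ti-1 stays queued until a slot frees up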
Below is the function, running in the Scheduler's main process, that sends a task to the Celery Redis broker:
def send_task_to_executor(
task_tuple: TaskInstanceInCelery,
) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
"""Sends task to executor."""
key, command, queue, task_to_run = task_tuple
try:
with timeout(seconds=OPERATION_TIMEOUT):
result = task_to_run.apply_async(args=[command], queue=queue)
except Exception as e:
exception_traceback = f"Celery Task ID: {key}\n{traceback.format_exc()}"
result = ExceptionWithTraceback(e, exception_traceback)
return key, command, result
With SequentialExecutor
When the Scheduler hands a task to the Executor, execution is likewise triggered through self.job.executor.heartbeat(). In its sync() method the Executor runs the command synchronously with subprocess.check_call(command, close_fds=True), which is why SequentialExecutor can only execute one task at a time.
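That synchronous pattern can be stripped down to a few lines (an illustrative sketch, not SequentialExecutor itself; the helper name is made up):
import subprocess

def run_commands_sequentially(commands: list[list[str]]) -> None:
    """Run each command to completion before starting the next,
    mirroring the subprocess.check_call usage described above."""
    for command in commands:
        subprocess.check_call(command, close_fds=True)

if __name__ == "__main__":
    run_commands_sequentially([["echo", "task-1"], ["echo", "task-2"]])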
With LocalExecutor
LocalExecutor uses execute_async() to start local Process objects that run tasks asynchronously, so it can run tasks concurrently; Airflow calls such a process a LocalWorker. A sketch of this asynchronous counterpart follows.
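The asynchronous counterpart looks roughly like this with multiprocessing (again an illustrative sketch; the real LocalExecutor manages a pool of LocalWorker processes and a result queue):
import subprocess
from multiprocessing import Process

def local_worker(command: list[str]) -> None:
    # Each "LocalWorker" runs one task command in its own process.
    subprocess.check_call(command, close_fds=True)

if __name__ == "__main__":
    procs = [Process(target=local_worker, args=(["echo", f"task-{i}"],)) for i in range(3)]
    for p in procs:
        p.start()  # the three commands run concurrently
    for p in procs:
        p.join()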
5. How a task is executed by the worker
With SequentialExecutor or LocalExecutor, the command is executed as soon as the Scheduler hands it to the Executor. With CeleryExecutor, however, the task command has only been published to Celery's Redis broker, so a Celery worker must be started to consume the task commands from the broker. The worker is started with the command below:
airflow celery worker
- airflow/cli/commands/celery_command.py is the entry file for starting the worker; it calls the worker() function. The "Worker" here is really a Celery worker process, which spawns a number of child processes according to its concurrency setting so that tasks can be executed in parallel.
@cli_utils.action_cli
def worker(args):
"""Starts Airflow Celery worker."""
# Disable connection pool so that celery worker does not hold an unnecessary db connection
settings.reconfigure_orm(disable_connection_pool=True)
if not settings.validate_session():
raise SystemExit("Worker exiting, database connection precheck failed.")
autoscale = args.autoscale
skip_serve_logs = args.skip_serve_logs
....
- Create the Celery app:
app = Celery(conf.get("celery", "CELERY_APP_NAME"), config_source=celery_configuration)
- After a series of initialization steps, the worker is started directly. Because the commands were written to the broker in Celery's own format when they were enqueued, the worker can read them back directly in that same format.
if args.daemon:
# Run Celery worker as daemon
handle = setup_logging(log_file)
with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
if args.umask:
umask = args.umask
else:
umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)
stdout_handle.truncate(0)
stderr_handle.truncate(0)
daemon_context = daemon.DaemonContext(
files_preserve=[handle],
umask=int(umask, 8),
stdout=stdout_handle,
stderr=stderr_handle,
)
with daemon_context, _serve_logs(skip_serve_logs):
celery_app.worker_main(options)
else:
# Run Celery worker in the same process
with _serve_logs(skip_serve_logs):
celery_app.worker_main(options)
- celery_app.worker_main(options) is the call that actually starts the worker. Once a task message is consumed, it is handed to the execute_command() function to run the task; we do not have to care how Celery serializes or deserializes anything, we simply use its @app.task decorator (a generic Celery sketch follows the code below).
@app.task
def execute_command(command_to_exec: CommandType) -> None:
"""Executes command."""
dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
celery_task_id = app.current_task.request.id
log.info("[%s] Executing command in Celery: %s", celery_task_id, command_to_exec)
with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
try:
if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
_execute_in_subprocess(command_to_exec, celery_task_id)
else:
_execute_in_fork(command_to_exec, celery_task_id)
except Exception:
Stats.incr("celery.execute_command.failure")
raise
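For readers less familiar with Celery, the @app.task decorator is all that is needed to make a function consumable by workers. A generic, self-contained Celery sketch (the app name, broker/backend URLs, task, and queue are illustrative, not Airflow's configuration):
from celery import Celery

# Hypothetical broker/backend URLs, for illustration only.
app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task
def run_command(command_to_exec: list) -> None:
    # A worker consuming this task would execute the command here.
    print("would execute:", command_to_exec)

# Producer side (what CeleryExecutor effectively does), assuming the broker is reachable:
#   run_command.apply_async(args=[["echo", "hello"]], queue="default")
# Consumer side:
#   celery -A <this_module> worker -Q default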
- Finally, the worker executes the actual command via subprocess.check_output(). What that command looks like is sketched after the code below.
def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
env = os.environ.copy()
if celery_task_id:
env["external_executor_id"] = celery_task_id
try:
subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
except subprocess.CalledProcessError as e:
log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
log.error(e.output)
msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
raise AirflowException(msg)
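For orientation, command_to_exec is the "airflow tasks run ..." invocation that the scheduler serialized earlier. Its rough shape is shown below with placeholder values; the exact flags depend on the Airflow version and settings:
# Illustrative shape of command_to_exec (all values are placeholders):
command_to_exec = [
    "airflow",
    "tasks",
    "run",
    "example_hello",                          # dag_id
    "say_hello",                              # task_id
    "scheduled__2023-01-01T00:00:00+00:00",   # run_id
    "--local",
    "--subdir",
    "DAGS_FOLDER/example_hello.py",
]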