Airflow Scheduler Source Code Walkthrough


9 min read

1. Execution Flow Overview

  • First, the Scheduler's DagFileProcessorAgent process periodically (every dag_dir_list_interval seconds) scans the .py files under the dags folder, parses them into DAG objects and task objects, and writes them to the database.
  • Every second, the Scheduler reads the DAG and task definitions, states, and schedule rules from the database and decides what needs to run; when something does, it creates task instances and sends them to the Executor's message queue.
  • A Worker picks a task message off the queue and actually runs the task.
  • When the task finishes, the Worker writes the result into the result backend.
  • The Scheduler periodically reads task execution states from the result backend and updates them in the database.
  • The Webserver displays DAG and task states and history, and provides common operations such as run and clear on DAGs and tasks.
  • In this component interaction, the queue broker corresponds to the Redis message queue.
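
The settings that drive this flow live in airflow.cfg. As a small illustration (these are standard Airflow config keys; the comments describe their role, not values from this article), they can be read through Airflow's conf object:

from airflow.configuration import conf

# how often the DAG folder is re-listed by the DAG file processor, in seconds
dag_dir_list_interval = conf.getint("scheduler", "dag_dir_list_interval")
# which executor the Scheduler hands task instances to, e.g. "CeleryExecutor"
executor_name = conf.get("core", "executor")
# where CeleryExecutor publishes task commands, e.g. a Redis URL
broker_url = conf.get("celery", "broker_url")
# where Celery workers report task results back to
result_backend = conf.get("celery", "result_backend")

print(dag_dir_list_interval, executor_name, broker_url, result_backend)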

2. Execution Flow in the Source Code

The walkthrough below is based on Airflow source version 2.6.2 ( https://github.com/apache/airflow.git ) and uses CeleryExecutor as the example.

Entry point

  • airflow/cli/cli_config.py defines all of the Airflow command-line arguments.
  • airflow/cli/commands/scheduler_command.py is the entry point; the command line calls its scheduler() function to start the Scheduler service.
@cli_utils.action_cli
def scheduler(args):
    """Starts Airflow Scheduler."""
    print(settings.HEADER)

    job_runner = SchedulerJobRunner(
        job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs, do_pickle=args.do_pickle
    )
    ExecutorLoader.validate_database_executor_compatibility(job_runner.job.executor)

    if args.daemon:
        pid, stdout, stderr, log_file = setup_locations(
            "scheduler", args.pid, args.stdout, args.stderr, args.log_file
        )
        handle = setup_logging(log_file)
        with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
            stdout_handle.truncate(0)
            stderr_handle.truncate(0)

            ctx = daemon.DaemonContext(
                pidfile=TimeoutPIDLockFile(pid, -1),
                files_preserve=[handle],
                stdout=stdout_handle,
                stderr=stderr_handle,
                umask=int(settings.DAEMON_UMASK, 8),
            )
            with ctx:
                _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs)

  • A job_runner is created: it runs at the specified interval and schedules tasks that are ready to run.
  • An Executor is created. Its class is read from the config via conf.get_mandatory_value("core", "executor"), and compatibility between Executor and database is checked (for example, SQLite only supports SequentialExecutor). In our case this amounts to creating a CeleryExecutor instance, which uses BulkStateFetcher to fetch Celery execution states in bulk; a small sketch of this resolution step follows this list.
  • CeleryExecutor lives at airflow.executors.celery_executor.CeleryExecutor.
  • Logging is initialized: pid file, stdout, stderr, log file.
  • Once everything is ready, _run_scheduler_job(job_runner, skip_serve_logs=args.skip_serve_logs) calls run_job(), which in turn executes job_runner._execute, so the focus shifts to the job_runner.
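
As a brief sketch of the executor-resolution step mentioned above, using only APIs that appear in the excerpts in this article (the print is just for illustration):

from airflow.configuration import conf
from airflow.executors.executor_loader import ExecutorLoader

# the name configured under [core] executor, e.g. "CeleryExecutor"
executor_name = conf.get_mandatory_value("core", "executor")
# resolve that name to the executor class (the same call _execute uses below)
executor_cls, source = ExecutorLoader.import_default_executor_cls()
print(f"{executor_name} resolved to {executor_cls.__name__} via {source}")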

The job_runner instance

    def _execute(self) -> int | None:
        from airflow.dag_processing.manager import DagFileProcessorAgent

        self.log.info("Starting the scheduler")

        executor_class, _ = ExecutorLoader.import_default_executor_cls()

        # DAGs can be pickled for easier remote execution by some executors
        pickle_dags = self.do_pickle and executor_class.supports_pickling

        self.log.info("Processing each file at most %s times", self.num_times_parse_dags)
        if not self._standalone_dag_processor:
            self.processor_agent = DagFileProcessorAgent(
                dag_directory=Path(self.subdir),
                max_runs=self.num_times_parse_dags,
                processor_timeout=processor_timeout,
                dag_ids=[],
                pickle_dags=pickle_dags,
                async_mode=async_mode,
            )
        
        self.job.executor.start()
        if self.processor_agent:
            self.processor_agent.start()
            
  • job_runner lives in airflow/jobs/scheduler_job_runner.py.
  • The Executor (CeleryExecutor here) is "started", but executor.start() does not actually spawn a process; it merely logs a message.
  • A DagFileProcessorManager child process is started, dedicated to parsing DAG files. Note the call setproctitle("airflow scheduler -- DagFileProcessorManager"): that is the process title you see with ps | grep on the command line. The child process enters an endless while True: loop in which it parses the DAG files; a simplified sketch of that loop follows this list.
  • Finally self._run_scheduler_loop() runs. This is the real main loop of the Scheduler; once entered, it does not return unless an exception occurs or the program is stopped.
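
For intuition, here is a deliberately simplified, hypothetical sketch of that while True: parsing loop; Airflow's real DagFileProcessorManager additionally manages a pool of parsing subprocesses and serializes the parsed DAGs into the database:

import time
from pathlib import Path

def manager_loop_sketch(dag_folder: str, list_interval: float, parse_file) -> None:
    """Re-list the DAG folder every list_interval seconds and parse each .py file."""
    while True:
        for py_file in Path(dag_folder).rglob("*.py"):
            # in Airflow, parsing a file produces DAG/task records in the database
            parse_file(py_file)
        time.sleep(list_interval)  # corresponds to dag_dir_list_interval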

The _run_scheduler_loop function

It first registers a set of recurring timers, each responsible for a specific housekeeping check, and then enters the main scheduling loop:

    def _run_scheduler_loop(self) -> None:
        """
        The actual scheduler loop.

        The main steps in the loop are:
            #. Harvest DAG parsing results through DagFileProcessorAgent
            #. Find and queue executable tasks
                #. Change task instance state in DB
                #. Queue tasks in executor
            #. Heartbeat executor
                #. Execute queued tasks in executor asynchronously
                #. Sync on the states of running tasks

        Following is a graphic representation of these steps.

        .. image:: ../docs/apache-airflow/img/scheduler_loop.jpg

        """
        if not self.processor_agent and not self._standalone_dag_processor:
            raise ValueError("Processor agent is not started.")
        is_unit_test: bool = conf.getboolean("core", "unit_test_mode")

        timers = EventScheduler()

        # Check on start up, then every configured interval
        self.adopt_or_reset_orphaned_tasks()

        timers.call_regular_interval(
            conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0),
            self.adopt_or_reset_orphaned_tasks,
        )
  • An EventScheduler instance is created first, and several recurring checks are registered on it (a stdlib sketch of this recurring-timer pattern follows this list):
  • every 5 minutes, check for orphaned_tasks;
  • every 15 seconds, check tasks in the "deferred" state whose execution has timed out and mark them failed;
  • every 5 seconds, report pool_metrics;
  • every 10 seconds, detect zombie task instances;
  • every 60 seconds, update the dag_run state of paused DAGs;
  • every 120 seconds, check whether tasks have been sitting in the queued state too long (task_queued_timeout, default 600 seconds);
  • every 60 seconds, check for stale DAGs, i.e. DAGs whose files have been deleted and which should therefore be deactivated.
  • An endless loop driven by itertools.count(start=1) then calls _do_scheduling, the main function that decides whether a task is put onto the queue: it creates dag_runs when appropriate, updates dag_run states, updates the task instance states within each dag_run, and creates task instances to hand to the Executor.
  • self.job.executor.heartbeat() decides, based on the number of available open_slots, whether new tasks can run.
  • self._process_executor_events(session=session) checks for tasks that have finished running.
  • self.processor_agent.heartbeat() handles messages from the processor_agent.
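
All of the recurring checks above use the same pattern: EventScheduler, a thin wrapper around the standard library's sched.scheduler, re-registers the callback after every run. A minimal stdlib-only sketch of that pattern, with placeholder callbacks standing in for the real ones:

import sched
import time

timers = sched.scheduler(time.monotonic, time.sleep)

def call_regular_interval(delay: float, action) -> None:
    """Run action every delay seconds by re-arming it after each run."""
    def repeat() -> None:
        action()
        timers.enter(delay, 1, repeat)
    timers.enter(delay, 1, repeat)

call_regular_interval(300.0, lambda: print("adopt_or_reset_orphaned_tasks"))
call_regular_interval(5.0, lambda: print("emit pool metrics"))

# the scheduler loop drives the timers once per iteration with timers.run(blocking=False)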

3. How a task reaches the scheduled state

In scheduler_job_runner.py the call chain is:

  • _do_scheduling(self, session: Session)  =>
  • self._schedule_all_dag_runs(guard, dag_runs, session)  =>
  • dag_run.schedule_tis(schedulable_tis, session), which is reached through the fragment below:
        schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
        # Check if DAG not scheduled then skip interval calculation to same scheduler runtime
        if dag_run.state in State.finished:
            # Work out if we should allow creating a new DagRun now?
            if self._should_update_dag_next_dagruns(dag, dag_model, session=session):
                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
        # This will do one query per dag run. We "could" build up a complex
        # query to update all the TIs across all the execution dates and dag
        # IDs in a single query, but it turns out that can be _very very slow_
        # see #11147/commit ee90807ac for more details
        dag_run.schedule_tis(schedulable_tis, session)
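
Conceptually, dag_run.schedule_tis() boils down to a bulk UPDATE that flips the selected task instances to the SCHEDULED state. The following is a rough, hypothetical sketch of that effect, not the exact query Airflow builds (which, among other things, treats empty operators specially):

from sqlalchemy.orm import Session
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState

def schedule_tis_sketch(session: Session, dag_run, schedulable_tis) -> int:
    """Bulk-update the chosen task instances of this dag_run to SCHEDULED."""
    task_ids = [ti.task_id for ti in schedulable_tis]
    return (
        session.query(TI)
        .filter(
            TI.dag_id == dag_run.dag_id,
            TI.run_id == dag_run.run_id,
            TI.task_id.in_(task_ids),
        )
        .update({TI.state: TaskInstanceState.SCHEDULED}, synchronize_session=False)
    )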

4. How a task enters the queue

When using CeleryExecutor

How a scheduled task instance gets queued

Moving a task instance from the scheduled state into the queue takes three steps (note that the database query takes a lock, ensuring that only one executor process succeeds at a time):

  • Based on task priority weight, max_active_runs and similar constraints, select the task instances that can run.
  • Update those selected task instances to the queued state in the database.
  • Turn each of these task instances into an executable command and put it into the CeleryExecutor's ordered dict, self.queued_tasks: OrderedDict[TaskInstanceKey, QueuedTaskInstanceType] = OrderedDict(); in other words, hand them over to the CeleryExecutor. Back in the Scheduler, self.job.executor.heartbeat() triggers the Executor to send the task commands to the Redis broker (a sketch of that draining step follows the code below).
    def _critical_section_enqueue_task_instances(self, session: Session) -> int:
        """
        Enqueues TaskInstances for execution.

        There are three steps:
        1. Pick TIs by priority with the constraint that they are in the expected states
        and that we do not exceed max_active_runs or pool limits.
        2. Change the state for the TIs above atomically.
        3. Enqueue the TIs in the executor.

        HA note: This function is a "critical section" meaning that only a single executor process can execute
        this function at the same time. This is achieved by doing ``SELECT ... from pool FOR UPDATE``. For DBs
        that support NOWAIT, a "blocked" scheduler will skip this and continue on with other tasks (creating
        new DAG runs, progressing TIs from None to SCHEDULED etc.); DBs that don't support this (such as
        MariaDB or MySQL 5.x) the other schedulers will wait for the lock before continuing.

        :param session:
        :return: Number of task instance with state changed.
        """
        if self.job.max_tis_per_query == 0:
            max_tis = self.job.executor.slots_available
        else:
            max_tis = min(self.job.max_tis_per_query, self.job.executor.slots_available)
        queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)

        self._enqueue_task_instances_with_queued_state(queued_tis, session=session)
        return len(queued_tis)
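
For step 3, the queued_tasks dict is drained during the executor heartbeat. A simplified, hypothetical sketch of that draining step (the real BaseExecutor.trigger_tasks sorts by priority and does more bookkeeping; CeleryExecutor then batches the popped entries and pushes them to the broker via send_task_to_executor, shown below):

from collections import OrderedDict

def drain_queued_tasks_sketch(queued_tasks: OrderedDict, open_slots: int, send) -> None:
    """Pop up to open_slots queued entries and hand each one to the executor's send path."""
    for _ in range(min(open_slots, len(queued_tasks))):
        # FIFO pop; each value is a (command, priority, queue, task_instance) tuple
        key, (command, priority, queue, ti) = queued_tasks.popitem(last=False)
        send(key=key, command=command, queue=queue)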

The following function, running in the Scheduler's main process, sends a task to the Celery Redis broker:

def send_task_to_executor(
    task_tuple: TaskInstanceInCelery,
) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
    """Sends task to executor."""
    key, command, queue, task_to_run = task_tuple
    try:
        with timeout(seconds=OPERATION_TIMEOUT):
            result = task_to_run.apply_async(args=[command], queue=queue)
    except Exception as e:
        exception_traceback = f"Celery Task ID: {key}\n{traceback.format_exc()}"
        result = ExceptionWithTraceback(e, exception_traceback)

    return key, command, result
    

When using SequentialExecutor

When the Scheduler hands a task to the Executor, execution is likewise triggered through self.job.executor.heartbeat(). In its sync() function the Executor calls subprocess.check_call(command, close_fds=True) directly, so the command runs synchronously and SequentialExecutor can only execute one task at a time.
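
A minimal sketch of that synchronous style, assuming commands in the list form the scheduler produces ("airflow tasks run ..."):

import subprocess

def sequential_sync_sketch(commands_to_run: list[list[str]]) -> None:
    for command in commands_to_run:
        # blocks until the task process exits, hence one task at a time
        subprocess.check_call(command, close_fds=True)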

When using LocalExecutor

LocalExecutor uses execute_async() to start a local Process that runs the task asynchronously, so it can run tasks concurrently; Airflow calls such a Process a LocalWorker.
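
A minimal sketch of the process-per-task idea behind this (not LocalExecutor's actual implementation, which varies with the configured parallelism and keeps a result queue):

import multiprocessing
import subprocess

def run_command(command: list[str]) -> None:
    subprocess.check_call(command, close_fds=True)

def execute_async_sketch(command: list[str]) -> multiprocessing.Process:
    """Start the command in a child process (a "LocalWorker") and return immediately."""
    proc = multiprocessing.Process(target=run_command, args=(command,))
    proc.start()
    return proc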

5. How a task is executed by a worker

With SequentialExecutor or LocalExecutor, the command is executed as soon as the Scheduler hands it to the Executor. With CeleryExecutor, however, the task command has only been published to Celery's Redis broker, so a Celery worker must be started to consume the task commands from the broker. The worker is started with the following command:

airflow celery worker
  • airflow/cli/commands/celery_command.py is the entry point for starting a worker; its worker() function is called to start it. A Worker here is simply a Celery worker process; based on the concurrency setting it starts a number of child processes to execute tasks in parallel.
@cli_utils.action_cli
def worker(args):
    """Starts Airflow Celery worker."""
    # Disable connection pool so that celery worker does not hold an unnecessary db connection
    settings.reconfigure_orm(disable_connection_pool=True)
    if not settings.validate_session():
        raise SystemExit("Worker exiting, database connection precheck failed.")

    autoscale = args.autoscale
    skip_serve_logs = args.skip_serve_logs
    ....
    
  • Create the Celery app
app = Celery(conf.get("celery", "CELERY_APP_NAME"), config_source=celery_configuration)
  • After a series of initializations the worker is started directly. Because the command was written to the broker in Celery's own message format, it can also be read back directly in that format.
    if args.daemon:
        # Run Celery worker as daemon
        handle = setup_logging(log_file)

        with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
            if args.umask:
                umask = args.umask
            else:
                umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)

            stdout_handle.truncate(0)
            stderr_handle.truncate(0)

            daemon_context = daemon.DaemonContext(
                files_preserve=[handle],
                umask=int(umask, 8),
                stdout=stdout_handle,
                stderr=stderr_handle,
            )
            with daemon_context, _serve_logs(skip_serve_logs):
                celery_app.worker_main(options)

    else:
        # Run Celery worker in the same process
        with _serve_logs(skip_serve_logs):
            celery_app.worker_main(options)
  • celery_app.worker_main(options) is the code that actually starts the worker. When a task message is consumed, it is handed to the execute_command() function to run the task; we do not need to care about how Celery serializes and deserializes the message, we simply use its task decorator.
@app.task
def execute_command(command_to_exec: CommandType) -> None:
    """Executes command."""
    dag_id, task_id = BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
    celery_task_id = app.current_task.request.id
    log.info("[%s] Executing command in Celery: %s", celery_task_id, command_to_exec)
    with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
        try:
            if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
                _execute_in_subprocess(command_to_exec, celery_task_id)
            else:
                _execute_in_fork(command_to_exec, celery_task_id)
        except Exception:
            Stats.incr("celery.execute_command.failure")
            raise
  • Finally the worker executes the actual command via subprocess.check_output(); an example of what that command looks like follows the code below.
def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True, env=env)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)
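
For illustration, the command a Celery worker finally receives is a standard "airflow tasks run" CLI invocation; the concrete values below are made-up examples, not taken from this article:

# hypothetical example of command_to_exec as produced by the scheduler
command_to_exec = [
    "airflow", "tasks", "run",
    "example_dag", "example_task", "manual__2023-07-01T00:00:00+00:00",
    "--local", "--subdir", "DAGS_FOLDER/example_dag.py",
]
# inside the worker this ends up in _execute_in_subprocess / _execute_in_fork, i.e. roughly:
# subprocess.check_output(command_to_exec, stderr=subprocess.STDOUT, close_fds=True)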
        