大数据开发 Airflow删除taskinstance实例 Airflow删除taskinstance实例,当Airflow运行很久后(或者你有分钟级别的dag),你会发现无法打开 task instances页面,那是因为 task instances 数量太多了,需要手动删除过去无用的task instances,Airflow提供了api来实现,将其加入定时任务每天删除就行,最后不要手动操作数据库。 yuziyue 2 Apr 2024 · 1 min read
大数据开发 Airflow跨dag依赖的特殊情况 在使用跨dag依赖task的时候,通常情况下是依赖的dag的execution_date小于当前dag的execution_date的,如果依赖的task是在将来执行的(execution_date大于当前dag的execution_date),这时候该怎么办呢?假设 dag1 是每天 0:30 执行,比如 execution_date 为 2023-08-18 00:30:00 yuziyue 18 Aug 2023 · 2 min read
大数据开发 Airflow task实例状态流转及含义 理想情况下的状态流转 none =》scheduled =》 queued =》 running =》 success. 当running和queued的任务数量过大时,后续产生的任务实例都会处于scheduled状态。每次scheduler调度任务时,选出次状态的任务执行。 yuziyue 27 Jul 2023 · 1 min read
大数据开发 Airflow 中如何使用 XComs 在 airflow 中,operator 一般(not always)是原子的,也就是说,他们一般独立执行,同时也不需要和其他 operator 共享信息,如果两个 operators 需要共享信息,如 filename 之类的, 推荐将这两个 operators 组合成一个 operator。如果实在不能避免,则可以使用 XComs (cross-communication) 来实现。XComs 用来在不同tasks 之间交换信息,看下面的示例。 yuziyue 27 Jul 2023 · 1 min read
大数据开发 Airflow web界面如何只clear下游任务 我有一个任务 task1,task1 的下游有 task2 task3 两个任务, 今天这三个task都已经成功运行了。今天接到需求需要修改 task1 的逻辑。当我改好 task1 的逻辑后,通过其他方式把 task1 的任务已经跑过了,此时只需要把 task1 的下游任务刷一遍就行了。 yuziyue 10 Jul 2023 · 1 min read
大数据开发 Airflow异常task killed externally 问题场景:task 的状态莫名奇妙的成 failed 的了,task 出错没有任何运行 task 的日志,只在 Scheduler 打印的日志中看到如下信息,信息表明是 celery 的Worker 执行任务时出错了。 yuziyue 10 Jul 2023 · 1 min read
大数据开发 Airflow Scheduler源码解读 首先 Scheduler 的 DagFileProcessorAgent 进程定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。 Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 。 yuziyue 7 Jul 2023 · 9 min read
大数据开发 Airflow清理logs日志 此时Airflow产生的日志就只有它自己的了,尽管只有Airflow自己产生的日志,但是还是非常多的,假设有一个分钟级的DAG,它里面有10个task,这个DAG每分钟就会产生 10 个目录,每个小时600个目录,一天就会产生 600 * 24 = 14400个目录,这些目录还是挺多了,如果你的task很多,那就更不用说了。 yuziyue 4 Jul 2023 · 2 min read
大数据开发 Airflow集群原理与实战部署 首先 Scheduler 定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 消息队列。 yuziyue 1 Jul 2023 · 14 min read
大数据开发 Airflow任务与DAG执行监控实践 当某个 task 执行完毕后,如果 task 失败则发送报警,在 default_args 里面配 on_failure_callback。 当某个 dag 执行完毕后,如果 dag 失败则发送报警,在 DAG 参数里面配置 on_failure_callback。如果某个 dag 一直运行着且不报错,这时我们怎么知道它是什么状态呢? yuziyue 29 Jun 2023 · 5 min read
大数据开发 Airflow 之 BranchPythonOperator BranchPythonOperator 是 Airflow 中的一种Operator,用于根据条件选择不同的分支路径。它是 PythonOperator 的一种特殊形式,用于执行 Python 可调用函数来决定任务执行的分支。使用 BranchPythonOperator 时,需要提供一个 Python 可调用函数,它将根据条件逻辑返回选择的分支路径。 yuziyue 28 Jun 2023 · 1 min read
大数据开发 Airflow 之 ShortCircuitOperator ShortCircuitOperator 用于在工作流中实现条件判断和短路功能。它的主要作用是基于一个条件表达式的结果来决定是否执行后续的任务。ShortCircuitOperator 可以在任务的执行过程中引入条件逻辑,以便根据某个条件的真假情况来控制工作流的执行路径。如果条件为真,任务将继续执行后续的任务;如果条件为假,任务将被短路,直接跳过下游的任务。 yuziyue 27 Jun 2023 · 2 min read
大数据开发 Airflow配置时区与任务调度时间 Airflow中有许多时间变量,这些时间变量可以让我们在定义任务、失败重跑时非常方便。其中最常用的也是最重要的就是 logical_date,它表示当前执行task所在的 schedule_interval 时间点的上一个 period 时间点,即通常的场景:今天处理昨天的数据。 yuziyue 27 Jun 2023 · 5 min read
大数据开发 Airflow补数与重跑backfill 在创建dag时,我们最好是保持 catchup=False,也就是不补数,因为在你首次创建dag的时候,不知道dag文件是否写对了,如果有错的话,错误的补数也就没有意义了。如果要刷的数据分区早于 dag 创建的时间,就需要手动执行,airflow 提供了 backfill 子命令来完成这个事情。 yuziyue 25 Jun 2023 · 3 min read
大数据开发 Airflow实用技巧与最佳实践 为什么选择Airflow,它方便查看任务之间的依赖关系(在dag文件中跳转查看、而不必要在图形界面中看得眼花缭乱),创建专门管理 dag 的 git 项目,这项目可能会包含多个项目的dag,可以统一部署dag管理。 yuziyue 21 Jun 2023 · 5 min read
大数据开发 Airflow单dag内多个crontab任务 假设现在有个项目,需要定义很多的分钟级任务、同时伴随着小时级任务、以及在指定的某个时间点的任务,由于一个dag里面只能配置一个 schedule_interval, 所以没办法把这个项目的所有任务放在一个dag里面。 yuziyue 21 Jun 2023 · 3 min read
大数据开发 Airflow单节点安装部署与架构 一个Airflow安装实例主要包含了这些组件,dag将所有需要运行的tasks按照依赖关系组织起来,描述了所有tasks执行的顺序。scheduler定时触发workflow即dag的运行,dags folder 用于存放所有的dag files yuziyue 20 Jun 2023 · 8 min read
大数据开发 Airflow跨Dag任务之间依赖总结 跨Dag任务依赖中,Airflow 提供了一个 ExternalTaskMarker 类,它可以告诉 dag1 里面的task,dag2 中的 task 依赖了自己,在 clear dag1 的时候,也会把 dag2 中依赖 dag1 中的 task clear 掉! yuziyue 15 Jun 2023 · 6 min read