大数据开发 Airflow异常task killed externally 问题场景:task 的状态莫名奇妙的成 failed 的了,task 出错没有任何运行 task 的日志,只在 Scheduler 打印的日志中看到如下信息,信息表明是 celery 的Worker 执行任务时出错了。 yuziyue 10 Jul 2023 · 1 min read
大数据开发 Airflow Scheduler源码解读 首先 Scheduler 的 DagFileProcessorAgent 进程定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。 Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 。 yuziyue 7 Jul 2023 · 9 min read
大数据开发 Airflow清理logs日志 此时Airflow产生的日志就只有它自己的了,尽管只有Airflow自己产生的日志,但是还是非常多的,假设有一个分钟级的DAG,它里面有10个task,这个DAG每分钟就会产生 10 个目录,每个小时600个目录,一天就会产生 600 * 24 = 14400个目录,这些目录还是挺多了,如果你的task很多,那就更不用说了。 yuziyue 4 Jul 2023 · 2 min read
大数据开发 Airflow集群原理与实战部署 首先 Scheduler 定时(dag_dir_list_interval配置指定秒数)扫描 dags 目录下的 py 文件,解析 DAG 生成 DAG 对象以及 TASK 对象写入数据库中。Scheduler 每秒钟从数据库读取DAG 和 task 的定义、状态、调度规则信息,判断是否执行,如需要执行,将生成 task instance 并发送到 消息队列。 yuziyue 1 Jul 2023 · 14 min read
大数据开发 Airflow任务与DAG执行监控实践 当某个 task 执行完毕后,如果 task 失败则发送报警,在 default_args 里面配 on_failure_callback。 当某个 dag 执行完毕后,如果 dag 失败则发送报警,在 DAG 参数里面配置 on_failure_callback。如果某个 dag 一直运行着且不报错,这时我们怎么知道它是什么状态呢? yuziyue 29 Jun 2023 · 5 min read
大数据开发 Airflow 之 BranchPythonOperator BranchPythonOperator 是 Airflow 中的一种Operator,用于根据条件选择不同的分支路径。它是 PythonOperator 的一种特殊形式,用于执行 Python 可调用函数来决定任务执行的分支。使用 BranchPythonOperator 时,需要提供一个 Python 可调用函数,它将根据条件逻辑返回选择的分支路径。 yuziyue 28 Jun 2023 · 1 min read
大数据开发 Airflow 之 ShortCircuitOperator ShortCircuitOperator 用于在工作流中实现条件判断和短路功能。它的主要作用是基于一个条件表达式的结果来决定是否执行后续的任务。ShortCircuitOperator 可以在任务的执行过程中引入条件逻辑,以便根据某个条件的真假情况来控制工作流的执行路径。如果条件为真,任务将继续执行后续的任务;如果条件为假,任务将被短路,直接跳过下游的任务。 yuziyue 27 Jun 2023 · 2 min read
大数据开发 Hive Spark 数据类型总结 struct 相对于map 来说,它有固定的字段及类型。map可以存放任何数量的key-value,而struct只能存放提前定义好的字段,类似其他编程语言的对象的。decimal(m,n) 表示数字总长度为 m 位,小数位为 n 位,那么整数位就只有 m-n 位了,在 hive 中默认值为 decimal(10,0) yuziyue 27 Jun 2023 · 3 min read
大数据开发 Airflow配置时区与任务调度时间 Airflow中有许多时间变量,这些时间变量可以让我们在定义任务、失败重跑时非常方便。其中最常用的也是最重要的就是 logical_date,它表示当前执行task所在的 schedule_interval 时间点的上一个 period 时间点,即通常的场景:今天处理昨天的数据。 yuziyue 27 Jun 2023 · 5 min read
大数据开发 Spark history server 部署 我们知道,在提交 spark 任务后,可以通过 SparkUI 的地址 http://:4040 查看应用的状态、实时日志等信息,但是一旦应用运行完成或者停止,我们就查看不了这些信息了,这时我们可以配置 Spark history server,它可以将历史 app 的日志放到指定hdfs 目录,然后通过 http 接口开放出来。 yuziyue 26 Jun 2023 · 4 min read
大数据开发 Airflow补数与重跑backfill 在创建dag时,我们最好是保持 catchup=False,也就是不补数,因为在你首次创建dag的时候,不知道dag文件是否写对了,如果有错的话,错误的补数也就没有意义了。如果要刷的数据分区早于 dag 创建的时间,就需要手动执行,airflow 提供了 backfill 子命令来完成这个事情。 yuziyue 25 Jun 2023 · 3 min read
大数据开发 Airflow线上任务丢失总结 我们有一个Airflow集群,发现到 1 点的时候有些任务就延迟到早上 7 点以后才开始执行,有些任务更严重,在 1 点到 7 点的任务直接丢失,7点后才开始正常调度,这是为什么呢,下面下面我用简单的例子讲述一下为什么会发生这种情况,以及怎么解决这个问题。 yuziyue 25 Jun 2023 · 7 min read
大数据开发 spark-submit日志级别设置 一. 在 sparkContext 代码中设置 在代码设置的日志级别只能控制 executor 的日志级别,driver 的日志还是照常输出,二. 在 log4j.properties 文件中设置,日志级别 yuziyue 23 Jun 2023 · 1 min read
大数据开发 Spark 打印加载的所有Jar包 使用场景:有时spark程序加载依赖的 jar 包太多,有可能出现了冲突,可以打印出所有已经加载jar包排查问题。 yuziyue 23 Jun 2023 · 1 min read
大数据开发 spark rdd 概念与实战 Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。用户可以使用两种方法创建 RDD:读取一个外部数据集 yuziyue 23 Jun 2023 · 5 min read
大数据开发 spark rdd 持久化数据 我们在操作 rdd 的过程中,避免多次计算同一个 rdd,我们可以使用spark提供的持久化功能,一个rdd若果已经持久化过了,就不需要再次计算了,直接从内存的持久化中获取结果就行,如果持久化的数据过多,内存放不下,spark会使用LRU策略把最老的缓存清除掉。 yuziyue 23 Jun 2023 · 2 min read
大数据开发 Spark架构与核心概念及快速上手 首先Spark是一种计算引擎,它基于MapReduce计算模型扩展,复杂的计算任务主要在内存中进行,其速度远快于MapReduce,我们通常所说的Spark是运行 Yarn 资源管理器上的即 Spark on Yarn,从上层应用来看,每个Spark应用都有一个驱动器程序(driver)和一个执行器程序(executor)组成。 yuziyue 23 Jun 2023 · 8 min read
大数据开发 大数据集群快速同步配置 在大数据集群中,所有节点的配置必须保持一致,一旦修改了某些配置,此时就需要把配置同步到其他节点,我们通常是用scp来同步配置的,此时需要写绝对路径才能scp,比如看下面的示例,我们可以利用 rsync 和 ssh,实现文件同步和远程执行命令。 yuziyue 22 Jun 2023 · 2 min read