已复制
全屏展示
复制代码

spark rdd 概念与实战


· 5 min read

一. rdd 定义

Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。用户可以使用两种方法创建 RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)。

vallines=sc.textFile("/etc/profile")

//转化操作
valstartsWithExport=lines.filter(_.startsWith("export"))

//行动操作
valfirstLine=lines.first

//持久化到内存
lines.persist()

RDD 支持两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个 RDD 生成一个新的 RDD,比如这里的startsWithExport,另一方面,行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。

默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 rdd.persist() 让 Spark 把这个 RDD 缓存下来。在实际操作中,你会经常用 persist()来把数据的一部分读取到内存中,并反复查询这部分数据。

rdd总结
  • 概念:rdd是分布式、弹性、可容错的抽象数据集,是一个不可变的、有多个分区的、可以并行的计算集合,rdd中不保存数据,保存的是元数据,即描述以后数据怎么读取、调用了什么方法、传入了什么函数、依赖关系等等。
  • 有多个分区,分区数量决定任务并行数
  • 一个功能函数作用在分区上,函数决定计算逻辑
  • rdd和rdd存在依赖关系,可以根据依赖关系恢复失败的任务和划分stage
  • 如果要发生 shuffle,要使用分区器,默认使用hashpatitioner,分区器决定数据到下游哪个分区
  • 最优位置,即将 executor 调度到数据所在节点上,要求 Worker 和 DataNode 部署在同一个节点上

二. 创建 rdd

Spark 提供了两种创建 RDD 的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

// 在驱动器程序中对一个集合进行并行化创建RDD
vallines=sc.parallelize(List("pandas","ilikepandas"))

// 从外部存储中读取数据来创建RDD。
vallines=sc.textFile("/path/to/README.md")


sc.makeRDD(List(1,2,3,4,5))

三. 操作 rdd

  • 转化操作:RDD 的转化操作是返回新 RDD 的操作。通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据。
  • 行动操作:行动操作是第二种类型的 RDD 操作,它们会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。

RDD 还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下时,才能使用 collect(),因此,collect() 不能用在大规模数据集上。

在大多数情况下,你可以使用 saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的数据内容以各种自带的格式保存到分布式的存储系统中。

// 只取前十行数据, take行动算子
lines.take(10).foreach(println)


// 相当于 1 + 2 + 3 + 4
scala> sc.parallelize(List(1,2,3,4)).reduce((x, y) => x+y)
res2: Int = 10

// 相当于0 + 1 + 2 + 3 + 4这里的初始值不能是其他值,根据元素类型和操作类型值来确定
// 例如 + 对应的 0,* 对应的 1,或拼接操作对应的空列表
scala> sc.parallelize(List(1,2,3,4)).fold(0)((x, y) => x + y)
res12: Int = 10
scala> sc.parallelize(List(1,2,3,4)).fold(1)((x, y) => x * y)
res20: Int = 24

四. rdd 广播

  • 使用场景:如果有部分数据集(A=rdd1)量很小,这部分数据需要和另一部分量大的数据集(B=rdd2)进行关联,这是在某些分区上必然会出现关联不上的情况,这是就会出现 shuffle 了。
  • 解决方法:为了避免 shuffle,可以将小量的数据集在每一executor保留一份全部的数据,这时大数据集就可以关联上了。
  • 广播原理:将变量发到某一个 executor,然后其他的 executor 可以到这个executor 里面取。
  • 广播优点:将小量的数据在每一个 executor 都放一份,避免了 shuffle。
  • 将driver内的数据广播到 executor 里面去
  • 一旦广播出去,广播变量则不可修改
// 大数据集
val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// 小数据集
val nums = List(5, 10)

// 将小数据集广播到 executor 里面,这是一个阻塞方法
val numsRef: Broadcast[List[Int]] = sc.broadcast(nums)

// rdd1 使用 nums 的广播变量的引用
rdd1.filter(x => {
  val numsValue: List[Int] = numsRef.value
  numsValue.contains(x)
}).foreach(println)

文章推荐