Spark实战最常用算子合集详解
Spark实战最常用算子合集,包含最常用的spark算子使用方法,每个算子都有使用示例。实际上spark的很多算子都是基于几个原始算子封装而来,文中也会对几个非常重要,非常底层的核心算子做详细解释,其中也会有源码讲解。
map
map 不存在 shuffle,直接在每个分区中计算
// 每处理一个元素,调用一次传入的函数。
// 在函数内部可以获取分区的详细信息,比如分区 ID
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result: RDD[(Int, Int)] = nums.map(x => {
val index: Int = TaskContext.get().partitionId()
(index, x * 100)
})
result.foreach(println)
mapPartitions
- map是作用于每一个元素上
- mapPartitions是作用于每一个分区上
- 如果要在分区级别处理数据时,可以使用mapPartitions
- mapPartitions是一个transformation,foreachPartition 是一个action
- mapPartitions(iterator, preservesPartitioning) 其中preservesPartitioning 表示是否保留上一个RDD的分区器
// 调用 mapPartitions 方法,可以将数据以分区为单位取出来,一个分区就是一个迭代器
// 场景:比如在每个分区只需要一个数据库连接时,
// 避免在 map 里面多次创建数据库连接,需要使用 mapPartitions
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result2: RDD[(Int, Int)] = nums.mapPartitions(it => {
val index: Int = TaskContext.get().partitionId()
// 创建数据库连接
val newIt: Iterator[(Int, Int)] = it.map(x => {
// if (!it.hasNext): 关闭数据库连接
(index, x * 100)
})
newIt
})
result2.foreach(println)
// 在 mapPartitions 里面可以写任意的转换函数(map、filter等)
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result3: RDD[Int] = nums.mapPartitions(it => {
val index: Int = TaskContext.get().partitionId()
val newIt: Iterator[Int] = it.filter(_ % 2 == 0)
newIt
})
result3.foreach(println)
// 或者创建自定义的 Iterator
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result3: RDD[Int] = nums.mapPartitions(it => {
val newIt: Iterator[Int] = Iterator(100, 200, 300)
newIt
})
result3.foreach(println)
mapPartitionsWithIndex
mapPartitions
里面如果要获取分区 ID 的话,需要TaskContext.get().partitionId()
获取,mapPartitionsWithIndex
可以直接将分区ID传入。
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
nums.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
// it 表示一个分区的迭代器
it.map(element => index + ":" + element)
}).foreach(println)
filter
filter不存在shuffle,在每个分区中filter
// 取出偶数
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
nums.filter(x => x %2 == 0).foreach(println)
flatMap
先在每个分区中计算map,然后在每个分区中flat
val lines = sc.parallelize(Array("java spark", "java scala"))
val words = lines.flatMap(x => x.split(" ")).collect
// Array(java, spark, java, scala)
val lines = sc.parallelize(Array("java spark", "java scala"))
val words = lines.flatMap(x => x).collect
// Array(j, a, v, a, , s, p, a, r, k, , j, a, v, a, , s, c, a, l, a)
groupByKey
- 下游的 task 到上游的去拉取数据
groupByKey
传入分区数,那么 hashpartionner 计算 key 落到的分区就会发生变化- 首先在局部分组,然后全局分组。其中会用到 CompactBuffer 用于存储 key 对应的 value。
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
// 可以传入下游 RDD 的分区数
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()
grouped.saveAsTextFile("output/words")
// (hive,CompactBuffer(1, 1))
// (flink,CompactBuffer(1, 1, 1))
// (spark,CompactBuffer(1, 1, 1, 1))
// (hadoop,CompactBuffer(1, 1))
// (hbase,CompactBuffer(1))
// (kafka,CompactBuffer(1, 1, 1, 1))
- 使用
ShuffledRDD
实现跟groupByKey
效果相同的功能
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
// String 对应key
// Int 对应value
// ArrayBuffer[Int] 对应value的集合
val shuffledRDD = new ShuffledRDD[String, Int, ArrayBuffer[Int]](
wordAndOne,
new HashPartitioner(wordAndOne.partitions.length)
)
// 分区局部创建ArrayBuffer将组内第一个value存入
val createCombiner = (x: Int) => ArrayBuffer(x)
// 分区局部将key相同的放在buf里面(局部分组append)
val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v
// 全局将 key 相同的 buf 合并
val mergeCombiners = (a1: ArrayBuffer[Int], a2: ArrayBuffer[Int]) => a1 ++= a2
shuffledRDD.setAggregator(new Aggregator[String, Int, ArrayBuffer[Int]](
createCombiner, // 该函数在 map side 阶段(shuffle write)
mergeValue, // 该函数在 map side 阶段(shuffle write)
mergeCombiners // 该函数在 shuffle read 阶段
))
shuffledRDD.setMapSideCombine(false) // 分组不能在 map side 合并
shuffledRDD.collect
groupBy
- groupBy更加灵活,它的 value 包含了原始数据,所以传输的数据会更大。
- groupBy底层调用了 groupByKey,即可以理解成
wordAndOne.map(x => (x._1, x)).groupByKey()
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val grouped: RDD[(String, Iterable[(String, Int)])] = wordAndOne.groupBy(_._1)
// (hive,CompactBuffer((hive,1), (hive,1)))
// (flink,CompactBuffer((flink,1), (flink,1), (flink,1)))
// (spark,CompactBuffer((spark,1), (spark,1), (spark,1), (spark,1)))
// (hadoop,CompactBuffer((hadoop,1), (hadoop,1)))
// (hbase,CompactBuffer((hbase,1)))
// (kafka,CompactBuffer((kafka,1), (kafka,1), (kafka,1), (kafka,1)))
reduce
- 分区内、分区间的计算方式相同
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1.reduce(_ + _)
- 二维数组求平均值,求每一列的平均值
val list: List[List[Double]] = List(
List(1, 2, 3, 4, 5),
List(10, 20, 30, 40, 50),
List(100, 200, 300, 400, 500)
)
val avgList = list.reduce((x, y) => x.zip(y).map(x => x._1 + x._2)).map(x => x / list.length)
scala> avgList
res22: List[Double] = List(37.0, 74.0, 111.0, 148.0, 185.0)
reduceByKey
- reduceByKey 和 grupByKey 的性能对比:reduceByKey性能更好,因为 groupByKey 会拉取更多的数据,reduceByKey 先局部计算,然后全局计算。
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.reduceByKey(_ + _)
// 等价于:v1 v2, v1一个是已经累加的后的数,v2是准备要累加的数
wordAndOne.reduceByKey((v1, v2) => v1 + v2)
// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))
- 使用 ShuffledRDD 实现 reduceByKey 的功能
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val f1 = (x: Int) => x // 局部分区的第一个value
val f2 = (m: Int, n: Int) => m + n // 局部分区的相同key相加
val f3 = (a: Int, b: Int) => a + b // 全局的相同key相加
val shuffledRDD = new ShuffledRDD[String, Int, Int](
wordAndOne,
new HashPartitioner(wordAndOne.partitions.length)
)
shuffledRDD.setMapSideCombine(true)
shuffledRDD.setAggregator(new Aggregator[String, Int, Int](f1, f2, f3))
shuffledRDD.collect
// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))
combineByKey
- 使用 combineByKey 实现 reduceByKey
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val f1 = (x: Int) => x // 局部分区的第一个value
val f2 = (m: Int, n: Int) => m + n // 局部分区的相同key相加
val f3 = (a: Int, b: Int) => a + b // 全局的相同key相加
val reduced: RDD[(String, Int)] = wordAndOne.combineByKey(f1, f2, f3)
// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))
foldByKey
- 和 reduceByKey 的区别就是可以指定初始值。
- 注意:初始值表示只在局部分区使用一次。
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.foldByKey(100)(_ + _)
aggregateByKey
- 注意:初始值表示只在局部分区使用一次。
- 手动指定局部聚合、全局聚合的函数。
val words: RDD[String] = sc.parallelize(List(
"spark", "hadoop", "hive", "spark",
"spark", "flink", "spark", "hbase",
"kafka", "kafka", "kafka", "kafka",
"hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.aggregateByKey(0)(_ + _, _ + _) // 分别是局部聚合函数、全局聚合函数
aggregate
- aggregate 是一个行动算子。
- aggregate 的初始值会在每个分区内使用一次,在全局合并的时候用一次。
- aggregateByKey 的初始值只会在每个分区内使用一次。
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val value1: Int = rdd1.aggregate(0)((x, y) => x + y, (a, b) => a + b) // 21
val value2: Int = rdd1.aggregate(100)((x, y) => x + y, (a, b) => a + b) // 321
// (x, y) => x + y 表示分区内计算,x表示初始值或者中间结果
// (a, b) => a + b 表示分区间计算,a表示初始值或者中间结果
cogroup
- groupByKey: 表示将一个 rdd 的相同 key 的数据聚合到同一个分区的同一个组 buf
- cogroup: 表示将多个 rdd 的相同 key 的数据聚合到同一个机器的同一个分区的同一个组
- cogroup 会生成 CoGroupedRDD,上游的分区会不确定的进入到下游的不同分区(shuffle)
- 两个RDD进行cogroup会产生3个stage,如下图所示
// buf,buf的value的多个迭代器
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val result = rdd3.mapValues(t => t._1.sum + t._2.sum).collect // 求出每个单词的累计总和
// rdd3 的分区数为 rdd1 和 rdd2 中分区数的最大值
// result 的结果为
// Array((tom,4), (kitty,2), (jerry,5), (shuke,2))
// rdd3 结果如下
// Array(
// (tom, (CompactBuffer(1, 2), CompactBuffer(1))),
// (kitty,(CompactBuffer(2), CompactBuffer())),
// (jerry,(CompactBuffer(3), CompactBuffer(2))),
// (shuke,(CompactBuffer(), CompactBuffer(2)))
// )
// Array类型为 Array[(String, (Iterable[Int], Iterable[Int]))]
// String: 相同的Key
// 第一个Iterable:第一个RDD
// 第二个Iterable:第二个RDD
mapValues
- 原RDD中的Key保持不变,value会变成新的Value,然后一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1.mapValues(t => t*10).collect
// 结果为
Array[(String, Int)] = Array((tom,10), (tom,20), (jerry,30), (kitty,20))
flatMapValues
- 该函数只适用于元素为KV对的RDD。
- 原RDD中的Key保持不变,value会变成新的Value,然后被flat。
- flatMapValues传入的函数中,参数表示key对应的value
val rdd0 = sc.parallelize(List(("a", Array(1, 2, 3)), ("b", Array(4, 5, 6))))
rdd0.flatMapValues(x => x)
// Array((a,1), (a,2), (a,3), (b,4), (b,5), (b,6))
rdd0.flatMapValues(x => x.map(_*10)).collect
// Array((a,10), (a,20), (a,30), (b,40), (b,50), (b,60))
distinct
distinct
先在分区内部去重,然后全局去重- 可以根据实际情况,增加分区数可以提高计算速度
- 主要实现源码
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
- 使用方法
val nums: RDD[Int] = sc.parallelize(List(1, 2, 1, 3, 4, 1, 5, 3, 3, 5, 5, 4), 4)
nums.distinct().foreach(println)
// 自己实现distinct
val nums: RDD[Int] = sc.parallelize(List(1, 2, 1, 3, 4, 1, 5, 3, 3, 5, 5, 4), 4)
nums.map((_, null)).reduceByKey((x, _) => x).map(_._1).foreach(println)
intersection
- 计算交集的两个rdd必须要数据类型相同
val rdd1 = sc.parallelize(List("a", "b", "c", "d"), 2)
val rdd2 = sc.parallelize(List("c", "d", "e", "f"), 2)
rdd1.intersection(rdd2).foreach(println)
- 实现源码:主要是使用cogroup将多个rdd的相同key收集到同一个元组里面,如果都不为空,则存在交集
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
join
- 相当于 SQL 中的 inner jon
- join 必须是key-value类型的rdd
- join 底层使用cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
result.foreach(println)
// 结果:
// (tom,(1,1))
// (tom,(2,1))
// (jerry,(3,2))
- 关键源码
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
- 实现join的scala语法关键在双层for循环,如果只有x,没有y,则不会yield,如下
scala> for(x <- List(1,2,3); y <- List(4)) yield (x, y)
res24: List[(Int, Int)] = List((1,4), (2,4), (3,4))
scala> for(x <- List(1,2,3); y <- List()) yield (x, y)
res25: List[(Int, Nothing)] = List()
- 手动实现join
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val result: RDD[(String, (Int, Int))] = rdd1.cogroup(rdd2).flatMapValues(x => {
for (a <- x._1.iterator; b <- x._2.iterator) yield (a, b)
})
result.foreach(println)
leftOutJoin
- 和 SQL 的 left out join 类似
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.leftOuterJoin(rdd2).foreach(println)
// (tom, (1,Some(1)))
// (tom, (2,Some(1)))
// (jerry,(3,Some(2)))
// (kitty,(2,None))
- rightOuterJoin,和 SQL 的 right out join 类似
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.rightOuterJoin(rdd2).foreach(println)
// (tom, (Some(1),1))
// (tom, (Some(2),1))
// (jerry,(Some(3),2))
// (shuke,(None,2))
- fullOuterJoin
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.fullOuterJoin(rdd2).foreach(println)
// (tom, (Some(1),Some(1)))
// (tom, (Some(2),Some(1)))
// (jerry, (Some(3),Some(2)))
// (shuke, (None, Some(2)))
// (kitty, (Some(2),None))
collect
- collect 会出发运行 sc.runJob,按照分区顺序,依次收集executor计算的结果。
- 如果driver的内存很小,collect只会获取部分数据
foreach
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
// 传入的函数在 executor 里面执行,x表示每个元素
rdd1.foreach(x => x * 10)
foreachPartition
- mapPartitions是一个transformation,foreachPartition是一个action
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1.foreachPartition( iterator => {
// 创建数据库连接
for (element <- iterator){
// 插入数据库
println(element)
}
// 关闭数据库连接
})
count
- count 不需要shuffle
- 先局部count,然后返回Array全局计算总和
val nums: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
println(nums.count())
- 自己实现count
val nums: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
val arr: Array[Long] = sc.runJob(nums, (it: Iterator[Int]) => {
var cnt = 0L
while (it.hasNext) {
cnt += 1L
it.next()
}
cnt
})
println(arr.sum)
top
- top使用有界优先队列
BoundedPriorityQueue
有更好的性能。 - 现在每个分区球topN,然后使用reduce将数据收集到Driver求topN。
nums.top(2)(Ordering[Int].on[(String, Int)](v => v._2))
,下面是详细解释2
表示取几个数Int
表示需要按照排序的类型(String, Int)
表示原始输入的数据类型(v => v._2))
指定要排序的字段,如果是升序,则添加负号即可
// 倒序
val nums: RDD[Int] = sc.parallelize(List(1, 8, 3, 6, 2, 9, 5, 4, 7), 2)
val ints: Array[Int] = nums.top(2)(Ordering[Int].on[Int](v => v))
println(ints.toBuffer) // 9 8
// 升序
val nums: RDD[Int] = sc.parallelize(List(1, 8, 3, 6, 2, 9, 5, 4, 7), 2)
val ints: Array[Int] = nums.top(2)(Ordering[Int].on[Int](v => -v))
println(ints.toBuffer) // 1 2
// 指定按照元组的第二个元素排序,可以降序、可以升序
val nums: RDD[(String, Int)] = sc.parallelize(
List(
("zhang", 9),
("wang", 10),
("zhao", 5))
)
val tuples: Array[(String, Int)] = nums.top(2)(Ordering[Int].on[(String, Int)](v => v._2))
println(tuples.toBuffer) // ArrayBuffer((wang,10), (zhang,9))
// 或者通过隐式转换,不用传入top
implicit val order = Ordering[Int].on[(String, Int)](v => v._2)
nums.top(2)
take
- take不排序直接获取。
- 先从一个分区取,如果数据够了,则触发job直接返回。
- 如果一个分区的数据不够,从第二个分区获取,触发job返回数据。
- 依次类推直到取到指定数量
partitionBy
- 自定义rdd分区器,即将数据分到不同的分区中的算法。
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("myRdd").setMaster("local")
val sc = new SparkContext(conf)
val nums: RDD[(String, Int)] = sc.parallelize(List(("zhang", 9), ("zhang", 12), ("wang", 10), ("zhao", 5)))
val partitioner = new CustomPartitioner(Array("zhang", "wang", "zhao"))
val partitioned: RDD[(String, Int)] = nums.partitionBy(partitioner)
println(partitioned.partitions.length) // 3 个分区
println(partitioned.collect.toBuffer)
}
}
// 自定义 分区器
class CustomPartitioner(val keys: Array[String]) extends Partitioner {
val nameToNum = new mutable.HashMap[String, Int]()
for (i <- keys.indices) {
nameToNum(keys(i)) = i
}
override def numPartitions: Int = keys.length
// 在 Executor 的 task 中,shuffle write 之前会调用
override def getPartition(key: Any): Int = {
val k: String = key.asInstanceOf[String]
nameToNum(k)
}
}
repartitionAndSortWithinPartitions
- 需要自定义分区器。
- 重新分区,并排序,性能高于reparation,在reparation时排序。
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD}
import scala.collection.mutable
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("myRdd").setMaster("local")
val sc = new SparkContext(conf)
val nums: RDD[(String, Int)] = sc.parallelize(List(("zhang", 9), ("zhang", 12), ("wang", 10), ("zhao", 5)))
// 因为想要按照value数字排序,所以value要放在key里面,才能用 repartitionAndSortWithinPartitions
val newNums: RDD[((String, Int), Null)] = nums.map(x => ((x._1, x._2), null))
val partitioner = new CustomPartitioner(Array("zhang", "wang", "zhao"))
implicit val rule = Ordering[Int].on[(String, Int)](x => -x._2) // 降序
val partitioned: RDD[((String, Int), Null)] = newNums.repartitionAndSortWithinPartitions(partitioner)
partitioned.map(_x._1).foreach(println)
// (zhang,12)
// (zhang,9)
// (wang,10)
// (zhao,5)
}
}
class CustomPartitioner(val keys: Array[String]) extends Partitioner {
val nameToNum = new mutable.HashMap[String, Int]()
for (i <- keys.indices) {
nameToNum(keys(i)) = i
}
override def numPartitions: Int = keys.length
// 在 Executor 的 task 中,shuffle write 之前会调用
override def getPartition(key: Any): Int = {
val k: (String, Int) = key.asInstanceOf[(String, Int)] // RDD 传进来的 Key 类型
nameToNum(k._1)
}
}
repartition
- 将数据使用随机数打散,重新分配
- 重新分区,改变分区的数量,会shuffle
coalesce
- 合并分区
- 场景:如果想减小分区数,但是呢又不想shuffle,可以使用coalesce
// 从3个分区改成2个分区,false表示不shuffle,默认是true
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 3)
rdd1.coalesce(2, false).collect
sample
- sample有三个参数
- 第一个:抽出后是否放回(false不放回,true放回)
- 第二个:得分(0~1之间)
- 第三个:随机数生成器的种子(默认为
Utils.random.nextLong
)
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
rdd1.sample(false, 0.5)
keyBy
- 将普通rdd变成key-value形式,同时还可以对key做一些修改
- 将传进的函数应用到value后作为key,原始的value作为value
val nums = sc.parallelize(List(1, 2, 3, 4, 5))
nums.keyBy(x => x * 100).collect
// Array((100,1), (200,2), (300,3), (400,4), (500,5))
// 相当于使用如下 map
nums.map(x => (x * 100, x)).collect
sortBy
- 注意: sortBy是一个 transformation,但是会触发action(用作数据采样)
- 调用 sortBy 的时候会生成3个RDD
- sortBy计算步骤:
- 1、首先从所有的分区中采样部分数据,得到数据的大概分布(比如0~9999)
- 2、使用RangePatitioner将数据按照不同的范围,shuffle到不同的分区
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
rdd1.sortBy(x => x, false).collect()
// 多字段排序
val nums = sc.parallelize(List(("zhang", 100), ("wang", 90)))
nums.sortBy(x => (x._2, x._1)).collect()
sortByKey
- 只针对key-value 的rdd
val nums = sc.parallelize(List(("zhang", 100), ("wang", 90)))
nums.sortByKey().collect // 升序
nums.sortByKey(false).collect // 降序