已复制
全屏展示
复制代码

Spark实战最常用算子合集详解


· 16 min read

Spark实战最常用算子合集,包含最常用的spark算子使用方法,每个算子都有使用示例。实际上spark的很多算子都是基于几个原始算子封装而来,文中也会对几个非常重要,非常底层的核心算子做详细解释,其中也会有源码讲解。

map

map 不存在 shuffle,直接在每个分区中计算

// 每处理一个元素,调用一次传入的函数。
// 在函数内部可以获取分区的详细信息,比如分区 ID
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result: RDD[(Int, Int)] = nums.map(x => {
  val index: Int = TaskContext.get().partitionId()
  (index, x * 100)
})
result.foreach(println)

mapPartitions

  • map是作用于每一个元素上
  • mapPartitions是作用于每一个分区上
  • 如果要在分区级别处理数据时,可以使用mapPartitions
  • mapPartitions是一个transformation,foreachPartition 是一个action
  • mapPartitions(iterator, preservesPartitioning) 其中preservesPartitioning 表示是否保留上一个RDD的分区器
// 调用 mapPartitions 方法,可以将数据以分区为单位取出来,一个分区就是一个迭代器
// 场景:比如在每个分区只需要一个数据库连接时,
// 避免在 map 里面多次创建数据库连接,需要使用 mapPartitions
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result2: RDD[(Int, Int)] = nums.mapPartitions(it => {
    val index: Int = TaskContext.get().partitionId()
    // 创建数据库连接
    val newIt: Iterator[(Int, Int)] = it.map(x => {
        // if (!it.hasNext): 关闭数据库连接
        (index, x * 100)
    })
    newIt
})
result2.foreach(println)


// 在 mapPartitions 里面可以写任意的转换函数(map、filter等)
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result3: RDD[Int] = nums.mapPartitions(it => {
    val index: Int = TaskContext.get().partitionId()
    val newIt: Iterator[Int] = it.filter(_ % 2 == 0)
    newIt
})
result3.foreach(println)


// 或者创建自定义的 Iterator
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val result3: RDD[Int] = nums.mapPartitions(it => {
    val newIt: Iterator[Int] = Iterator(100, 200, 300)
    newIt
})
result3.foreach(println)

mapPartitionsWithIndex

mapPartitions里面如果要获取分区 ID 的话,需要TaskContext.get().partitionId() 获取,mapPartitionsWithIndex可以直接将分区ID传入。

val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
nums.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {
    // it 表示一个分区的迭代器
    it.map(element => index + ":" + element)
}).foreach(println)

filter

filter不存在shuffle,在每个分区中filter

// 取出偶数
val nums: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
nums.filter(x => x %2 == 0).foreach(println)

flatMap

先在每个分区中计算map,然后在每个分区中flat


val lines = sc.parallelize(Array("java spark", "java scala"))
val words = lines.flatMap(x => x.split(" ")).collect
// Array(java, spark, java, scala)

val lines = sc.parallelize(Array("java spark", "java scala"))
val words = lines.flatMap(x => x).collect
// Array(j, a, v, a,  , s, p, a, r, k, , j, a, v, a, , s, c, a, l, a)

groupByKey

  • 下游的 task 到上游的去拉取数据
  • groupByKey传入分区数,那么 hashpartionner 计算 key 落到的分区就会发生变化
  • 首先在局部分组,然后全局分组。其中会用到 CompactBuffer 用于存储 key 对应的 value。
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
 // 可以传入下游 RDD 的分区数
val grouped: RDD[(String, Iterable[Int])] = wordAndOne.groupByKey()

grouped.saveAsTextFile("output/words")

// (hive,CompactBuffer(1, 1))
// (flink,CompactBuffer(1, 1, 1))
// (spark,CompactBuffer(1, 1, 1, 1))
// (hadoop,CompactBuffer(1, 1))
// (hbase,CompactBuffer(1))
// (kafka,CompactBuffer(1, 1, 1, 1))
  • 使用 ShuffledRDD 实现跟 groupByKey 效果相同的功能
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

// String  对应key
// Int  对应value
// ArrayBuffer[Int]  对应value的集合
val shuffledRDD = new ShuffledRDD[String, Int, ArrayBuffer[Int]](
  wordAndOne,
  new HashPartitioner(wordAndOne.partitions.length)
)

// 分区局部创建ArrayBuffer将组内第一个value存入
val createCombiner = (x: Int) => ArrayBuffer(x)

// 分区局部将key相同的放在buf里面(局部分组append)
val mergeValue = (buf: ArrayBuffer[Int], v: Int) => buf += v

// 全局将 key 相同的 buf 合并
val mergeCombiners = (a1: ArrayBuffer[Int], a2: ArrayBuffer[Int]) => a1 ++= a2

shuffledRDD.setAggregator(new Aggregator[String, Int, ArrayBuffer[Int]](
  createCombiner,      // 该函数在 map side 阶段(shuffle write)
  mergeValue,          // 该函数在 map side 阶段(shuffle write)
  mergeCombiners       // 该函数在 shuffle read 阶段
))
shuffledRDD.setMapSideCombine(false) // 分组不能在 map side 合并

shuffledRDD.collect

groupBy

  • groupBy更加灵活,它的 value 包含了原始数据,所以传输的数据会更大。
  • groupBy底层调用了 groupByKey,即可以理解成 wordAndOne.map(x => (x._1, x)).groupByKey()
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val grouped: RDD[(String, Iterable[(String, Int)])] = wordAndOne.groupBy(_._1)


// (hive,CompactBuffer((hive,1), (hive,1)))
// (flink,CompactBuffer((flink,1), (flink,1), (flink,1)))
// (spark,CompactBuffer((spark,1), (spark,1), (spark,1), (spark,1)))
// (hadoop,CompactBuffer((hadoop,1), (hadoop,1)))
// (hbase,CompactBuffer((hbase,1)))
// (kafka,CompactBuffer((kafka,1), (kafka,1), (kafka,1), (kafka,1)))

reduce

  • 分区内、分区间的计算方式相同
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1.reduce(_ + _)
  • 二维数组求平均值,求每一列的平均值
val list: List[List[Double]] = List(
      List(1, 2, 3, 4, 5),
      List(10, 20, 30, 40, 50),
      List(100, 200, 300, 400, 500)
    )
val avgList = list.reduce((x, y) => x.zip(y).map(x => x._1 + x._2)).map(x => x / list.length)

scala> avgList
res22: List[Double] = List(37.0, 74.0, 111.0, 148.0, 185.0)

reduceByKey

  • reduceByKey 和 grupByKey 的性能对比:reduceByKey性能更好,因为 groupByKey 会拉取更多的数据,reduceByKey 先局部计算,然后全局计算。
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.reduceByKey(_ + _)

// 等价于:v1 v2, v1一个是已经累加的后的数,v2是准备要累加的数
wordAndOne.reduceByKey((v1, v2) => v1 + v2)


// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))
  • 使用 ShuffledRDD 实现 reduceByKey 的功能
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

val f1 = (x: Int) => x // 局部分区的第一个value
val f2 = (m: Int, n: Int) => m + n // 局部分区的相同key相加
val f3 = (a: Int, b: Int) => a + b // 全局的相同key相加
val shuffledRDD = new ShuffledRDD[String, Int, Int](
  wordAndOne,
  new HashPartitioner(wordAndOne.partitions.length)
)
shuffledRDD.setMapSideCombine(true)
shuffledRDD.setAggregator(new Aggregator[String, Int, Int](f1, f2, f3))

shuffledRDD.collect

// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))

combineByKey

  • 使用 combineByKey 实现 reduceByKey
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

val f1 = (x: Int) => x // 局部分区的第一个value
val f2 = (m: Int, n: Int) => m + n // 局部分区的相同key相加
val f3 = (a: Int, b: Int) => a + b // 全局的相同key相加
val reduced: RDD[(String, Int)] = wordAndOne.combineByKey(f1, f2, f3)

// Array((hive,2), (flink,3), (spark,4), (hadoop,2), (hbase,1), (kafka,4))

foldByKey

  • 和 reduceByKey 的区别就是可以指定初始值。
  • 注意:初始值表示只在局部分区使用一次。
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.foldByKey(100)(_ + _)

aggregateByKey

  • 注意:初始值表示只在局部分区使用一次。
  • 手动指定局部聚合、全局聚合的函数。
val words: RDD[String] = sc.parallelize(List(
  "spark", "hadoop", "hive", "spark",
  "spark", "flink", "spark", "hbase",
  "kafka", "kafka", "kafka", "kafka",
  "hadoop", "flink", "hive", "flink"), 4)

val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
wordAndOne.aggregateByKey(0)(_ + _, _ + _)  // 分别是局部聚合函数、全局聚合函数

aggregate

  • aggregate 是一个行动算子。
  • aggregate 的初始值会在每个分区内使用一次,在全局合并的时候用一次。
  • aggregateByKey 的初始值只会在每个分区内使用一次。
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val value1: Int = rdd1.aggregate(0)((x, y) => x + y, (a, b) => a + b)    // 21
val value2: Int = rdd1.aggregate(100)((x, y) => x + y, (a, b) => a + b)  // 321
// (x, y) => x + y 表示分区内计算,x表示初始值或者中间结果
// (a, b) => a + b 表示分区间计算,a表示初始值或者中间结果

cogroup

  • groupByKey: 表示将一个 rdd 的相同 key 的数据聚合到同一个分区的同一个组 buf
  • cogroup: 表示将多个 rdd 的相同 key 的数据聚合到同一个机器的同一个分区的同一个组
  • cogroup 会生成 CoGroupedRDD,上游的分区会不确定的进入到下游的不同分区(shuffle)
  • 两个RDD进行cogroup会产生3个stage,如下图所示
// buf,buf的value的多个迭代器
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val rdd3 = rdd1.cogroup(rdd2)
val result = rdd3.mapValues(t => t._1.sum + t._2.sum).collect  // 求出每个单词的累计总和
// rdd3 的分区数为 rdd1 和 rdd2 中分区数的最大值

// result 的结果为
// Array((tom,4), (kitty,2), (jerry,5), (shuke,2))

// rdd3 结果如下
//    Array(
//      (tom,  (CompactBuffer(1, 2), CompactBuffer(1))),
//      (kitty,(CompactBuffer(2), CompactBuffer())),
//      (jerry,(CompactBuffer(3), CompactBuffer(2))),
//      (shuke,(CompactBuffer(), CompactBuffer(2)))
//    )
// Array类型为 Array[(String, (Iterable[Int], Iterable[Int]))]
// String: 相同的Key 
// 第一个Iterable:第一个RDD
// 第二个Iterable:第二个RDD

mapValues

  • 原RDD中的Key保持不变,value会变成新的Value,然后一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
rdd1.mapValues(t => t*10).collect

// 结果为
Array[(String, Int)] = Array((tom,10), (tom,20), (jerry,30), (kitty,20))

flatMapValues

  • 该函数只适用于元素为KV对的RDD。
  • 原RDD中的Key保持不变,value会变成新的Value,然后被flat。
  • flatMapValues传入的函数中,参数表示key对应的value
val rdd0 = sc.parallelize(List(("a", Array(1, 2, 3)), ("b", Array(4, 5, 6))))

rdd0.flatMapValues(x => x)
// Array((a,1), (a,2), (a,3), (b,4), (b,5), (b,6))

rdd0.flatMapValues(x => x.map(_*10)).collect
// Array((a,10), (a,20), (a,30), (b,40), (b,50), (b,60))

distinct

  • distinct先在分区内部去重,然后全局去重
  • 可以根据实际情况,增加分区数可以提高计算速度
  • 主要实现源码
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  • 使用方法
val nums: RDD[Int] = sc.parallelize(List(1, 2, 1, 3, 4, 1, 5, 3, 3, 5, 5, 4), 4)
nums.distinct().foreach(println)

// 自己实现distinct
val nums: RDD[Int] = sc.parallelize(List(1, 2, 1, 3, 4, 1, 5, 3, 3, 5, 5, 4), 4)
nums.map((_, null)).reduceByKey((x, _) => x).map(_._1).foreach(println)

intersection

  • 计算交集的两个rdd必须要数据类型相同
val rdd1 = sc.parallelize(List("a", "b", "c", "d"), 2)
val rdd2 = sc.parallelize(List("c", "d", "e", "f"), 2)
rdd1.intersection(rdd2).foreach(println)
  • 实现源码:主要是使用cogroup将多个rdd的相同key收集到同一个元组里面,如果都不为空,则存在交集
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}

join

  • 相当于 SQL 中的 inner jon
  • join 必须是key-value类型的rdd
  • join 底层使用cogroup
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
result.foreach(println)
// 结果:
//    (tom,(1,1))
//    (tom,(2,1))
//    (jerry,(3,2))
  • 关键源码
this.cogroup(other, partitioner).flatMapValues( pair =>
  for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
  • 实现join的scala语法关键在双层for循环,如果只有x,没有y,则不会yield,如下
scala> for(x <- List(1,2,3); y <- List(4)) yield (x, y)
res24: List[(Int, Int)] = List((1,4), (2,4), (3,4))

scala> for(x <- List(1,2,3); y <- List()) yield (x, y)
res25: List[(Int, Nothing)] = List()
  • 手动实现join
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
val result: RDD[(String, (Int, Int))] = rdd1.cogroup(rdd2).flatMapValues(x => {
  for (a <- x._1.iterator; b <- x._2.iterator) yield (a, b)
})
result.foreach(println)

leftOutJoin

  • 和 SQL 的 left out join 类似
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.leftOuterJoin(rdd2).foreach(println)

//    (tom,  (1,Some(1)))
//    (tom,  (2,Some(1)))
//    (jerry,(3,Some(2)))
//    (kitty,(2,None))
  • rightOuterJoin,和 SQL 的 right out join 类似
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.rightOuterJoin(rdd2).foreach(println)

// (tom,  (Some(1),1))
// (tom,  (Some(2),1))
// (jerry,(Some(3),2))
// (shuke,(None,2))
  • fullOuterJoin
val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("shuke", 2)))
rdd1.fullOuterJoin(rdd2).foreach(println)

// (tom,   (Some(1),Some(1)))
// (tom,   (Some(2),Some(1)))
// (jerry, (Some(3),Some(2)))
// (shuke, (None,   Some(2)))
// (kitty, (Some(2),None))

collect

  • collect 会出发运行 sc.runJob,按照分区顺序,依次收集executor计算的结果。
  • 如果driver的内存很小,collect只会获取部分数据

foreach

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

// 传入的函数在 executor 里面执行,x表示每个元素
rdd1.foreach(x => x * 10)

foreachPartition

  • mapPartitions是一个transformation,foreachPartition是一个action
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd1.foreachPartition( iterator => {
  // 创建数据库连接
  for (element <- iterator){
  // 插入数据库
  println(element)
  }
  // 关闭数据库连接
})

count

  • count 不需要shuffle
  • 先局部count,然后返回Array全局计算总和
val nums: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
println(nums.count())
  • 自己实现count
val nums: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7), 2)
val arr: Array[Long] = sc.runJob(nums, (it: Iterator[Int]) => {
  var cnt = 0L
  while (it.hasNext) {
    cnt += 1L
    it.next()
  }
  cnt
})
println(arr.sum)

top

  • top使用有界优先队列 BoundedPriorityQueue 有更好的性能。
  • 现在每个分区球topN,然后使用reduce将数据收集到Driver求topN。
  • nums.top(2)(Ordering[Int].on[(String, Int)](v => v._2)),下面是详细解释
  • 2表示取几个数
  • Int 表示需要按照排序的类型
  • (String, Int) 表示原始输入的数据类型
  • (v => v._2)) 指定要排序的字段,如果是升序,则添加负号即可
// 倒序
val nums: RDD[Int] = sc.parallelize(List(1, 8, 3, 6, 2, 9, 5, 4, 7), 2)
val ints: Array[Int] = nums.top(2)(Ordering[Int].on[Int](v => v))
println(ints.toBuffer)  // 9 8

// 升序
val nums: RDD[Int] = sc.parallelize(List(1, 8, 3, 6, 2, 9, 5, 4, 7), 2)
val ints: Array[Int] = nums.top(2)(Ordering[Int].on[Int](v => -v))
println(ints.toBuffer)  // 1 2


// 指定按照元组的第二个元素排序,可以降序、可以升序
val nums: RDD[(String, Int)] = sc.parallelize(
  List(
    ("zhang", 9),
    ("wang", 10),
    ("zhao", 5))
)
val tuples: Array[(String, Int)] = nums.top(2)(Ordering[Int].on[(String, Int)](v => v._2))
println(tuples.toBuffer)  // ArrayBuffer((wang,10), (zhang,9))


// 或者通过隐式转换,不用传入top
implicit val order = Ordering[Int].on[(String, Int)](v => v._2)
nums.top(2)

take

  • take不排序直接获取。
  • 先从一个分区取,如果数据够了,则触发job直接返回。
  • 如果一个分区的数据不够,从第二个分区获取,触发job返回数据。
  • 依次类推直到取到指定数量

partitionBy

  • 自定义rdd分区器,即将数据分到不同的分区中的算法。
object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("myRdd").setMaster("local")
    val sc = new SparkContext(conf)
    
    val nums: RDD[(String, Int)] = sc.parallelize(List(("zhang", 9), ("zhang", 12), ("wang", 10), ("zhao", 5)))
    val partitioner = new CustomPartitioner(Array("zhang", "wang", "zhao"))
    val partitioned: RDD[(String, Int)] = nums.partitionBy(partitioner)
    println(partitioned.partitions.length) // 3 个分区
    println(partitioned.collect.toBuffer)
  }
}

// 自定义 分区器
class CustomPartitioner(val keys: Array[String]) extends Partitioner {
  val nameToNum = new mutable.HashMap[String, Int]()
  for (i <- keys.indices) {
    nameToNum(keys(i)) = i
  }
  override def numPartitions: Int = keys.length
  // 在 Executor 的 task 中,shuffle write 之前会调用
  override def getPartition(key: Any): Int = {
    val k: String = key.asInstanceOf[String]
    nameToNum(k)
  }
}

repartitionAndSortWithinPartitions

  • 需要自定义分区器。
  • 重新分区,并排序,性能高于reparation,在reparation时排序。
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.{RDD}
import scala.collection.mutable

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("myRdd").setMaster("local")
    val sc = new SparkContext(conf)

    val nums: RDD[(String, Int)] = sc.parallelize(List(("zhang", 9), ("zhang", 12), ("wang", 10), ("zhao", 5)))
    // 因为想要按照value数字排序,所以value要放在key里面,才能用 repartitionAndSortWithinPartitions
    val newNums: RDD[((String, Int), Null)] = nums.map(x => ((x._1, x._2), null))

    val partitioner = new CustomPartitioner(Array("zhang", "wang", "zhao"))
    implicit val rule = Ordering[Int].on[(String, Int)](x => -x._2)   // 降序
    val partitioned: RDD[((String, Int), Null)] = newNums.repartitionAndSortWithinPartitions(partitioner)

    partitioned.map(_x._1).foreach(println)

    //    (zhang,12)
    //    (zhang,9)
    //    (wang,10)
    //    (zhao,5)
  }
}

class CustomPartitioner(val keys: Array[String]) extends Partitioner {
  val nameToNum = new mutable.HashMap[String, Int]()
  for (i <- keys.indices) {
    nameToNum(keys(i)) = i
  }
  override def numPartitions: Int = keys.length
  // 在 Executor 的 task 中,shuffle write 之前会调用
  override def getPartition(key: Any): Int = {
    val k: (String, Int) = key.asInstanceOf[(String, Int)]   // RDD 传进来的 Key 类型
    nameToNum(k._1)
  }
}

repartition

  • 将数据使用随机数打散,重新分配
  • 重新分区,改变分区的数量,会shuffle

coalesce

  • 合并分区
  • 场景:如果想减小分区数,但是呢又不想shuffle,可以使用coalesce
// 从3个分区改成2个分区,false表示不shuffle,默认是true
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 3)
rdd1.coalesce(2, false).collect

sample

  • sample有三个参数
  • 第一个:抽出后是否放回(false不放回,true放回)
  • 第二个:得分(0~1之间)
  • 第三个:随机数生成器的种子(默认为Utils.random.nextLong
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
rdd1.sample(false, 0.5)

keyBy

  • 将普通rdd变成key-value形式,同时还可以对key做一些修改
  • 将传进的函数应用到value后作为key,原始的value作为value
val nums = sc.parallelize(List(1, 2, 3, 4, 5))
nums.keyBy(x => x * 100).collect
// Array((100,1), (200,2), (300,3), (400,4), (500,5))

// 相当于使用如下 map
nums.map(x => (x * 100, x)).collect

sortBy

  • 注意: sortBy是一个 transformation,但是会触发action(用作数据采样)
  • 调用 sortBy 的时候会生成3个RDD
  • sortBy计算步骤:
  • 1、首先从所有的分区中采样部分数据,得到数据的大概分布(比如0~9999)
  • 2、使用RangePatitioner将数据按照不同的范围,shuffle到不同的分区
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
rdd1.sortBy(x => x, false).collect()

// 多字段排序
val nums = sc.parallelize(List(("zhang", 100), ("wang", 90)))
nums.sortBy(x => (x._2, x._1)).collect()

sortByKey

  • 只针对key-value 的rdd
val nums = sc.parallelize(List(("zhang", 100), ("wang", 90)))
nums.sortByKey().collect       // 升序
nums.sortByKey(false).collect  // 降序

文章推荐