
A Summary of All Spark Sorting Methods


· 2 min read

Sorting Method 1

  • Define a case class that extends Ordered and override compare on the fields you want to sort by
  • Then call sortBy(x => x)
import org.apache.spark.{SparkConf, SparkContext}

// Sort by score first; break ties by age
case class Person(name: String, age: Int, score: Int) extends Ordered[Person] {
  override def compare(o: Person): Int = {
    if (this.score == o.score) {
      this.age - o.age
    } else {
      this.score - o.score
    }
  }
}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))
    val res = rdd1.map(line => {
      val fields: Array[String] = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val score = fields(2).toInt
      Person(name, age, score)
    }).sortBy(x => x)
      .collect()

    println(res.toBuffer)

  }
}

// ArrayBuffer(Person(zao,22,23), Person(zhang,30,99), Person(wang,40,99))
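If the reverse order is needed with this approach, sortBy also takes an ascending flag; a minimal sketch, reusing the Person and rdd1 definitions above:

// Descending order with the same Ordered logic
val desc = rdd1.map(line => {
  val fields = line.split(",")
  Person(fields(0), fields(1).toInt, fields(2).toInt)
}).sortBy(x => x, ascending = false)
  .collect()
// Should print the reverse: wang, zhang, zao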

Sorting Method 2

import org.apache.spark.{SparkConf, SparkContext}

case class Person(name: String, age: Int, score: Int)

// Holds all the ordering utilities
object OrderingContext {

  // Implicit Ordering object
  implicit object OrderingPerson extends Ordering[Person] {
    override def compare(x: Person, y: Person): Int = {
      if (x.score == y.score) {
        x.age - y.age
      } else {
        x.score - y.score
      }
    }
  }

}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))
    val rdd2 = rdd1.map(line => {
      val fields: Array[String] = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val score = fields(2).toInt
      Person(name, age, score)
    })

    // Just import the implicit Ordering into scope; sortBy will pick it up
    import OrderingContext.OrderingPerson
    val res = rdd2.sortBy(x => x).collect()
    
    println(res.toBuffer)

  }
}
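An implicit Ordering defined this way is also picked up by other RDD methods that take an Ordering, such as top and min; a minimal sketch, assuming the rdd2 and import from the example above:

// top(n) and min() resolve the implicit Ordering[Person] from OrderingContext
import OrderingContext.OrderingPerson
val best: Array[Person] = rdd2.top(1)  // highest score (ties broken by age)
val worst: Person = rdd2.min()         // lowest score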

Sorting Method 3

  • Method 3 is similar to Method 2, but uses an implicit val instead of an implicit object
import org.apache.spark.{SparkConf, SparkContext}

case class Person(name: String, age: Int, score: Int)

// Holds all the ordering utilities
object OrderingContext {

  // Implicit Ordering value
  implicit val orderPerson: Ordering[Person] = new Ordering[Person] {
    override def compare(x: Person, y: Person): Int = {
      if (x.score == y.score) {
        x.age - y.age
      } else {
        x.score - y.score
      }
    }
  }

}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))
    val rdd2 = rdd1.map(line => {
      val fields: Array[String] = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val score = fields(2).toInt
      Person(name, age, score)
    })

    import OrderingContext.orderPerson
    val res = rdd2.sortBy(x => x).collect()

    println(res.toBuffer)

  }
}
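Since the implicit val is just an Ordering instance, it can also be built more concisely with Ordering.by; a minimal sketch of an equivalent OrderingContext:

// Equivalent implicit val built from a tuple key: score first, then age
object OrderingContext {
  implicit val orderPerson: Ordering[Person] = Ordering.by((p: Person) => (p.score, p.age))
}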

Sorting Method 4

Recommended

  • Pass sortBy a tuple of the fields to sort by; rows are ordered by those fields in turn
  • Prefix a numeric field with a minus sign for descending order
import org.apache.spark.{SparkConf, SparkContext}

case class Person(name: String, age: Int, score: Int)

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))
    val rdd2 = rdd1.map(line => {
      val fields: Array[String] = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val score = fields(2).toInt
      Person(name, age, score)
    })

    // Ascending by score, descending by age within the same score
    val res = rdd2.sortBy(x => (x.score, -x.age)).collect()
    println(res.toBuffer)
  }
}
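The minus-sign trick only works for numeric fields. For a String field, or to reverse the whole sort key, sortBy's ascending flag can be used instead; a minimal sketch, assuming the same rdd2:

// Descending by the whole (score, age) key via the ascending flag
val byKeyDesc = rdd2.sortBy(x => (x.score, x.age), ascending = false).collect()

// Descending by name (a String, so negation is not an option)
val byNameDesc = rdd2.sortBy(x => x.name, ascending = false).collect()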