Spark 所有排序方法总结
排序方法1
- 定义 case class,在内部重写需要排序的字段(实现 Ordered 的 compare 方法),然后使用 sortBy(x => x)
// A person record that knows how to order itself (method 1):
// primary sort key is score (ascending); ties are broken by age (ascending).
// Because Person extends Ordered[Person], sortBy(x => x) picks this up automatically.
case class Person(name: String, age: Int, score: Int) extends Ordered[Person] {
  override def compare(o: Person): Int =
    if (this.score != o.score) this.score - o.score // different scores: score decides
    else this.age - o.age                           // equal scores: fall back to age
}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val lines = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))

    // Parse each CSV line into a Person; Person extends Ordered[Person],
    // so sortBy with the identity key uses Person's own compare method.
    val sorted = lines
      .map { line =>
        val parts = line.split(",")
        Person(parts(0), parts(1).toInt, parts(2).toInt)
      }
      .sortBy(x => x)
      .collect()

    println(sorted.toBuffer)
  }
}
// ArrayBuffer(Person(zao,22,23), Person(zhang,30,99), Person(wang,40,99))
排序方法2
// Plain data record (method 2): ordering is supplied externally via an implicit Ordering[Person].
case class Person(name: String, age: Int, score: Int)
// 存放所有的排序utils
// Holds ordering instances ("utils") that can be imported wherever they are needed.
object OrderingContext {
  // Implicit Ordering[Person] (implicit object form):
  // ascending by score, ties broken by age (ascending).
  implicit object OrderingPerson extends Ordering[Person] {
    override def compare(x: Person, y: Person): Int = {
      if (x.score == y.score) {
        // BUG FIX: was `x.age - x.age`, which is always 0, so equal scores
        // were never actually tie-broken by age.
        x.age - y.age
      } else {
        x.score - y.score
      }
    }
  }
}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val raw = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))

    // Parse each CSV line into a Person.
    val people = raw.map { line =>
      val fields = line.split(",")
      Person(fields(0), fields(1).toInt, fields(2).toInt)
    }

    // Bring the implicit Ordering[Person] into scope so sortBy can resolve it.
    import OrderingContext.OrderingPerson
    val sorted = people.sortBy(x => x).collect()
    println(sorted.toBuffer)
  }
}
排序方法3
- 方法3和方法2类似
// Plain data record (method 3): ordering is supplied externally via an implicit Ordering[Person].
case class Person(name: String, age: Int, score: Int)
// 存放所有的排序utils
// Holds ordering instances ("utils") that can be imported wherever they are needed.
object OrderingContext {
  // Implicit Ordering[Person] (implicit val form):
  // ascending by score, ties broken by age (ascending).
  implicit val orderPerson: Ordering[Person] = new Ordering[Person] {
    override def compare(x: Person, y: Person): Int = {
      if (x.score == y.score) {
        // BUG FIX: was `x.age - x.age`, which is always 0, so equal scores
        // were never actually tie-broken by age.
        x.age - y.age
      } else {
        x.score - y.score
      }
    }
  }
}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val raw = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))

    // Parse each CSV line into a Person.
    val people = raw.map { line =>
      val fields = line.split(",")
      Person(fields(0), fields(1).toInt, fields(2).toInt)
    }

    // Import the implicit Ordering[Person] value so sortBy can resolve it.
    import OrderingContext.orderPerson
    val sorted = people.sortBy(x => x).collect()
    println(sorted.toBuffer)
  }
}
排序方法4
推荐
- 指定一个 tuple,按照 tuple 给定的字段依次排序
- 如果需要倒序,使用负号
// Plain data record (method 4): sorted via an explicit tuple key, no Ordering instance needed.
case class Person(name: String, age: Int, score: Int)
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yzy").setMaster("local")
    val sc = new SparkContext(conf)

    val records = sc.parallelize(List("zhang,30,99", "wang,40,99", "zao,22,23"))

    // Parse each CSV line into a Person.
    val people = records.map { line =>
      val f = line.split(",")
      Person(f(0), f(1).toInt, f(2).toInt)
    }

    // Sort by a tuple key: score ascending, then age descending
    // (negating a numeric field flips its sort direction).
    val sorted = people.sortBy(p => (p.score, -p.age)).collect()
    println(sorted.toBuffer)
  }
}