1. Ordinary Sorting
1.1 sortByKey
sortByKey applies to k-v data and sorts by key. It sorts within each partition and range-partitions the keys, so the partitions themselves are ordered relative to each other; if you want to consume the result as one fully ordered sequence on a single machine (for example, when printing), pull everything into a single partition, as below.
//sortByKey: sort by height in descending order
val height2Stu = stuRDD.map(stu => (stu.height, stu))
val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)
sorted.foreach { case (height, stu) => println(stu) }
1.2 sortBy
sortBy is actually implemented with sortByKey, but it is more flexible: sortByKey only applies to k-v data, whereas sortBy can also be used on data that is not in key-value form.
//sortBy: sort by height in descending order
val sortedBy = stuRDD.sortBy(stu => stu.height,
ascending = true,
numPartitions = 1
)(new Ordering[Double]() {
override def compare(x: Double, y: Double) = y.compareTo(x)
},
ClassTag.Double)
sortedBy.foreach(println)
Besides the usual ascending flag and partition count, sortBy takes a function that extracts the field to sort on from each record, plus a second argument list with the Ordering used for comparison and a ClassTag marking the runtime type of the sort key.
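In everyday code you rarely spell these out. A minimal sketch of the idiomatic form, letting the compiler supply the implicit Ordering[Double] and ClassTag (same stuRDD as above):
//Equivalent sketch: descending sort via the ascending flag alone
val sortedDesc = stuRDD.sortBy(stu => stu.height, ascending = false, numPartitions = 1)
sortedDesc.foreach(println)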
1.3 takeOrdered
takeOrdered also sorts an RDD, but compared with sortByKey and sortBy above, takeOrdered is an action that returns a local collection, while the other two are transformations that return RDDs. If you want the sorted result in the driver, takeOrdered is recommended, because it sorts and returns in a single step; it is essentially a combination of take and sortBy. takeOrdered(n) returns the first n records in sorted order.
//takeOrdered: sort by height descending, then by age ascending, also known as a secondary sort
stuRDD.takeOrdered(3)(new Ordering[Student]() {
override def compare(x: Student, y: Student) = {
var ret = y.height.compareTo(x.height)
if (ret == 0) {
ret = x.age.compareTo(y.age)
}
ret
}
}).foreach(println)
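A closely related action is top(n), which returns the n largest elements under the given Ordering, behaving like takeOrdered(n) with the ordering reversed. A quick sketch on the same stuRDD:
//top(3): the three tallest students, the mirror image of takeOrdered
stuRDD.top(3)(Ordering.by((s: Student) => s.height)).foreach(println)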
Complete code:
package blog.p3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.reflect.ClassTag
/**
* @Author Daniel
* @Description
* Ordinary sorting
**/
object OrdinarySort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${OrdinarySort.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val stuRDD: RDD[Student] = sc.parallelize(List(
Student(1, "张三", 19, 168),
Student(2, "李四", 18, 175),
Student(3, "王五", 18, 176),
Student(4, "赵六", 20, 180),
Student(5, "周七", 18, 168.5)
))
//sortByKey: sort by height in descending order
val height2Stu = stuRDD.map(stu => (stu.height, stu))
val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)
sorted.foreach { case (height, stu) => println(stu) }
//sortBy: sort by height in descending order
val sortedBy = stuRDD.sortBy(stu => stu.height,
ascending = true,
numPartitions = 1
)(new Ordering[Double]() {
override def compare(x: Double, y: Double) = y.compareTo(x)
},
ClassTag.Double)
sortedBy.foreach(println)
//takeOrdered: sort by height descending, then by age ascending (a secondary sort)
stuRDD.takeOrdered(3)(new Ordering[Student]() {
override def compare(x: Student, y: Student) = {
var ret = y.height.compareTo(x.height)
if (ret == 0) {
ret = x.age.compareTo(y.age)
}
ret
}
}).foreach(println)
sc.stop()
}
}
case class Student(id: Int, name: String, age: Int, height: Double)
2. Secondary Sort
A secondary sort means the sort key is not a single field: several fields participate in the ordering together.
//Sort students by height, then by age
case class Person(id: Int, name: String, age: Int, height: Double) extends Ordered[Person] {
override def compare(that: Person) = {
var ret = this.height.compareTo(that.height)
if (ret == 0) {
ret = this.age.compareTo(that.age)
}
ret
}
}
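If you would rather not mix Ordered into the case class, an equivalent sketch supplies an implicit Ordering[Person] at the call site instead (assuming a plain case class Person and the personRDD from the complete code below):
//Alternative sketch: derive the ordering from a (height, age) tuple
implicit val byHeightThenAge: Ordering[Person] = Ordering.by(p => (p.height, p.age))
personRDD.sortBy(p => p, ascending = true, numPartitions = 1).foreach(println)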
Complete code:
package blog.p3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author Daniel
* @Description
* Secondary sort
**/
object SecondSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${SecondSort.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val personRDD: RDD[Person] = sc.parallelize(List(
Person(1, "张三", 19, 168),
Person(2, "李四", 18, 175),
Person(3, "王五", 18, 176),
Person(4, "赵六", 20, 180),
Person(5, "周七", 18, 168)
))
personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))
sc.stop()
}
}
//Sort students by height, then by age
case class Person(id: Int, name: String, age: Int, height: Double) extends Ordered[Person] {
override def compare(that: Person) = {
var ret = this.height.compareTo(that.height)
if (ret == 0) {
ret = this.age.compareTo(that.age)
}
ret
}
}
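Because Person extends Ordered[Person], Scala derives the implicit Ordering automatically, which also makes the actions from section 1.3 usable without an explicit comparator. A minimal sketch on the same personRDD:
//The derived Ordering lets takeOrdered work directly
personRDD.takeOrdered(3).foreach(println)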
3. TopN
TopN is implemented with the action take(N) on a sorted RDD, or with takeOrdered(N); the latter is generally used because it is more efficient, avoiding the full sort that take(N) would require first.
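A minimal sketch contrasting the two approaches, assuming a simple RDD[Int] named nums (illustrative only):
val nums = sc.parallelize(Seq(5, 1, 9, 3, 7))
//Approach 1: full distributed sort, then take the first 3
val viaSort = nums.sortBy(n => n, ascending = false).take(3)
//Approach 2: per-partition top-3 merged on the driver, no full sort
val viaTakeOrdered = nums.takeOrdered(3)(Ordering[Int].reverse)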
3.1 Grouped TopN
Given grouped data, retrieve the TopN records within each group.
Sample data:
chinese ls 91
english ww 56
chinese zs 90
chinese zl 76
english zq 88
The fields are subject, name, and score. Requirement: find the top 3 students by score in each subject.
Implementation:
package blog.p3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* @Author Daniel
* @Description
* Grouped TopN
**/
object GroupSortTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${GroupSortTopN.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val lines = sc.textFile("file:/F:/test/topn.txt")
//Parse each line into a (course, info) pair
val course2Info: RDD[(String, String)] = lines.map(line => {
val spaceIndex = line.indexOf(" ")
val course = line.substring(0, spaceIndex)
val info = line.substring(spaceIndex + 1)
(course, info)
})
//Sorting is within each subject, not across subjects, so first gather all records for each subject
val course2Infos: RDD[(String, Iterable[String])] = course2Info.groupByKey() //group by key
//Sort within each group
val sorted: RDD[(String, mutable.TreeSet[String])] = course2Infos.map { case (course, infos) => {
val topN = mutable.TreeSet[String]()(new Ordering[String]() {
override def compare(x: String, y: String) = {
//Compare scores numerically, in descending order; tie-break on the whole
//record so students with equal scores are not dropped as duplicates by the TreeSet
val xScore = x.split("\\s+")(1).toInt
val yScore = y.split("\\s+")(1).toInt
val ret = yScore.compareTo(xScore)
if (ret == 0) x.compareTo(y) else ret
}
})
for (info <- infos) {
topN.add(info)
}
(course, topN.take(3))
}
}
sorted.foreach(println)
sc.stop()
}
}
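With the sample file above this prints, in either order: (chinese,TreeSet(ls 91, zs 90, zl 76)) and (english,TreeSet(zq 88, ww 56)).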
3.2 Optimized Grouped TopN
The code above uses groupByKey, which performs poorly because it does no map-side pre-aggregation: every record is shuffled in full. We therefore optimize it with combineByKey, keeping only a small top-3 set per key on each side of the shuffle.
package blog.p3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* @Author Daniel
* @Description
* Optimized grouped TopN
**/
object GroupSortByCombineByKeyTopN {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${GroupSortByCombineByKeyTopN.getClass.getSimpleName}")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val lines = sc.textFile("file:/F:/test/topn.txt")
val course2Info: RDD[(String, String)] = lines.map(line => {
val spaceIndex = line.indexOf(" ")
val course = line.substring(0, spaceIndex)
val info = line.substring(spaceIndex + 1)
(course, info)
})
val sorted = course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)
sorted.foreach(println)
sc.stop()
}
//Create the per-key combiner: a TreeSet ordered by score, descending
def createCombiner(info: String): mutable.TreeSet[String] = {
val ts = new mutable.TreeSet[String]()(new Ordering[String]() {
override def compare(x: String, y: String) = {
//Compare scores numerically, in descending order; tie-break on the whole
//record so students with equal scores are not dropped as duplicates
val xScore = x.split("\\s+")(1).toInt
val yScore = y.split("\\s+")(1).toInt
val ret = yScore.compareTo(xScore)
if (ret == 0) x.compareTo(y) else ret
}
})
ts.add(info)
ts
}
//Merge one more record into a partition-local set, keeping at most the top 3
def mergeValue(ab: mutable.TreeSet[String], info: String): mutable.TreeSet[String] = {
ab.add(info)
if (ab.size > 3) {
ab.take(3)
} else {
ab
}
}
//Merge two partitions' sets, keeping at most the top 3
def mergeCombiners(ab: mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {
for (info <- ab1) {
ab.add(info)
}
if (ab.size > 3) {
ab.take(3)
} else {
ab
}
}
}
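For comparison, the same idea can be sketched with aggregateByKey, which is combineByKey with a shared zero value; the names below are illustrative and assume the same course2Info RDD:
//Hypothetical alternative: an empty TreeSet as the zero value
//(Spark copies the zero value per key, so mutating it is safe)
val zero = mutable.TreeSet.empty[String](Ordering.by((s: String) => (-s.split("\\s+")(1).toInt, s)))
val topN = course2Info.aggregateByKey(zero)(
(set, info) => { set.add(info); set.take(3) }, //within a partition
(a, b) => { b.foreach(a.add); a.take(3) } //across partitions
)
topN.foreach(println)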