Spark-core Advanced Sorting

1. Ordinary Sorting

1.1 sortByKey

sortByKey works on k-v (key-value) data and sorts by key. The result is sorted within each output partition rather than delivered as one global sequence; if you need all of the data globally sorted in one place, you have to pull it into a single partition (that is, onto one machine).

    // sortByKey: sort by height in descending order
    val height2Stu = stuRDD.map(stu => (stu.height, stu))
    val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)
    sorted.foreach { case (height, stu) => println(stu) }
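
To see how the result is laid out, you can inspect the partitions directly with glom. A minimal sketch (assuming the same height2Stu pair RDD as above): with more than one output partition, each partition is sorted internally, and the keys are range-partitioned across partitions:

    val sorted2 = height2Stu.sortByKey(ascending = false, numPartitions = 2)
    // glom turns each partition into an array so its contents can be printed
    sorted2.glom().collect().zipWithIndex.foreach { case (part, i) =>
      println(s"partition $i: ${part.map(_._1).mkString(", ")}")
    }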

1.2 sortBy

sortBy is implemented on top of sortByKey, but it is more flexible: sortByKey can only be applied to k-v data, whereas sortBy also works on data that is not in key-value form.

    // sortBy: sort by height in descending order
    val sortedBy = stuRDD.sortBy(stu => stu.height,
      ascending = true,
      numPartitions = 1
    )(new Ordering[Double]() {
      override def compare(x: Double, y: Double) = y.compareTo(x)
    },
      ClassTag.Double) // already a ClassTag[Double]; no cast needed
    sortedBy.foreach(println)

Besides the usual ascending flag and number of partitions, sortBy takes a function that extracts the sort key from each record; it also needs an Ordering that defines how the keys are compared, and a ClassTag (a trait used as a runtime type marker) for the key type.
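
In practice the explicit Ordering and ClassTag are often unnecessary, since the compiler supplies both implicitly for standard key types. A minimal equivalent sketch that lets ascending = false do the reversing:

    // The implicit Ordering[Double] and ClassTag[Double] are resolved automatically
    val sortedBy2 = stuRDD.sortBy(stu => stu.height, ascending = false, numPartitions = 1)
    sortedBy2.foreach(println)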

1.3 takeOrdered

takeOrdered also sorts an RDD, but unlike sortByKey and sortBy above, takeOrdered is an action that returns a local collection, while the other two are transformations that return RDDs. If you want the sorted result in the driver, takeOrdered is the better choice because it sorts and returns in one step; it is essentially a combination of take and sortBy. takeOrdered(n) returns the first n records after sorting.

    // takeOrdered: sort by height descending, then by age ascending (a secondary sort)
    stuRDD.takeOrdered(3)(new Ordering[Student]() {
      override def compare(x: Student, y: Student) = {
        var ret = y.height.compareTo(x.height)
        if (ret == 0) {
          ret = x.age.compareTo(y.age)
        }
        ret
      }
    }).foreach(println)
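
The anonymous Ordering above can also be built with Ordering.by and a composite key: negating the height gives the descending direction, and the tuple's second field handles the age tie-break. A compact equivalent sketch:

    // Height descending (negated), then age ascending
    stuRDD.takeOrdered(3)(Ordering.by((stu: Student) => (-stu.height, stu.age)))
      .foreach(println)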

Complete code:

package blog.p3

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.reflect.ClassTag

/**
  * @Author Daniel
  * @Description
  * Ordinary sorting
  **/
object OrdinarySort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${OrdinarySort.getClass.getSimpleName}")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    val stuRDD: RDD[Student] = sc.parallelize(List(
      Student(1, "张三", 19, 168),
      Student(2, "李四", 18, 175),
      Student(3, "王五", 18, 176),
      Student(4, "赵六", 20, 180),
      Student(5, "周七", 18, 168.5)
    ))

    // sortByKey: sort by height in descending order
    val height2Stu = stuRDD.map(stu => (stu.height, stu))
    val sorted = height2Stu.sortByKey(ascending = false, numPartitions = 1)
    sorted.foreach { case (height, stu) => println(stu) }
    // sortBy: sort by height in descending order
    val sortedBy = stuRDD.sortBy(stu => stu.height,
      ascending = true,
      numPartitions = 1
    )(new Ordering[Double]() {
      override def compare(x: Double, y: Double) = y.compareTo(x)
    },
      ClassTag.Double) // already a ClassTag[Double]; no cast needed
    sortedBy.foreach(println)
    // takeOrdered: sort by height descending, then by age ascending (a secondary sort)
    stuRDD.takeOrdered(3)(new Ordering[Student]() {
      override def compare(x: Student, y: Student) = {
        var ret = y.height.compareTo(x.height)
        if (ret == 0) {
          ret = x.age.compareTo(y.age)
        }
        ret
      }
    }).foreach(println)
    sc.stop()
  }
}

case class Student(id: Int, name: String, age: Int, height: Double)

2. Secondary Sorting

Secondary sorting means the sort key is not a single field: several fields participate in the ordering together.

// Sort students by height first, then by age
case class Person(id: Int, name: String, age: Int, height: Double) extends Ordered[Person] {
  override def compare(that: Person) = {
    var ret = this.height.compareTo(that.height)
    if (ret == 0) {
      ret = this.age.compareTo(that.age)
    }
    ret
  }
}

Complete code:

package blog.p3

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @Author Daniel
  * @Description
  * Secondary sorting
  **/
object SecondSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${SecondSort.getClass.getSimpleName}")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    val personRDD: RDD[Person] = sc.parallelize(List(
      Person(1, "张三", 19, 168),
      Person(2, "李四", 18, 175),
      Person(3, "王五", 18, 176),
      Person(4, "赵六", 20, 180),
      Person(5, "周七", 18, 168)
    ))
    // Person extends Ordered[Person], so sortByKey picks up its compare method
    personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))
    sc.stop()
  }
}

// Sort students by height first, then by age
case class Person(id: Int, name: String, age: Int, height: Double) extends Ordered[Person] {
  override def compare(that: Person) = {
    var ret = this.height.compareTo(that.height)
    if (ret == 0) {
      ret = this.age.compareTo(that.age)
    }
    ret
  }
}
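
If you would rather not make the case class extend Ordered, the same secondary sort can be written with sortBy and a composite tuple key; the implicit tuple Ordering compares the fields left to right. A minimal sketch:

    // Height ascending, then age ascending, without extending Ordered[Person]
    personRDD.sortBy(p => (p.height, p.age), ascending = true, numPartitions = 1)
      .foreach(println)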

3. TopN

TopN is done with the actions take(N) (on an already sorted RDD) or takeOrdered(N); the latter is generally used because it is more efficient than the former.
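
As a rough comparison (a sketch on a throwaway RDD of numbers): sortBy followed by take materializes a full sort, while takeOrdered keeps at most N candidates per partition and then merges those candidates:

    val nums = sc.parallelize(Seq(7, 3, 9, 1, 5))
    // Full sort across the cluster, then take the first 3
    val top3a = nums.sortBy(n => n).take(3)
    // Each partition keeps at most 3 candidates; much less data is moved
    val top3b = nums.takeOrdered(3)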

3.1 Grouped TopN

Given grouped data, retrieve the TopN records within each group.

Sample data:

chinese ls 91
english ww 56
chinese zs 90
chinese zl 76
english zq 88

The fields are subject, name, and score. Requirement: for each subject, find the students with the top 3 scores.

Implementation:

package blog.p3

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * @Author Daniel
  * @Description
  * Grouped TopN
  **/
object GroupSortTopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${GroupSortTopN.getClass.getSimpleName}")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("file:/F:/test/topn.txt")

    // Split each line into a (course, info) pair
    val course2Info: RDD[(String, String)] = lines.map(line => {
      val spaceIndex = line.indexOf(" ")
      val course = line.substring(0, spaceIndex)
      val info = line.substring(spaceIndex + 1)
      (course, info)
    })
    // "Sorting by subject" means sorting within each subject, not across subjects,
    // so first gather together all the records of each subject
    val course2Infos: RDD[(String, Iterable[String])] = course2Info.groupByKey() // group by key

    // Sort within each group
    val sorted: RDD[(String, mutable.TreeSet[String])] = course2Infos.map { case (course, infos) => {
      val topN = mutable.TreeSet[String]()(new Ordering[String]() {
        override def compare(x: String, y: String) = {
          // Compare scores numerically and in descending order (plain string
          // comparison breaks once scores differ in digit count, e.g. "100" vs "91")
          val ret = y.split("\\s+")(1).toDouble.compareTo(x.split("\\s+")(1).toDouble)
          // Tie-break on the full record so equal scores are not deduplicated
          if (ret == 0) x.compareTo(y) else ret
        }
      })

      for (info <- infos) {
        topN.add(info)
      }
      (course, topN.take(3))
    }
    }
    sorted.foreach(println)
    sc.stop()
  }
}

3.2 Optimized Grouped TopN

The code above uses groupByKey, whose performance is relatively poor because it does no map-side (local) pre-aggregation, so we optimize it with combineByKey.

package blog.p3

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
  * @Author Daniel
  * @Description
  * Optimized grouped TopN
  **/
object GroupSortByCombineByKeyTopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(s"${GroupSortByCombineByKeyTopN.getClass.getSimpleName}")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("file:/F:/test/topn.txt")

    val course2Info: RDD[(String, String)] = lines.map(line => {
      val spaceIndex = line.indexOf(" ")
      val course = line.substring(0, spaceIndex)
      val info = line.substring(spaceIndex + 1)
      (course, info)
    })

    val sorted = course2Info.combineByKey(createCombiner, mergeValue, mergeCombiners)
    sorted.foreach(println)
    sc.stop()
  }

  // createCombiner: build the initial per-key TreeSet, ordered by score descending
  def createCombiner(info: String): mutable.TreeSet[String] = {
    val ts = new mutable.TreeSet[String]()(new Ordering[String]() {
      override def compare(x: String, y: String) = {
        // Numeric, descending score comparison; tie-break on the full record
        // so equal scores are not deduplicated by the TreeSet
        val ret = y.split("\\s+")(1).toDouble.compareTo(x.split("\\s+")(1).toDouble)
        if (ret == 0) x.compareTo(y) else ret
      }
    })
    ts.add(info)
    ts
  }

  // mergeValue: fold one record into the partition-local set, keeping at most the top 3
  def mergeValue(ab: mutable.TreeSet[String], info: String): mutable.TreeSet[String] = {
    ab.add(info)
    if (ab.size > 3) {
      ab.take(3)
    } else {
      ab
    }
  }

  // mergeCombiners: merge the sets from different partitions, keeping at most the top 3
  def mergeCombiners(ab: mutable.TreeSet[String], ab1: mutable.TreeSet[String]): mutable.TreeSet[String] = {
    for (info <- ab1) {
      ab.add(info)
    }
    if (ab.size > 3) {
      ab.take(3)
    } else {
      ab
    }
  }
}
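
combineByKey is the most general of the by-key aggregation operators. As a variation, the same top-3 logic can be written with aggregateByKey, which takes an empty TreeSet as the zero value and reuses the two merge functions above; Spark copies the zero value for each partition, so the mutable set is not shared. A sketch (assuming it is placed inside main, after course2Info is built):

    val zero = mutable.TreeSet[String]()(new Ordering[String]() {
      override def compare(x: String, y: String) = {
        // Numeric, descending score comparison; tie-break on the full record
        val ret = y.split("\\s+")(1).toDouble.compareTo(x.split("\\s+")(1).toDouble)
        if (ret == 0) x.compareTo(y) else ret
      }
    })
    val topNAgg = course2Info.aggregateByKey(zero)(mergeValue, mergeCombiners)
    topNAgg.foreach(println)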


Reposted from blog.csdn.net/a805814077/article/details/103277259