1. Favorite-Teacher-per-Subject: Additional Solutions
Solution 4 of this case study in day21 still has a problem: when several teachers are equally popular, its ordering rule cannot handle the tie. The following is an optimized solution.
Implementation 5
FavoriteTeacher5
package com._51doit.spark04

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object FavoriteTeacher5 {
  def main(args: Array[String]): Unit = {
    val isLocal = args(0).toBoolean
    // Create the SparkConf, then the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    if (isLocal) {
      conf.setMaster("local[*]")
    }
    val sc = new SparkContext(conf)
    // Specify where to read the data from when creating the RDD
    val lines: RDD[String] = sc.textFile(args(1))
    // Split each line into ((subject, teacher), 1)
    val subjectTeacherAndOne = lines.map(line => {
      val fields = line.split("/")
      val subject = fields(2).split("[.]")(0)
      val teacher = fields(3)
      ((subject, teacher), 1)
    })
    // Compute all distinct subjects and collect them to the Driver
    val subjects: Array[String] = subjectTeacherAndOne.map(_._1._1).distinct().collect()
    // The partitioner is new-ed on the Driver, but its methods are called in Executors
    val partitioner = new SubjectPartitioner3(subjects)
    // Aggregate by key with the given partitioner (saves one shuffle)
    val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(partitioner, _ + _)
    val topN = args(2).toInt
    val result = reduced.mapPartitions(it => {
      // A TreeSet that keeps its elements sorted by the custom ordering
      val sorter = new mutable.TreeSet[OrderingBean]()
      // Iterate over the data in this partition
      it.foreach(t => {
        sorter += new OrderingBean(t._1._1, t._1._2, t._2)
        if (sorter.size > topN) {
          // Remove the last (least popular) element to keep only topN
          val last = sorter.last
          sorter -= last
        }
      })
      sorter.iterator
    })
    val r = result.collect()
    println(r.toBuffer)
    sc.stop()
  }
}

class SubjectPartitioner3(val subjects: Array[String]) extends Partitioner {
  // Initialize the partitioner's rules: subject -> partition id
  val rules = new mutable.HashMap[String, Int]()
  var index = 0
  for (sub <- subjects) {
    rules(sub) = index
    index += 1
  }

  override def numPartitions: Int = subjects.length

  // This method is called inside Tasks running in Executors
  override def getPartition(key: Any): Int = {
    val tuple = key.asInstanceOf[(String, String)]
    val subject = tuple._1
    // Look up the partition id for this subject in the rules built above
    rules(subject)
  }
}
OrderingBean (the redefined ordering rule)
package com._51doit.spark04

import scala.collection.mutable.ArrayBuffer

class OrderingBean(val subject: String, val name: String, val count: Int)
  extends Ordered[OrderingBean] with Serializable {

  // Buffer that accumulates entries whose count ties with this one
  val equiv = new ArrayBuffer[(String, String, Int)]()
  equiv += ((subject, name, count))

  override def compare(that: OrderingBean): Int = {
    if (this.count == that.count) {
      // Equal popularity: record the other entry instead of dropping it
      equiv += ((that.subject, that.name, that.count))
      0
    } else {
      // Negate so that higher counts sort first
      -(this.count - that.count)
    }
  }

  override def toString: String =
    if (equiv.size > 1) equiv.toString()
    else s"($subject, $name, $count)"
}
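To see the tie handling in isolation, here is a minimal, Spark-free sketch. It assumes it lives in the same package as OrderingBean; the object name, subjects, and teacher names are made up for illustration. When two beans carry the same count, compare returns 0 and the tied entry is appended to the equiv buffer, so toString reports all tied teachers.

object OrderingBeanTieDemo {
  def main(args: Array[String]): Unit = {
    val a = new OrderingBean("bigdata", "laozhao", 15)
    val b = new OrderingBean("bigdata", "laoduan", 15) // same popularity as a
    println(a.compare(b)) // 0: the counts tie, and b's entry is appended to a.equiv
    println(a)            // ArrayBuffer((bigdata,laozhao,15), (bigdata,laoduan,15))
    val c = new OrderingBean("bigdata", "laoyang", 9)
    println(a.compare(c)) // negative: a (count 15) sorts before c (count 9)
  }
}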
Implementation 6 (using repartitionAndSortWithinPartitions)
repartitionAndSortWithinPartitions repartitions the data with the specified partitioner and, as part of the same shuffle, sorts the records within each partition.
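Before the full solution, a minimal self-contained sketch of the operator itself (local mode; the object name and the integer keys are made up for this demo). The keys come out of the shuffle already sorted inside each partition by their implicit Ordering:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionSortDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RepartitionSortDemo").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
    // One target partition; keys are sorted during the shuffle itself
    val sorted = data.repartitionAndSortWithinPartitions(new HashPartitioner(1))
    sorted.foreachPartition(it => it.foreach(println)) // (1,a) (2,b) (3,c)
    sc.stop()
  }
}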
FavoriteTeacher06
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FavoriteTeacher06 {
  def main(args: Array[String]): Unit = {
    val isLocal = args(0).toBoolean
    // Create the SparkConf, then the SparkContext
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
    if (isLocal) {
      conf.setMaster("local[1]")
    }
    val sc = new SparkContext(conf)
    // Specify where to read the data from when creating the RDD
    val lines: RDD[String] = sc.textFile(args(1))
    // Split each line into ((subject, teacher), 1)
    val subjectTeacherAndOne = lines.map(line => {
      val fields = line.split("/")
      val subject = fields(2).split("[.]")(0)
      val teacher = fields(3)
      ((subject, teacher), 1)
    })
    // Aggregate
    val reduced = subjectTeacherAndOne.reduceByKey(_ + _)
    // Compute all distinct subjects and collect them to the Driver
    val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
    // The partitioner is new-ed on the Driver, but its methods are called in Executors
    val partitioner = new SubjectPartitionerV2(subjects)
    // Re-key the data: move the count into the key so the shuffle can sort on it
    val keyByRDD: RDD[((String, String, Int), Null)] =
      reduced.map(t => ((t._1._1, t._1._2, t._2), null))
    // Implicit ordering: descending by count
    implicit val orderRules: Ordering[(String, String, Int)] =
      new Ordering[(String, String, Int)] {
        override def compare(x: (String, String, Int), y: (String, String, Int)): Int = {
          -(x._3 - y._3)
        }
      }
    val topN = args(2).toInt
    // Repartition with the given partitioner and sort within each partition
    val result: RDD[((String, String, Int), Null)] =
      keyByRDD.repartitionAndSortWithinPartitions(partitioner)
    result.foreachPartition(it => {
      var index = 1
      while (it.hasNext && index <= topN) {
        val tuple = it.next()
        println(tuple)
        index += 1
      }
    })
    sc.stop()
  }
}
SubjectPartitionerV2
import org.apache.spark.Partitioner

import scala.collection.mutable

class SubjectPartitionerV2(val subjects: Array[String]) extends Partitioner {
  // Initialize the partitioner's rules: subject -> partition id
  val rules = new mutable.HashMap[String, Int]()
  var index = 0
  for (sub <- subjects) {
    rules(sub) = index
    index += 1
  }

  override def numPartitions: Int = subjects.length

  // This method is called inside Tasks running in Executors
  override def getPartition(key: Any): Int = {
    val tuple = key.asInstanceOf[(String, String, Int)]
    val subject = tuple._1
    // Look up the partition id for this subject in the rules built above
    rules(subject)
  }
}
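A quick, Spark-free sanity check of the partitioning rule (assuming the same package as SubjectPartitionerV2; the object name and subject names are made up for illustration):

object SubjectPartitionerV2Check {
  def main(args: Array[String]): Unit = {
    val p = new SubjectPartitionerV2(Array("bigdata", "javaee", "php"))
    println(p.numPartitions)                          // 3: one partition per subject
    println(p.getPartition(("javaee", "laoyang", 9))) // 1: the index of "javaee" in rules
  }
}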
2. Custom Sorting
Data format: name, age, looks score (fv)
Requirement: sort by looks score first (higher scores come first); when the scores are equal, the younger person comes first.
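As a quick preview of the rule in plain Scala (using the same sample records that the code below feeds into Spark; the object name is made up for this demo), sorting by (-fv, age) gives:

object SortRulePreview {
  def main(args: Array[String]): Unit = {
    val people = List(("jack", 30, 99.99), ("sherry", 18, 9999.99), ("Tom", 29, 99.99))
    // Descending looks score (via negation), then ascending age
    println(people.sortBy(t => (-t._3, t._2)))
    // List((sherry,18,9999.99), (Tom,29,99.99), (jack,30,99.99))
  }
}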
2.1 First approach
Idea: define a Boy case class to hold these fields, then supply the ordering rule through an implicit conversion, as follows:
CustomSort1
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    // Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.parallelize(List("jack,30,99.99", "sherry,18,9999.99", "Tom,29,99.99"))
    // Parse the data with mapPartitions (one function call per partition)
    val boyRDD: RDD[Boy] = lines.mapPartitions(it => {
      it.map(t => {
        val split: Array[String] = t.split(",")
        Boy(split(0), split(1).toInt, split(2).toDouble)
      })
    })
    // Bring the implicit ordering rule into scope
    import com._51doit.spark04.MyPredef.Boy2OrderingBoy
    val res: RDD[Boy] = boyRDD.sortBy(t => t)
    println(res.collect().toBuffer)
    sc.stop()
  }
}
MyPredef
Either of the following two forms works.
The Ordered form
object MyPredef {
  implicit val Boy2OrderingBoy: Boy => Ordered[Boy] = (boy: Boy) => new Ordered[Boy] {
    override def compare(that: Boy): Int = {
      if (boy.fv == that.fv) {
        // Same looks score: the younger one sorts first
        boy.age - that.age
      } else {
        // Higher looks score sorts first; compare the Doubles directly,
        // because -(boy.fv - that.fv).toInt would truncate differences
        // smaller than 1 to 0 and wrongly treat them as equal
        java.lang.Double.compare(that.fv, boy.fv)
      }
    }
  }
}
The Ordering form
object MyPredef {
  implicit val Boy2OrderingBoy: Ordering[Boy] = new Ordering[Boy] {
    override def compare(x: Boy, y: Boy): Int = {
      if (x.fv == y.fv) {
        // Same looks score: the younger one sorts first
        x.age - y.age
      } else {
        // Higher looks score sorts first; compare the Doubles directly,
        // because -(x.fv - y.fv).toInt would truncate sub-1 differences to 0
        java.lang.Double.compare(y.fv, x.fv)
      }
    }
  }
}
Boy
case class Boy(name: String, age: Int, fv: Double)
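Either form can be sanity-checked locally without Spark: List#sorted needs an Ordering[Boy], which the Ordering form supplies directly, while the Ordered form is picked up through Scala's built-in Ordering.ordered derivation (Boy => Ordered[Boy] conforms to Boy => Comparable[Boy]). A minimal sketch (the object name is made up for this demo):

object BoySortCheck {
  def main(args: Array[String]): Unit = {
    import com._51doit.spark04.MyPredef.Boy2OrderingBoy
    val boys = List(Boy("jack", 30, 99.99), Boy("sherry", 18, 9999.99), Boy("Tom", 29, 99.99))
    // Highest looks score first; ties broken by the smaller age
    println(boys.sorted)
    // List(Boy(sherry,18,9999.99), Boy(Tom,29,99.99), Boy(jack,30,99.99))
  }
}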
2.2 Second approach (using tuples)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CustomSort2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    // Create the SparkContext
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.parallelize(List("jack,30,99.99", "sherry,18,9999.99", "Tom,29,99.99"))
    // Parse the data with mapPartitions (one function call per partition)
    val tpRDD: RDD[(String, Int, Double)] = lines.mapPartitions(it => {
      it.map(t => {
        val split: Array[String] = t.split(",")
        (split(0), split(1).toInt, split(2).toDouble)
      })
    })
    // Use the tuple's default ordering: descending fv (via negation), then ascending age
    val sorted: RDD[(String, Int, Double)] = tpRDD.sortBy(t => (-t._3, t._2))
    println(sorted.collect().toBuffer)
    sc.stop()
  }
}