1.基础排序算法
2.二次排序算法
3.更高级别排序
4.排序算法内幕
1.基础排序算法
sc.textFile("/data/putfile.txt").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_,1).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)).collect
//key value交换
sc.setLogLevel("WARN")
2.二次排序算法
所谓二次排序就是指排序的时候考虑两个维度(推广开来也可以考虑更多维度的多级排序)
Java版本
package com.dt.java.spark;
import akka.util.HashCode;
import scala.math.Ordered;
import java.io.Serializable;
//实现Ordered接口(scala的)
// Composite key for secondary sorting: orders by `first`, breaking ties with `second`.
// Implements Scala's Ordered trait so Spark's sortByKey can compare keys, and
// Serializable so key instances can be shipped between executors.
public class SecondarySort implements Ordered<SecondarySort>, Serializable {
    // primary sort field
    private int first;
    // secondary sort field (tie-breaker when `first` values are equal)
    private int second;

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    public SecondarySort(int first, int second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Three-way comparison: negative/zero/positive when this key is
     * less than / equal to / greater than {@code that}.
     * Integer.compare avoids the overflow that `this.first - that.getFirst()`
     * produces when the operands have opposite signs near Integer.MIN/MAX_VALUE.
     */
    @Override
    public int compare(SecondarySort that) {
        int byFirst = Integer.compare(this.first, that.getFirst());
        return byFirst != 0 ? byFirst : Integer.compare(this.second, that.getSecond());
    }

    // The $less/$greater family required by Ordered is defined in terms of
    // compare so all six relations stay mutually consistent.
    @Override
    public boolean $less(SecondarySort that) {
        return compare(that) < 0;
    }

    @Override
    public boolean $greater(SecondarySort that) {
        return compare(that) > 0;
    }

    @Override
    public boolean $less$eq(SecondarySort that) {
        return compare(that) <= 0;
    }

    @Override
    public boolean $greater$eq(SecondarySort that) {
        return compare(that) >= 0;
    }

    /** java.lang.Comparable contract — same ordering as compare(). */
    @Override
    public int compareTo(SecondarySort that) {
        return compare(that);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SecondarySort that = (SecondarySort) o;
        if (first != that.first) return false;
        return second == that.second;
    }

    @Override
    public int hashCode() {
        int result = first;
        result = 31 * result + second;
        return result;
    }
}
package com.dt.java.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
//二次排序,具体实现步骤
//第一步:按照Ordered和Serializable接口实现自定义排序的Key
//第二步:将要进行二次排序的文件加载进来生成《key,value》类型的RDD
//第三步:使用sortByKey基于自定义的Key进行二次排序
//第四步:去除掉排序的key,只保留排序结果
public class SecondarySortApp {
public static void main(String[] args){
SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt");
JavaPairRDD,String> pairs = lines.mapToPair(new PairFunction, SecondarySort, String>() {
@Override
public Tuple2, String> call(String line) throws Exception {
String[] splited = line.split(" ");
SecondarySort key = new SecondarySort(Integer.valueOf(splited[0]),Integer.valueOf(splited[1]));
return new Tuple2, String>(key,line);
}
}
);
JavaPairRDD,String> sorted = pairs.sortByKey();//完成二次排序
//过滤掉排序后自定的key,保留排序的结果
JavaRDD secondarysorted = sorted.map(new Function, String>, String>() {
@Override
public String call(Tuple2, String> sortedContent) throws Exception {
return sortedContent._2();
}
}
);
//
secondarysorted.foreach(new VoidFunction() {
@Override
public void call(String sorted) throws Exception {
System.out.println(sorted);
}
});
}
}//生成默认的构造器
Scala版本
package com.dt.scala.spark
// Composite key for secondary sorting: primary order by `first`,
// ties broken by `second`. Extending Ordered provides the implicit
// Ordering that sortByKey needs; Serializable lets Spark ship keys
// between executors.
class SecondarySort(val first: Int, val second: Int) extends Ordered[SecondarySort] with Serializable {
  override def compare(that: SecondarySort): Int = {
    // Integer.compare avoids the overflow that `this.first - that.first`
    // produces when operands have opposite signs near Int.MinValue/MaxValue.
    val byFirst = Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, that.second)
  }
}
package com.dt.scala.spark
import org.apache.spark.{SparkContext, SparkConf}
// Driver: loads a text file, sorts lines descending by the composite
// (first, second) key, strips the key, and prints the result.
object SecondarySortApp {
  def main(args: Array[String]) {
    // Step 1: build the Spark configuration.
    val conf = new SparkConf() // configuration object
    conf.setAppName("SecondarySortApp") // application name
    conf.setMaster("local") // run locally on this machine
    // SparkContext is the single entry point to the Spark runtime.
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt")
    // Pair each line with a composite key; split each line only once
    // (the original split the same line twice).
    val pairWithSortkey = lines.map { line =>
      val fields = line.split(" ")
      (new SecondarySort(fields(0).toInt, fields(1).toInt), line)
    }
    // ascending = false: sort descending on the composite key
    val sorted = pairWithSortkey.sortByKey(false)
    // drop the synthetic key, keep only the original lines
    val sortedResult = sorted.map(_._2)
    sortedResult.collect.foreach(println)
    // Shut the context down cleanly.
    sc.stop()
  }
}
作业2:阅读RangePartitioner源码
1、scala 实现二次排序 2;自己阅读RangePartitioner
***********排序****************
在 Python 中以字符串顺序对整数进行自定义排序
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))
在 Scala 中以字符串顺序对整数进行自定义排序
val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
input.sortByKey()
在 Java 中以字符串顺序对整数进行自定义排序
// Compares integers by their decimal-string representation
// (e.g. 10 < 9 because "10" < "9" lexicographically).
// Serializable is required because Spark ships the comparator to executors.
class IntegerComparator implements Comparator<Integer>, java.io.Serializable {
    public int compare(Integer a, Integer b) {
        // original snippet was missing the terminating semicolon
        return String.valueOf(a).compareTo(String.valueOf(b));
    }
}
rdd.sortByKey(new IntegerComparator())
val kv1=sc.parallelize(List((5, 2), (3, 4), (4, 2)))
scala> kv1.sortByKey().foreach(print) //无顺序
(5,2)(3,4)(4,2)
scala> kv1.sortByKey().foreach(print) //无顺序
(3,4)(5,2)(4,2)
scala> kv1.sortByKey().collect //升序
res2: Array[(Int, Int)] = Array((3,4), (4,2), (5,2))
scala> kv1.sortByKey(false).collect //降序
res3: Array[(Int, Int)] = Array((5,2), (4,2), (3,4))
scala> print(implicitly[Ordering[Int]])
scala.math.Ordering$Int$@c88a842
implicit val sortM = new Ordering[Int] {
override def compare(a: Int, b: Int) = b-a
}
scala> kv1.sortByKey().collect
res5: Array[(Int, Int)] = Array((5,2), (4,2), (3,4))
implicit val sortM1 = new Ordering[Int] {
override def compare(a: Int, b: Int) = a-b
}
scala> print(implicitly[Ordering[Int]])
<console>:28: error: ambiguous implicit values:
both value sortM of type => Ordering[Int]
and value sortM1 of type => Ordering[Int]
match expected type Ordering[Int]
print(implicitly[Ordering[Int]])
scala> kv1.sortByKey().collect
<console>:31: error: value sortByKey is not a member of org.apache.spark.rdd.RDD[(Int, Int)]
kv1.sortByKey().collect
^
***********sortBy函数*********************************
scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
scala> rdd.sortBy(x => x, false).collect
res2: Array[Int] = Array(90, 12, 5, 3, 3, 1)
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at sortBy at <console>:28
scala> result.partitions.size
res3: Int = 4
scala> val result = rdd.sortBy(x => x, false, 1)
result: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[18] at sortBy at <console>:28
scala> result.partitions.size
res4: Int = 1
***********sortByKey函数*********************************
scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> val b = sc. parallelize (1 to a.count.toInt , 2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:26
scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[21] at zip at <console>:28
scala> c.sortByKey().collect
res5: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))
scala> val b = sc.parallelize(List(3,1,9,12,4),2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[28] at zip at <console>:28
scala> c.sortByKey().collect
res8: Array[(Int, String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))
implicit val sortIntegersByString = new Ordering[Int]{
override def compare(a: Int, b: Int) =
a.toString.compare(b.toString)}
scala> implicit val sortIntegersByString = new Ordering[Int]{
| override def compare(a: Int, b: Int) =
| a.toString.compare(b.toString)}
sortIntegersByString: Ordering[Int] = $anon$1@50514648
scala> c.sortByKey().collect
res9: Array[(Int, String)] = Array((1,iteblog), (12,397090770), (3,wyp), (4,test), (9,com))
***********Spark二次排序*********************************
scala> val at=List((2,2),(1,31),(4,4),(1,11),(2,2),(1,31),(4,4),(1,21),(1,1),(2,2),(1,6),(3,8),(1,11),(3,23),(3,3),(2,22),(1,11),(3,3),(2,2),(3,8),(1,6),(0,5),(4,4))
at: List[(Int, Int)] = List((2,2), (1,31), (4,4), (1,11), (2,2), (1,31), (4,4), (1,21), (1,1), (2,2), (1,6), (3,8), (1,11), (3,23), (3,3), (2,22), (1,11), (3,3), (2,2), (3,8), (1,6), (0,5), (4,4))
scala> val kv1=sc.parallelize(at)
kv1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[43] at parallelize at <console>:32
scala> kv1.sortByKey(false).collect
res16: Array[(Int, Int)] = Array((4,4), (4,4), (4,4), (3,8), (3,23), (3,3), (3,3), (3,8), (2,2), (2,2), (2,2), (2,22), (2,2), (1,31), (1,11), (1,31), (1,21), (1,1), (1,6), (1,11), (1,11), (1,6), (0,5))
--先按first降序再按second升序
// Composite key: `first` descending, ties broken by `second` ascending.
class SortByTwice(val first: Int, val second: Int) extends Ordered[SortByTwice] with Serializable {
  def compare(that: SortByTwice): Int = {
    // Integer.compare avoids subtraction overflow; the operand order is
    // reversed on `first` to obtain descending order. `return` is dropped —
    // the last expression is the result in idiomatic Scala.
    val byFirst = Integer.compare(that.first, this.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, that.second)
  }
}
scala> kv1.map(x=>new SortByTwice(x._1,x._2)).sortBy(x=>x).map(x=>(x.first,x.second)).collect
res18: Array[(Int, Int)] = Array((4,4), (4,4), (4,4), (3,3), (3,3), (3,8), (3,8), (3,23), (2,2), (2,2), (2,2), (2,2), (2,22), (1,1), (1,6), (1,6), (1,11), (1,11), (1,11), (1,21), (1,31), (1,31), (0,5))
scala> kv1.map(x=>new SortByTwice(x._1,x._2)).sortBy(x=>x).map(x=>(x.first,x.second)).collect.foreach(println)
(4,4)
(4,4)
(4,4)
(3,3)
(3,3)
(3,8)
(3,8)
(3,23)
(2,2)
(2,2)
(2,2)
(2,2)
(2,22)
(1,1)
(1,6)
(1,6)
(1,11)
(1,11)
(1,11)
(1,21)
(1,31)
(1,31)
(0,5)
scala> kv1.map(x=>(new SortByTwice(x._1,x._2),x)).sortByKey().map(x=>x._2).collect.foreach(println)
(4,4)
(4,4)
(4,4)
(3,3)
(3,3)
(3,8)
(3,8)
(3,23)
(2,2)
(2,2)
(2,2)
(2,2)
(2,22)
(1,1)
(1,6)
(1,6)
(1,11)
(1,11)
(1,11)
(1,21)
(1,31)
(1,31)
(0,5)
--先按first升序再按second降序
scala> kv1.map(x=>(new SortByTwice(x._1,x._2),x)).sortByKey(false).map(x=>x._2).collect.foreach(println)
(0,5)
(1,31)
(1,31)
(1,21)
(1,11)
(1,11)
(1,11)
(1,6)
(1,6)
(1,1)
(2,22)
(2,2)
(2,2)
(2,2)
(2,2)
(3,23)
(3,8)
(3,8)
(3,3)
(3,3)
(4,4)
(4,4)
(4,4)
scala> print(implicitly[Ordering[SortByTwice]])
scala.math.LowPriorityOrderingImplicits$$anon$6@5343c732
通过groupBy进行二次排序(也有一种情况是需要将Key进行groupBy的Key,Value二次排序)
scala> kv1.groupByKey.map(x=>(x._1,x._2.toList.sortWith(_>_))).collect.foreach(println)
(4,List(4, 4, 4))
(0,List(5))
(1,List(31, 31, 21, 11, 11, 11, 6, 6, 1))
(2,List(22, 2, 2, 2, 2))
(3,List(23, 8, 8, 3, 3))
scala> kv1.groupByKey.map(x=>(x._1,x._2.toList.sortWith(_<_))).collect.foreach(println)
(4,List(4, 4, 4))
(0,List(5))
(1,List(1, 6, 6, 11, 11, 11, 21, 31, 31))
(2,List(2, 2, 2, 2, 22))
(3,List(3, 3, 8, 8, 23))