- reduceByKey
- 算法解释
reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),比如叠加。所以 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。- 源文件
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
- scala代码
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1))) // rdd.reduceByKey((x,y) => x + y).foreach(println(_)) println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
- 过滤结果
(ee,1),(aa,2),(dd,1),(cc,1)
- foldByKey
- 算法解释
该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V. foldByKey可以参考我之前的scala的fold的介绍与reduce不同的是 foldByKey开始折叠的第一个元素不是集合中的第一个元素,而是传入的一个元素- 源文件
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1)))
- scala代码
val rdd = sc.parallelize(Array(("aa", 1), ("aa", 1), ("cc", 1), ("dd", 1), ("ee", 1))) // rdd.reduceByKey((x,y) => x + y).foreach(println(_)) println(rdd.reduceByKey((x,y) => x + y).collect().mkString(","))
- 过滤结果
(ee,1),(aa,2),(dd,1),(cc,1)
- sortByKey
- 算法解释
SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true或者false,默认是true
SortBy和sortByKey功能相同- 源文件
val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
- scala代码
println(rdd.reduceByKey((x, y) => x + y).sortByKey().collect().mkString(",")) println(rdd.reduceByKey((x, y) => x + y).sortBy(_._2).collect().mkString(","))
- 过滤结果
sortByKey : (1,2),(2,1),(3,1),(5,1) sortBy : (2,1),(5,1),(3,1),(1,2)
- groupByKey
- 算法解释
groupByKey会将RDD[key,value] 按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式, 有点类似于sql中的groupby,例如类似于mysql中的group_concat- 源文件
val rdd = sc.makeRDD(Array(("1", 1), ("5", 1), ("3", 1), ("1", 1), ("2", 1)))
- scala代码
val scoreDetail = sc.parallelize(List(("name", "张三"), ("name", "李四"), ("age", 11), ("age", 20))) println(scoreDetail.groupByKey().collect().mkString(","))
- 过滤结果
(name,CompactBuffer(张三, 李四)),(age,CompactBuffer(11, 20))
- cogroup
- 算法解释
groupByKey是对单个 RDD 的数据进行分组,还可以使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组- 源文件
val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97))) val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67))) val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36)))
- scala代码
val scoreDetail = sc.parallelize(List(("xiaoming",95),("xiaoming",90),("lihua",95),("lihua",98),("xiaofeng",97))) val scoreDetai2 = sc.parallelize(List(("xiaoming",65),("lihua",63),("lihua",62),("xiaofeng",67))) val scoreDetai3 = sc.parallelize(List(("xiaoming",25),("xiaoming",15),("lihua",35),("lihua",28),("xiaofeng",36))) println(scoreDetail.cogroup(scoreDetai2,scoreDetai3).collect().mkString(","))
- 过滤结果
(xiaoming,(CompactBuffer(95, 90),CompactBuffer(65),CompactBuffer(25, 15))),(lihua,(CompactBuffer(95, 98),CompactBuffer(63, 62),CompactBuffer(35, 28))),(xiaofeng,(CompactBuffer(97),CompactBuffer(67),CompactBuffer(36)))