spark使用aggregateByKey替代groupBeKey
使用aggregateByKey需要确定下面三个参数:
-
zeroValue :初始值
-
seqOp : 对于key相同的元素的操作方法
-
combOp : 对于key不同的元素的操作方法
听起来很难懂,看个例子就明白了:
val data=List((1,3),(1,2),(1,4),(2,3))
val rdd=sc.parallelize(data )
//合并不同partition中的值,a,b得数据类型为zeroValue的数据类型
def combOp(a:List[Int],b:List[Int]):List[Int] ={
a ++ b
}
def seqOp(a:List[Int],b:Int):List[Int]={
a.::(b)
}
val aggregateByKeyRDD=rdd.aggregateByKey(List(0))(seqOp, combOp)
aggregateByKeyRDD.foreach(println)
val groupbykeyRDD=rdd.groupByKey()
groupbykeyRDD.foreach(println)
输出结果为:
(1,List(4, 2, 3, 0))
(2,List(3, 0))
(1,CompactBuffer(3, 2, 4))
(2,CompactBuffer(3))