● aggregate源码
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
● 分析
多个()构成,是一个柯里化方法(我是这么理解的),三部分构成 :
1. (zeroValue: U) : 指定一个默认值
2. (seqOp: (U, T) : seqOp其实是计算每个分区,类似hadoop的map阶段
3. combOp: (U, U) : 将所有分区聚合(就是将seqOp操作的结果聚合),类似hadoop的reduce阶段
● 栗子(求和)
scala> val a = sc.parallelize(List(1,2,3,4,5,6),2)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> a.partitions.length
res48: Int = 2
scala> a.aggregate(0)(_+_,_+_)
res50: Int = 21
通过并行化创建一个RDD,指定分区的数量2。
aggregate(0) : 给 (zeroValue: U) 指定一个默认值0,每个分区操作的时候都需要加上此默认值,也就是 分区数 * 默认值;需要注意的是所有分区聚合的时候(combOp操作)也算作一次;
那么应该这样计算 : (n + 1) * 默认值 ,n :代表分区数,这是个可变参数;
(_+_,_+_) : 第一个参数(_+_)的含义是将每个分区的元素进行累加,seqOp操作;
第二个参数(_+_)的含义是将第一个参数的结果再次累加,combOp操作;
最终结果 :(2 + 1)* 0 + 第一个分区和+(第二个分区和) = 0 + (1+2+3)+(4+5+6) = 21
指定默认值1 :
scala> a.aggregate(1)(_+_,_+_)
res51: Int = 24
最终结果符合上述计算公式,没毛病
● 栗子(求每个分区最大值和)
scala> a.collect
res52: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> a.partitions.length
res53: Int = 2
scala> a.aggregate(0)(math.max(_,_),_+_)
res55: Int = 9
第一个分区最大值3 + 第二个分区最大值 6 = 9
scala> a.aggregate(10)(math.max(_,_),_+_)
res57: Int = 30
计算默认值为10时,有点看不懂了,我原以为结果为39,然而打脸了,结果有点摸不着头脑了。
经过一番搜索,终于明白了,
原来默认值都会参与分区计算的函数中,以此栗子来算:
首先求出第一个分区的最大值3,然后最大值再与默认值10比大小!!!,所以第一个分区最大值为10!!
第二个分区计算同上!
最后在聚合分区的时候 10 + 10 + 10 = 30!!!
●注意
在此文章中结果并不会因为分区的顺序不同而改变,在其他场所因分区的无序性可能会出现多个结果,其实是结果输出顺序紊乱了!!!
因为计算分区数据时,task时快时慢,不能保证分区的结果顺序,最后聚合时候产生的结果也不一样!!!
● aggregateByKey : 对相同的key进行聚合
scala> a
res136: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> a.collect
res137: Array[(String, Int)] = Array((ws,1), (hi,1), (hello,1), (ws,3), (nice,1))
scala> a.aggregateByKey(10)(_+_,_+_)
res138: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[39] at aggregateByKey at <console>:27
scala> a.aggregateByKey(10)(_+_,_+_).collect
res139: Array[(String, Int)] = Array((hi,11), (ws,14), (nice,11), (hello,11))
流程 :
先将每个分区相同的key进行聚合计算 , 然后初始值 + 已计算好分区的key值。
再将聚合好的分区结果传递给combOp再次聚合相同的key,得到最终的结果。
注意,combOp阶段不需要计算初始值了!!