aggregat和aggregateByKey用法

aggregate用法 —先局部再全局计算

val rdd2= sc.parallelize(List(1,2,3,4,5,6,7,8),2)
其中rdd2被分为两个分区,0分区的数据是1,2,3,4;1分区的数据是5,6,7,8

rdd2.aggregate(0)(math.max(_,_),_+_) 结果是12
分析:初始值0和第一个分区的数据比较,取最大值4,然后初始值0和第二个分区的数据比较,取最大值8,然后全局相加,全局相加的时候0也要加一次,所以 4+8+0=12

rdd2.aggregate(5)(math.max(_,_),_+_) 结果是18
分析:初始值5和第一个分区的数据比较,取最大值5,然后初始值5和第二个分区的数据比较,取最大值8,然后全局相加,全局相加的时候0也要加一次,所以 5+8+5=18

rdd2.aggregate(5)(_+_,_+_) 结果是51
分析:初始值5和第一个分区的所有数据相加,得到1+2+3+4+5=15 ,初始值5和第二个分区的所有数据相加,得到5+6+7+8+5=31
然后全局相加 15 + 31 + 5= 51

val rdd1= sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2)

rdd1.aggregate("")(_+_,_+_) 结果可能是abcdef,也可能是defabc,因为是两个分区对应2个task,并行跑,谁先跑完就在最前面

val rdd3= sc.parallelize(List(“12”,“23”,“345”,“4567”),2)

rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y) => x+y) 结果是24或者42

val rdd4= sc.parallelize(List("12","23","345",""),2)

rdd4.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y) => x+y)结果是10或者01
分析:先对初始值空字符进行length 得到的是0 然后0再toString,再length得到的是1,之后分别和第一第二分区的每个数比较,最后一个数据比较之后直接返回,

val rdd5= sc.parallelize(List(“12”,“23”,"",“345”),2)

rdd5.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y) => x+y)结果是11

第二个分区的第一个数据比较之后是0,然后toString,之后还要比较,所以长度是1,在和345比较。

aggregateByKey用法 —只计算局部

val rdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)

rdd.reduceByKey(_+_).collect 结果是Array((cat,19),(dog,12),(mouse,6))

rdd.aggregateByKey(100)(_+_,_+_).collect 结果是Array((cat,219),(dog,112),(mouse,206))

rdd.aggregateByKey(100)(math.max(_,_),_+_).collect结果是 Array((dog,100), (cat,200), (mouse,200)) 因为初始值是100,分别和两个分区的每个数据进行、比对,取最大值,然后再相加。100先和"(“cat”,2)比取100,再和(“cat”,5)比还是100,所以第一个分区的cat是100,第二个分区的cat也是100,全局相加100+100=200

rdd.aggregateByKey(5)(math.max(_,_),_+_).collect
结果是Array((dog,12), (cat,17), (mouse,10)) 结果自己分析

猜你喜欢

转载自blog.csdn.net/qaz_125/article/details/83140649