说明
函数签名
- zeroValue:初始值,用于和RDD分区内的数据依次迭代。
- seqOp:(U,V) => U:分区内的计算规则,初始值根据此规则依次与分区内的数据进行迭代。
- combOp:(U,U) => U:分区间的计算规则,用于合并分区内的数据。
代码示例
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
val sc = new SparkContext(conf)
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("c", 6), ("c", 4),
("b", 3),("a", 2), ("c", 8)), 2)
println("------------------分区内数据------------------")
rdd.mapPartitionsWithIndex{
case (index, datas) => {
println(index + "--->" + datas.mkString(","))
datas
}
}.collect()
println("------------------分割线------------------")
val resRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(Math.max(_, _), _ + _)
resRDD.collect().foreach(println)
sc.stop()