spark-aggregateByKey

package com.sparktest;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Data:
 *   (1,3),(1,2)  -> first partition
 *   (1,4),(2,3)  -> second partition
 *
 * aggregateByKey takes three arguments:
 *   1. the zero value, which is folded in during the per-partition merge
 *   2. seqFunc, which merges values within a single partition (the map-side combine)
 *   3. combFunc, which merges the per-partition results across partitions (the reduce-side combine)
 * Note that seqFunc and combFunc both operate only on the values of the (key, value) pairs.
 *
 * In the example below, seqFunc sums two numbers and combFunc takes the minimum of two numbers.
 *
 * Per-partition merge (seqFunc applied pairwise, starting from the zero value 100):
 *   (1,3),(1,2) -> (1,(3,2)) -> (1,(3,2,100)) -> (1,105)
 *   (1,4),(2,3) -> (1,4)     -> (1,(4,100))   -> (1,104)
 *                  (2,3)     -> (2,(3,100))   -> (2,103)
 * Cross-partition merge (combFunc):
 *   (1,(105,104)) -> (1,104)
 *   (2,103)       -> (2,103)
 */
public class AggregateByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AggregateByKey")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, Integer>> localList = Arrays.asList(
                new Tuple2<Integer, Integer>(1, 3),
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(1, 4),
                new Tuple2<Integer, Integer>(2, 3));

        // Two partitions: (1,3),(1,2) land in the first, (1,4),(2,3) in the second.
        JavaRDD<Tuple2<Integer, Integer>> dataRDD = sc.parallelize(localList, 2);

        // Turn the RDD of tuples into a JavaPairRDD so aggregateByKey becomes available.
        JavaPairRDD<Integer, Integer> pairRDD = dataRDD.mapToPair(
                new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
                    @Override
                    public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> tuple) throws Exception {
                        return new Tuple2<Integer, Integer>(tuple._1, tuple._2);
                    }
                });

        // seqFunc: merges values within a single partition (map-side).
        Function2<Integer, Integer, Integer> seqFunc = new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer num1, Integer num2) throws Exception {
                return num1 + num2;
            }
        };

        // combFunc: merges the per-partition results across partitions (reduce-side).
        Function2<Integer, Integer, Integer> combFunc = new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer num1, Integer num2) throws Exception {
                return Math.min(num1, num2);
            }
        };

        // 100 is the zero value, applied once per key per partition.
        JavaPairRDD<Integer, Integer> resultRDD = pairRDD.aggregateByKey(100, seqFunc, combFunc);

        resultRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tuple) throws Exception {
                System.out.println(tuple._1 + " " + tuple._2);
            }
        });

        sc.stop();
    }
}
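For comparison, here is a minimal sketch of the same job in a more compact style. It assumes Spark 2.x on Java 8, where JavaSparkContext.parallelizePairs builds the pair RDD directly (skipping the identity mapToPair above) and the Function2 arguments can be written as lambdas; the class name AggregateByKeyLambda is just a placeholder for this sketch.

package com.sparktest;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class AggregateByKeyLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("AggregateByKeyLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // parallelizePairs builds a JavaPairRDD directly, so no mapToPair step is needed.
        JavaPairRDD<Integer, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>(1, 3), new Tuple2<>(1, 2),   // first partition
                new Tuple2<>(1, 4), new Tuple2<>(2, 3)),  // second partition
                2);

        // zeroValue = 100, seqFunc = sum within a partition, combFunc = min across partitions
        JavaPairRDD<Integer, Integer> resultRDD =
                pairRDD.aggregateByKey(100, (a, b) -> a + b, (a, b) -> Math.min(a, b));

        // Expected output (order may vary): (1,104) and (2,103), matching the walkthrough above.
        for (Tuple2<Integer, Integer> t : resultRDD.collect()) {
            System.out.println(t._1 + " " + t._2);
        }
        sc.stop();
    }
}

Because the zero value is folded in once per key per partition, the result depends on partitioning: with a single partition there would be nothing for combFunc to merge, and key 1 would come out as 100+3+2+4 = 109 instead of 104.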
Reprinted from blog.csdn.net/js54100804/article/details/80067623