/** * createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建 那个键对应的累加器的初始值 mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并 mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。 */ public class CombineByKey { public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("CombineKeyCountAvg").setMaster("local[2]"); JavaSparkContext javaSparkContext = new JavaSparkContext(conf); ArrayList<ScoreDetail> scoreDetails = new ArrayList<>(); scoreDetails.add(new ScoreDetail("xiaoming", "math", 90)); scoreDetails.add(new ScoreDetail("xiaoming", "English", 80)); scoreDetails.add(new ScoreDetail("xiaohong", "math", 70)); scoreDetails.add(new ScoreDetail("xiaohong", "English", 60)); scoreDetails.add(new ScoreDetail("xiaozhang", "math", 50)); scoreDetails.add(new ScoreDetail("xiaozhang", "English", 40)); //list => javaRDD JavaRDD<ScoreDetail> scoreDetailsRDD = javaSparkContext.parallelize(scoreDetails); //scoreDetail => (name, scoreDetail) JavaPairRDD<String, ScoreDetail> nameWithScoreDetail = scoreDetailsRDD .mapToPair(new PairFunction<ScoreDetail, String, ScoreDetail>() { @Override public Tuple2<String, ScoreDetail> call(ScoreDetail scoreDetail) throws Exception { return new Tuple2<String, ScoreDetail>(scoreDetail.name, scoreDetail); } }); //传入一个ScoreDetail,返回一个Tuple,当前没有遇到这个scoreDetail时,会创建这个Tuple //new Function<ScoreDetail, Float,Integer>(); Function<ScoreDetail, Tuple2<Float, Integer>> createCombine = new Function<ScoreDetail, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(ScoreDetail scoreDetail) throws Exception { return new Tuple2<>(scoreDetail.score, 1); } }; //根据要合并的对象nameWithScoreDetail中的name作为键,scoreDetail作为值 //传入操作是传入键对应的值scoreDetail和Tuple<Float, Integer>,返回一个Tuple Function2<Tuple2<Float, Integer>, ScoreDetail, Tuple2<Float, Integer>> mergeValue = 
new Function2<Tuple2<Float, Integer>, ScoreDetail, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(Tuple2<Float, Integer> t, ScoreDetail scoreDetail) throws Exception { return new Tuple2<>(t._1 + scoreDetail.score, t._2 + 1); } }; //合并分区,传入两个Tuple,返回一个Tuple Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>> mergeCombiners = new Function2<Tuple2<Float, Integer>, Tuple2<Float, Integer>, Tuple2<Float, Integer>>() { @Override public Tuple2<Float, Integer> call(Tuple2<Float, Integer> t1, Tuple2<Float, Integer> t2) throws Exception { return new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2); } }; JavaPairRDD<String, Tuple2<Float, Integer>> combineByKey = nameWithScoreDetail.combineByKey(createCombine, mergeValue, mergeCombiners); System.out.println(combineByKey.collect()); } }
Java implementation of the Spark combineByKey operator
You may also like
Reposted from blog.csdn.net/qq_37755661/article/details/79679939
Today's recommendations
Weekly ranking