package day05;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

/**
 * Demonstrates Spark pair-RDD transformations in Java:
 * reduceByKey, sortByKey, join and cogroup.
 */
public class MyTransformation_java_2 {

    /**
     * Sums the values of each key with reduceByKey and prints the collected
     * result, e.g. ("class1", 160), ("class2", 165).
     *
     * @param jsc the Spark context used to parallelize the sample data
     */
    public static void myReduceBykey(JavaSparkContext jsc) {
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 82),
                new Tuple2<String, Integer>("class2", 83));
        JavaPairRDD<String, Integer> parallelizePairs = jsc.parallelizePairs(list);
        // Anonymous Function2 keeps the example usable on pre-lambda Spark/Java setups.
        JavaPairRDD<String, Integer> reduceByKeyRDD =
                parallelizePairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        System.out.println(reduceByKeyRDD.collect().toString());
    }

    /**
     * Sorts pairs by their Integer key in ascending order.
     *
     * @param jsc the Spark context used to parallelize the sample data
     * @return the collected, sorted pairs rendered via List.toString()
     */
    public static String mySortBykey(JavaSparkContext jsc) {
        List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<Integer, String>(22, "class1"),
                new Tuple2<Integer, String>(23, "class1"),
                new Tuple2<Integer, String>(23, "class2"),
                new Tuple2<Integer, String>(27, "class2"));
        JavaPairRDD<Integer, String> parallelizePairs = jsc.parallelizePairs(list);
        JavaPairRDD<Integer, String> sortByKeyRDD = parallelizePairs.sortByKey();
        return sortByKeyRDD.collect().toString();
    }

    /**
     * Inner-joins two pair RDDs on their Integer keys; with this sample data
     * only key 22 appears in both sides, so only its combinations survive.
     *
     * @param jsc the Spark context used to parallelize the sample data
     * @return the collected join result rendered via List.toString()
     */
    public static String myJoin(JavaSparkContext jsc) {
        List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(22, "class1"),
                new Tuple2<Integer, String>(23, "class1"),
                new Tuple2<Integer, String>(23, "class2"),
                new Tuple2<Integer, String>(27, "class2"));
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(
                new Tuple2<Integer, Integer>(22, 22),
                new Tuple2<Integer, Integer>(24, 20),
                new Tuple2<Integer, Integer>(22, 72),
                new Tuple2<Integer, Integer>(12, 22));
        JavaPairRDD<Integer, String> parallelizePairs1 = jsc.parallelizePairs(list1);
        JavaPairRDD<Integer, Integer> parallelizePairs2 = jsc.parallelizePairs(list2);
        JavaPairRDD<Integer, Tuple2<String, Integer>> joinRDD =
                parallelizePairs1.join(parallelizePairs2);
        return joinRDD.collect().toString();
    }

    /**
     * Cogroups two pair RDDs: every key from either side maps to the pair of
     * Iterables of its values from each RDD (unlike join, unmatched keys are kept).
     *
     * @param jsc the Spark context used to parallelize the sample data
     * @return the collected cogroup result rendered via List.toString()
     */
    public static String myCoGroup(JavaSparkContext jsc) {
        List<Tuple2<Integer, String>> list1 = Arrays.asList(
                new Tuple2<Integer, String>(22, "class1"),
                new Tuple2<Integer, String>(23, "class1"),
                new Tuple2<Integer, String>(23, "class2"),
                new Tuple2<Integer, String>(27, "class2"));
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(
                new Tuple2<Integer, Integer>(22, 22),
                new Tuple2<Integer, Integer>(24, 20),
                new Tuple2<Integer, Integer>(22, 72),
                new Tuple2<Integer, Integer>(12, 22));
        JavaPairRDD<Integer, String> parallelizePairs1 = jsc.parallelizePairs(list1);
        JavaPairRDD<Integer, Integer> parallelizePairs2 = jsc.parallelizePairs(list2);
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroup =
                parallelizePairs1.cogroup(parallelizePairs2);
        return cogroup.collect().toString();
    }

    /**
     * Runs the sortByKey, join and cogroup examples on a local Spark context.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyTransformation_java_2");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            //myReduceBykey(jsc);
            // BUG FIX: the original assigned every result to `result` but then
            // printed the never-assigned `result1`/`result2`, so the join and
            // cogroup outputs were silently lost (two empty lines were printed).
            System.out.println(mySortBykey(jsc));
            System.out.println(myJoin(jsc));
            System.out.println(myCoGroup(jsc));
        } finally {
            // Release the local Spark context even if an example throws.
            jsc.stop();
        }
    }
}
A simple Java implementation of Spark transformation operator examples (reduceByKey, sortByKey, join, cogroup).
Source: blog.csdn.net/wjn19921104/article/details/80230489