Spark 操作的几个步骤:
1. 创建 RDD(关联数据源):textFile(读取文件)和 parallelize(并行化内存中的集合)
2. 转换操作(transformation):由已有 RDD 生成新的 RDD;JavaRDD 可以通过 mapToPair、flatMapToPair 转换为 JavaPairRDD
3. 行动操作(action):触发实际计算并获取数据结果
一、WordCount 示例
//单词统计 public static void wordCount(JavaSparkContext ctx ){ String filePath = "e://log1.log"; JavaRDD<String> lines = ctx.textFile(filePath, 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + " : " + tuple._2()); } }
二、各种 Transformation 和 Action 的测试数据准备
public static void testMap(JavaSparkContext ctx) { List<Integer> data = Arrays.asList(1, 2, 3, 4, 5,1,2,3); JavaRDD<Integer> distData = ctx.parallelize(data); List<Integer> data2 = Arrays.asList(1, 2, 3, 4, 5,6); JavaRDD<Integer> distData2 = ctx.parallelize(data2); List<String> data3 = Arrays.asList("wang zhan,xiao ming,li xin,wang qiang,e,f".split(",")); JavaRDD<String> data3RDD = ctx.parallelize(data3); JavaRDD<Integer> lineLengths ; JavaRDD<String> returnStr; JavaPairRDD<String, Integer> returnStr2; JavaPairRDD<String, Integer> returnStr3; // lineLengths = map( distData); // lineLengths = filter(distData ); // lineLengths = sample(distData ); // lineLengths = union(distData, distData2) ; // lineLengths = intersection(distData, distData2) ; // lineLengths = distinct(distData ) ; // returnStr = flatMap(data3RDD); //数据扁平打散 // returnStr3 = mapToPair(returnStr);// 数据变成键值对的形式 // reduceByKey(returnStr3); //对key进行分组计算 // List<Integer> list = lineLengths.collect(); // WordCount.print(list); // List<String> list2 = returnStr.collect() ; // WordCount.print(list2); // testPersist( data3RDD ); // List<Tuple2<String, Integer>> listTuple = returnStr2.collect(); // printTuple( listTuple ); // returnStr3 = reduceByKey(returnStr2 ); // List<Tuple2<String, Integer>> listTuple2 = returnStr3.collect(); // printTuple( listTuple2 ); //reduceByKey(returnStr2 ); //action //reduce( returnStr2); // count(returnStr2 ); }
三、groupByKey
//对key进行分组处理,但如果需要统计求和则最好不要这样处理 private static JavaPairRDD<String, Integer> groupByKey(JavaPairRDD<String, Integer> returnStr3) { JavaPairRDD<String, Iterable<Integer>> rdd = returnStr3.groupByKey(); return null; }
四、数据去重
//数据去重 private static JavaRDD<Integer> distinct(JavaRDD<Integer> distData) { JavaRDD<Integer> rdd3 = distData.distinct( ); print( rdd3); return null; }
五、求交集(intersection)
//获取rdd数据的交集 数据 private static JavaRDD<Integer> intersection(JavaRDD<Integer> distData, JavaRDD<Integer> distData2) { JavaRDD<Integer> rdd3 = distData.intersection(distData2 ); print(rdd3 ); return null; }
六、数据持久化
/** * 持久化数据 * @param data3RDD */ private static void testPersist(JavaRDD<String> data3RDD) { System.out.println( "持久化数据到目录。。。"); data3RDD.persist(StorageLevel.MEMORY_ONLY()); // data3RDD.checkpoint(); // data3RDD.isCheckpointed() ; }
七、count 等 action 统计操作
//计算统计 元素两两传入到reduce中然后计算统计 private static void count(JavaPairRDD<String, Integer> returnStr2) { System.out.println("元素总数:"+returnStr2.count() ); //获取元素总数 System.out.println("元素总数:"+returnStr2.first() );//获取第一个元素 System.out.println("元素总数:"+returnStr2.take(2) );//获取RDD的前2个元素 System.out.println("元素countByKey总数:"+returnStr2.countByKey( ) ); //根据key进行统计数量 //returnStr2.saveAsTextFile("E://test.txt") ;//数据保存到文件中 returnStr2.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> t) throws Exception { System.out.println( t ); } }); }
八、reduce操作
//计算统计 元素两两传入到reduce中然后计算统计 private static void reduce(JavaPairRDD<String, Integer> returnStr2) { Tuple2<String, Integer> t = returnStr2.reduce( new Function2<Tuple2<String,Integer>, Tuple2<String,Integer>, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> call( Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception { System.out.println( v1 +" "+v2 ); return new Tuple2<String, Integer>(v1._1+v2._1,v1._2+v2._2() ); } } ); System.out.println("reduce结果 :"+t._1 +" "+t._2); }
九、reduceByKey 使用
//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中 public static JavaPairRDD<String, Integer> reduceByKey(JavaPairRDD<String, Integer> rdd ) { JavaPairRDD<String, Integer> counts = rdd.reduceByKey( new Function2<Integer, Integer, Integer>() {//泛型分别是 :两个计算参数 ,最后是返回值 @Override public Integer call(Integer i1, Integer i2) {//每次把key相同的数据,与上一次执行的结果,依次传进来计算 System.out.println(i1+" == "+i2); return i1 + i2; } }); printPair(counts ); return counts; }
十、flatMap 使用
//将原来的RDD每一个行数据 变成一个数组,然后所有的数组数据存到一个总得RDD数组中 public static JavaRDD<String> flatMap(JavaRDD<String> rdd ) { JavaRDD<String> lineLengths = rdd.flatMap( //返回值是输出的类型 new FlatMapFunction<String, String>() { //第一个参数是输入,第二个参数是输出 @Override public Iterable<String> call(String str) throws Exception { return Arrays.asList(SPACE.split(str)); } }) ; printStr(lineLengths ); return lineLengths; }
十一、mapToPair使用
// 将普通的RDD转换为 map数据的RDD方便计算处理 , a a->1 public static JavaPairRDD<String, Integer> mapToPair(JavaRDD<String> rdd ){ JavaPairRDD<String, Integer> ones = rdd.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>( t, 1); } }) ; return ones ; }
十二、合并 RDD(union)
// 合并rdd public static JavaRDD<Integer> union(JavaRDD<Integer> rdd , JavaRDD<Integer> rdd2) { JavaRDD<Integer> lineLengths = rdd.union(rdd2) ; print(lineLengths); return lineLengths; }
十三、抽样
// Return a sampled subset of this RDD. 返回一个RDD子集抽样 public static JavaRDD<Integer> sample(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.sample(false , 0.4 ) ; print(lineLengths); return lineLengths; }
十四、map使用
// map 源中的每一个元素都进行一个函数操作,生成一个新的RDD ,即每个元素进行一次转换 public static JavaRDD<Integer> map(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1+1; } }); print( lineLengths); return lineLengths; }
十五、打印数据
/** * 如果rdd中数据过多,则调用take获取一部分数据打印 * 1 直接rdd.foreach 打印rdd的数据,数据打印在各个executor中 * 2 调用rdd.collect.foreach 打印数据,数据打印在driver上 * @param lineLengths */ public static void print(JavaRDD<Integer> lineLengths ){ // lineLengths.foreach( new VoidFunction<Integer>() { // @Override // public void call(Integer t) throws Exception { // } // }); System.out.println("开始打印"); lineLengths.collect().forEach( new Consumer<Integer>(){ @Override public void accept(Integer t) { System.out.println( t); } }); System.out.println("结束打印"); } public static void printStr(JavaRDD<String> lineLengths ){ System.out.println("开始打印"); lineLengths.collect().forEach( new Consumer<String>(){ @Override public void accept(String t) { System.out.println( t); } }); System.out.println("结束打印"); } /** * 打印tuble * @param lineLengths */ public static void printPair( JavaPairRDD<String, Integer> rdd ){ System.out.println("开始打印"); rdd.collect().forEach( new Consumer<Tuple2<String,Integer>>(){ @Override public void accept(Tuple2<String, Integer> t) { System.out.println( t._1() +" "+t._2() ); } }); System.out.println("结束打印"); } public static void print(List list) { if (list == null || list.size() == 0) { return; } for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i)); } } /** * 打印map的RDD * @param listTuple */ public static void printTuple(List<Tuple2<String, Integer>> listTuple) { if (listTuple == null || listTuple.size() == 0) { return; } for (int i = 0; i < listTuple.size(); i++) { System.out.println(listTuple.get(i)); } }
十六、过滤
//对每一个元素进行过滤,然后返回 ,false 的数据会被过滤掉 public static JavaRDD<Integer> filter(JavaRDD<Integer> rdd) { JavaRDD<Integer> lineLengths = rdd.filter(new Function<Integer, Boolean>() { @Override public Boolean call(Integer v1) throws Exception { if (v1 != 1) { return true; } return false; } }); print( lineLengths); return lineLengths; }