package com.yzc.lilei.spark.transformoperate;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Examples of common Spark transformation operators
* @author lilei
* @date 2018/11/22 14:13
* @version 1.0
**/
public class TransformOperate {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("TransformOperate");
// .setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);
map(jsc);
System.out.println("=========================================");
flatMap(jsc);
System.out.println("=========================================");
mapPartitions(jsc);
System.out.println("=========================================");
glom(jsc);
System.out.println("=========================================");
union(jsc);
System.out.println("=========================================");
mapPartitionsWithIndex(jsc);
System.out.println("=========================================");
sample(jsc);
System.out.println("=========================================");
intersection(jsc);
System.out.println("=========================================");
distinct(jsc);
System.out.println("=========================================");
groupByKey(jsc);
System.out.println("=========================================");
reduceByKey(jsc);
System.out.println("=========================================");
aggregateByKey(jsc);
System.out.println("=========================================");
sortByKey(jsc);
System.out.println("=========================================");
join(jsc);
System.out.println("=========================================");
cogroup(jsc);
System.out.println("=========================================");
cartesian(jsc);
System.out.println("=========================================");
coalesce(jsc);
System.out.println("=========================================");
repartition(jsc);
jsc.close();
}
/**
* repartition operator: can increase or decrease the number of partitions
* @param jsc
*/
public static void repartition(JavaSparkContext jsc){
List<String> staffList = Arrays.asList("张三", "李四", "王二", "麻子",
"赵六", "王五", "李大个", "王大妞", "小明", "小倩");
JavaRDD<String> staffRDD = jsc.parallelize(staffList, 3);
JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
List<String> list = new ArrayList<>();
while (iterator.hasNext()) {
String staff = iterator.next();
list.add("repartition1::::分区[" + (index + 1) + "], " + staff);
}
return list.iterator();
}, true);
rdd1.foreach(System.out::println);
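// repartition always performs a full shuffle; it is equivalent to coalesce(numPartitions, shuffle = true)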
JavaRDD<String> rdd2 = rdd1.repartition(6);
JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
List<String> list = new ArrayList<>();
while (iterator.hasNext()) {
String staff = iterator.next();
list.add("repartition2::::分区[" + (index + 1) + "], " + staff);
}
return list.iterator();
}, true);
rdd3.foreach(System.out::println);
}
/**
* coalesce operator: reduces the number of partitions, consolidating data
* @param jsc
*/
public static void coalesce(JavaSparkContext jsc){
List<String> staffList = Arrays.asList("张三", "李四", "王二", "麻子",
"赵六", "王五", "李大个", "王大妞", "小明", "小倩");
JavaRDD<String> staffRDD = jsc.parallelize(staffList, 6);
JavaRDD<String> rdd1 = staffRDD.mapPartitionsWithIndex((index, iterator) -> {
List<String> list = new ArrayList<>();
while (iterator.hasNext()) {
String staff = iterator.next();
list.add("coalesce1::::分区[" + (index + 1) + "], " + staff);
}
return list.iterator();
}, true);
rdd1.foreach(System.out::println);
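// coalesce(3) merges the 6 partitions down to 3; unlike repartition, it avoids a shuffle by default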
JavaRDD<String> rdd2 = rdd1.coalesce(3);
JavaRDD<String> rdd3 = rdd2.mapPartitionsWithIndex((index, iterator) -> {
List<String> list = new ArrayList<>();
while (iterator.hasNext()) {
String staff = iterator.next();
list.add("coalesce2::::分区[" + (index + 1) + "], " + staff);
}
return list.iterator();
}, true);
rdd3.foreach(System.out::println);
}
/**
* cartesian operator: Cartesian product of two RDDs
* @param jsc
*/
public static void cartesian(JavaSparkContext jsc){
List<String> clothes = Arrays.asList("夹克", "T恤", "皮衣", "风衣");
JavaRDD<String> clothesRDD = jsc.parallelize(clothes);
List<String> trousers = Arrays.asList("皮裤", "运动裤", "牛仔裤", "休闲裤");
JavaRDD<String> trousersRDD = jsc.parallelize(trousers);
JavaPairRDD<String, String> pairsRDD = clothesRDD.cartesian(trousersRDD);
/**
 * Print the result
 */
pairsRDD.foreach(tuple -> System.out.println("cartesian:"+tuple));
}
/**
* cogroup operator example
* @param jsc
*/
public static void cogroup(JavaSparkContext jsc){
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog1");
list1.add("dog1");
list1.add("dog1");
list1.add("dog1");
list1.add("dog2");
list1.add("dog2");
list1.add("dog2");
list2.add("dog3");
list2.add("dog3");
list2.add("dog3");
list1.add("salmon");
list2.add("salmon");
list1.add("salmon1");
list2.add("salmon1");
list2.add("rat");
list2.add("rat");
list1.add("elephant");
list2.add("elephant");
JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = pairRDD1.cogroup(pairRDD2);
cogroupRDD.foreach(str -> System.out.println("cogroupRDD:"+str._1+":::["+str._2._1+"="+str._2._2+"]"));
}
/**
* join operator example
* @param jsc
*/
public static void join(JavaSparkContext jsc){
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog1");
list1.add("dog1");
list1.add("dog1");
list1.add("dog1");
list1.add("dog2");
list1.add("dog2");
list1.add("dog2");
list2.add("dog3");
list2.add("dog3");
list2.add("dog3");
list1.add("salmon");
list2.add("salmon");
list1.add("salmon1");
list2.add("salmon1");
list2.add("rat");
list2.add("rat");
list1.add("elephant");
list2.add("elephant");
JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
JavaPairRDD<String, Integer> pairRDD1 = rdd1.mapToPair(str -> new Tuple2<>(str, 1));
JavaPairRDD<String, Integer> pairRDD2 = rdd2.mapToPair(str -> new Tuple2<>(str, 1));
JavaPairRDD<String, Tuple2<Integer, Integer>> joinRDD = pairRDD1.join(pairRDD2);
joinRDD.foreach(str -> System.out.println("joinRDD:"+str._1+":::["+str._2._1+"="+str._2._2+"]"));
}
/**
* sortByKey operator: sorting by key
* @param jsc
*/
public static void sortByKey(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog1");
list.add("dog1");
list.add("dog1");
list.add("tuple2");
list.add("tuple2");
list.add("tuple3");
list.add("tuple4");
list.add("tuple4");
list.add("tuple4");
JavaRDD<String> rdd = jsc.parallelize(list, 3);
/**
* mapToPair maps each word to a (word, 1) key-value pair
*/
JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
JavaPairRDD<String, Integer> reduceByKeyRDD = rdd2pairRDD.reduceByKey((v1, v2) -> v1 + v2);
JavaPairRDD<Integer, String> reduceByKeyPairRDD = reduceByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
/**
* sortByKey takes an optional ascending flag: the default is ascending; pass false for descending
*/
JavaPairRDD<Integer, String> sortByKeyRDD = reduceByKeyPairRDD.sortByKey(false);
/**
* Swap back to (word, count) purely for printing
*/
sortByKeyRDD.mapToPair(tuple -> new Tuple2<>(tuple._2,tuple._1))
.foreach(tuple -> System.out.println("sortByKeyRDD:"+tuple._1+"="+tuple._2));
}
/**
* aggregateByKey operator example
* @param jsc
*/
public static void aggregateByKey(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog1");
list.add("dog1");
list.add("dog1");
list.add("tuple2");
list.add("tuple2");
list.add("tuple3");
list.add("tuple4");
list.add("tuple4");
list.add("tuple4");
JavaRDD<String> rdd = jsc.parallelize(list, 3);
JavaPairRDD<String, Integer> rdd2pairRDD = rdd.mapToPair(str -> new Tuple2<>(str, 1));
/**
* Three parameters: the zero value, a seq function that merges values within
* a partition, and a combine function that merges results across partitions
* (the cross-partition step involves a shuffle)
*/
JavaPairRDD<String, Integer> aggregateByKeyRDD = rdd2pairRDD.aggregateByKey(0, (v1, v2) -> v1 + v2, (v1, v2) -> v1 + v2);
aggregateByKeyRDD.foreach(tuple -> System.out.println("aggregateByKeyRDD:"+tuple._1+"="+tuple._2));
}
/**
* map operator example
*/
public static void map(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog");
list.add("salmon");
list.add("salmon111");
list.add("rat");
list.add("elephant");
JavaRDD<String> mapRDD = jsc.parallelize(list, 3);
mapRDD.zip(mapRDD.map(word -> word.length())).foreach(tuple -> System.out.println("map:"+tuple._1+"="+tuple._2));
}
/**
* flatMap operator example
* @param jsc
*/
public static void flatMap(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("do g");
list.add("sal mon");
list.add("sal mon1");
list.add("ra t");
list.add("ele phant");
JavaRDD<String> flatMapRDD = jsc.parallelize(list, 3);
JavaRDD<String> resultRDD = flatMapRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
/**
 * Print the RDD's elements; foreach is an action and triggers a job
 */
resultRDD.foreach(str -> System.out.println("flatMap:"+str));
}
/**
* mapPartitions operator example
*/
public static void mapPartitions(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog");
list.add("dog1");
list.add("dog2");
list.add("dog3");
list.add("salmon");
list.add("salmon1");
list.add("rat");
list.add("elephant");
JavaRDD<String> mapPartiionsRDD = jsc.parallelize(list, 3);
JavaRDD<String> resultRDD = mapPartiionsRDD.mapPartitions(iterator -> {
List<String> list1 = new ArrayList<>();
while (iterator.hasNext()){
String next = iterator.next();
if (next.length() > 5){
list1.add(next);
}
}
return list1.iterator();
});
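// mapPartitions invokes the function once per partition rather than once per element,
// so per-partition setup costs (e.g. opening a database connection) are paid only 3 times here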
/**
 * Print the RDD's elements; foreach is an action and triggers a job
 */
resultRDD.foreach(str -> System.out.println("mapPartitions:"+str));
}
/**
* mapPartitionsWithIndex operator example
* @param jsc
*/
public static void mapPartitionsWithIndex(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog");
list.add("dog1");
list.add("dog2");
list.add("dog3");
list.add("salmon");
list.add("salmon1");
list.add("rat");
list.add("elephant");
JavaRDD<String> mapPartitionsWithIndexRDD = jsc.parallelize(list, 3);
JavaRDD<String> resultRDD = mapPartitionsWithIndexRDD.mapPartitionsWithIndex((index, iterator) -> {
List<String> result = new ArrayList<>();
while (iterator.hasNext()) {
result.add(index + "::" + iterator.next());
}
return result.iterator();
}, true);
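// The second argument (preservesPartitioning = true) declares that the partitioner is unchanged;
// it only has an effect on pair RDDs, so it is harmless here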
/**
 * Print the RDD's elements; foreach is an action and triggers a job
 */
resultRDD.foreach(str -> System.out.println("mapPartitionsWithIndex:"+str));
}
/**
* sample operator example
* @param jsc
*/
public static void sample(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog");
list.add("dog1");
list.add("dog2");
list.add("dog3");
list.add("salmon");
list.add("salmon1");
list.add("rat");
list.add("elephant");
JavaRDD<String> sampleRDD = jsc.parallelize(list, 3);
// Variant 1: withReplacement = true, so a sampled element may appear more than once
// JavaRDD<String> resultRDD = sampleRDD.sample(true, 0.6);
// Variant 2: withReplacement = false, so sampled elements never repeat
// JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6);
// Variant 3: same as variant 2, but with an explicit seed
JavaRDD<String> resultRDD = sampleRDD.sample(false, 0.6, 9);
/**
 * Print the RDD's elements; foreach is an action and triggers a job
 */
resultRDD.foreach(str -> System.out.println("sample:"+str));
}
/**
* glom operator example
* @param jsc
*/
public static void glom(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog");
list.add("dog1");
list.add("dog2");
list.add("dog3");
list.add("salmon");
list.add("salmon1");
list.add("rat");
list.add("elephant");
JavaRDD<String> glom = jsc.parallelize(list, 3);
glom.glom().foreach(list1 -> System.out.println("glom:"+list1));
}
/**
* union operator example
* @param jsc
*/
public static void union(JavaSparkContext jsc){
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
list1.add("dog");
list1.add("dog1");
list1.add("dog2");
list2.add("dog3");
list2.add("salmon");
list2.add("salmon1");
list2.add("rat");
list2.add("elephant");
JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
JavaRDD<String> unionRDD = rdd1.union(rdd2);
unionRDD.foreach(str -> System.out.println("union:"+str));
}
/**
* intersection operator example
* @param jsc
*/
public static void intersection(JavaSparkContext jsc){
List<String> list1 = new ArrayList<>();
List<String> list2 = new ArrayList<>();
list1.add("dog");
list1.add("dog1");
list1.add("dog2");
list2.add("dog");
list2.add("salmon");
list2.add("salmon1");
list2.add("rat");
list2.add("elephant");
JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
JavaRDD<String> rdd2 = jsc.parallelize(list2, 3);
JavaRDD<String> intersectionRDD = rdd1.intersection(rdd2);
intersectionRDD.foreach(str -> System.out.println("intersection:"+str));
}
/**
* distinct operator example
* @param jsc
*/
public static void distinct(JavaSparkContext jsc){
List<String> list1 = new ArrayList<>();
list1.add("dog");
list1.add("dog");
list1.add("dog");
list1.add("dog1");
list1.add("dog1");
list1.add("dog1");
list1.add("dog2");
list1.add("dog2");
list1.add("dog2");
JavaRDD<String> rdd1 = jsc.parallelize(list1, 3);
/**
* distinct accepts an optional numPartitions parameter to set the number of result partitions
*/
JavaRDD<String> distinctRDD = rdd1.distinct();
distinctRDD.foreach(str -> System.out.println("distinct:"+str));
}
/**
* groupByKey operator example
* @param jsc
*/
public static void groupByKey(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog1");
list.add("dog1");
list.add("dog1");
list.add("tuple2");
list.add("tuple2");
list.add("tuple3");
list.add("tuple4");
list.add("tuple4");
list.add("tuple4");
JavaRDD<String> rdd = jsc.parallelize(list, 3);
JavaPairRDD<String, Integer> pairRDD = rdd.mapToPair(str -> new Tuple2(str, 1));
/**
* groupByKeyRDD,可不填,填的话指定多少个分区
*/
JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
groupByKeyRDD.foreach(str -> System.out.println("groupByKeyRDD:"+str._1+":::"+str._2));
}
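/**
 * reduceByKey operator example
 * @param jsc
 */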
public static void reduceByKey(JavaSparkContext jsc){
List<String> list = new ArrayList<>();
list.add("dog1");
list.add("dog1");
list.add("dog1");
list.add("tuple2");
list.add("tuple2");
list.add("tuple3");
list.add("tuple4");
list.add("tuple4");
list.add("tuple4");
JavaRDD<String> rdd = jsc.parallelize(list, 3);
JavaPairRDD<String, Integer> reduceByKeyRDD = rdd.mapToPair(str -> new Tuple2(str, 1));
/**
* reduceByKey,可不填,填的话指定多少个分区
*/
JavaPairRDD<String, Integer> reduceByKeyPairRDD = reduceByKeyRDD.reduceByKey((v1, v2) -> v1 + v2);
reduceByKeyPairRDD.foreach(str -> System.out.println("reduceByKeyPairRDD:"+str._1+":::"+str._2));
}
}
Reposted from blog.csdn.net/u013164612/article/details/84348444