Transformation operators
Value types
- map(): transform each element one-to-one
- flatMap(): map each element to zero or more elements, then flatten
- groupBy(): group elements by a computed key
- filter(): keep only the elements matching a predicate
- distinct(): remove duplicate elements
- sortBy(): sort by a computed key
- mapPartitions(): process one whole partition of data per call
- mapPartitionsWithIndex(): like mapPartitions(), but also receives the partition index
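The Value-type operators above have direct analogues in java.util.stream, so their semantics can be checked without a cluster. This is a minimal plain-Java sketch (class and method names are illustrative, not Spark API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Cluster-free sketch of the Value-type operator semantics using streams.
public class ValueOperatorSketch {
    // map(): one output element per input element
    static List<Integer> lengths(List<String> in) {
        return in.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap(): one input element can produce many outputs, then flatten
    static List<String> tokens(List<String> in) {
        return in.stream()
                .flatMap(s -> Arrays.stream(s.split(" ")))
                .collect(Collectors.toList());
    }

    // distinct(): remove duplicates from the flattened tokens
    static List<String> distinctTokens(List<String> in) {
        return tokens(in).stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a b", "c", "a b");
        System.out.println(lengths(words));        // [3, 1, 3]
        System.out.println(tokens(words));         // [a, b, c, a, b]
        System.out.println(distinctTokens(words)); // [a, b, c]
    }
}
```

The key distinction to internalize is map() vs flatMap(): map() always preserves the element count, flatMap() can grow or shrink it.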
WordCount example
public static void main(String[] args) throws InterruptedException {
    // 1. Create the Spark configuration
    SparkConf conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]");
    // 2. Create the SparkContext
    JavaSparkContext sc = new JavaSparkContext(conf);
    // 3. Application code: two partitions of single-letter "words"
    JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("A", "B", "A", "B", "C", "A"), 2);
    // Pair every word with an initial count of 1
    JavaRDD<Tuple2<String, Integer>> map = javaRDD.map(s -> new Tuple2<String, Integer>(s, 1));
    // map.collect().forEach(System.out::println);
    // Group the (word, 1) tuples by word
    JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupBy = map.groupBy(v1 -> v1._1);
    // Sum the counts inside each group
    groupBy.map(t -> {
        int sum = 0;
        for (Tuple2<String, Integer> elem : t._2) {
            sum += elem._2;
        }
        return new Tuple2<String, Integer>(t._1, sum);
    }).collect().forEach(System.out::println);
    // Keep the driver alive so the Spark UI (localhost:4040) can be inspected
    Thread.sleep(99999);
    // 4. Release resources
    sc.stop();
}
Key-Value types
To use the key-value operators, the data must first be turned into a pair RDD:
- mapToPair: convert a JavaRDD into a JavaPairRDD
- sc.parallelizePairs(list): create a JavaPairRDD from a collection whose elements are Tuple2
Key-Value transformation operators
- mapValues(): operate on the values only; the keys are untouched
- groupByKey(): regroup by key (no map-side pre-aggregation)
- reduceByKey(): aggregate the values of each key (with map-side pre-aggregation)
- sortByKey(): sort by key
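The pre-aggregation difference is why reduceByKey() is usually preferred over groupByKey(): each partition is combined locally first, so only one (key, partial result) record per key per partition crosses the network. A cluster-free plain-Java sketch of that two-step shape (class and method names are illustrative):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of reduceByKey()'s two phases: local combine per partition,
// then a merge of the per-partition partial results after the shuffle.
public class ReduceByKeySketch {
    // Phase 1: the map-side "combiner" inside one partition
    static Map<String, Integer> combineLocally(List<Map.Entry<String, Integer>> partition) {
        Map<String, Integer> partial = new HashMap<>();
        for (Map.Entry<String, Integer> e : partition) {
            partial.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return partial;
    }

    // Phase 2: merge the partial results shipped from every partition
    static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials) {
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Integer::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two "partitions" of (word, 1) records
        List<Map.Entry<String, Integer>> p0 = Arrays.asList(
                new AbstractMap.SimpleEntry<>("A", 1),
                new AbstractMap.SimpleEntry<>("A", 1),
                new AbstractMap.SimpleEntry<>("B", 1));
        List<Map.Entry<String, Integer>> p1 = Arrays.asList(
                new AbstractMap.SimpleEntry<>("A", 1),
                new AbstractMap.SimpleEntry<>("B", 1));
        // p0 ships 2 records instead of 3; groupByKey() would ship all 5
        Map<String, Integer> m0 = combineLocally(p0);
        Map<String, Integer> m1 = combineLocally(p1);
        System.out.println(mergePartials(Arrays.asList(m0, m1))); // {A=3, B=2}
    }
}
```

groupByKey() skips phase 1 entirely, shuffling every raw record, which is why it is marked above as having no pre-aggregation.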
Average-by-key example
public class Test06_reduceByKey_avg {
    public static void main(String[] args) {
        // 1. Create the Spark configuration
        SparkConf conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]");
        // 2. Create the SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3. Application code
        ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("A", 3));
        list.add(new Tuple2<>("B", 6));
        list.add(new Tuple2<>("A", 8));
        list.add(new Tuple2<>("B", 2));
        list.add(new Tuple2<>("B", 4));
        list.add(new Tuple2<>("C", 34));
        list.add(new Tuple2<>("D", 45));
        list.add(new Tuple2<>("C", 5));
        list.add(new Tuple2<>("C", 5));
        list.add(new Tuple2<>("D", 6));
        list.add(new Tuple2<>("A", 3));
        list.add(new Tuple2<>("B", 2));
        list.add(new Tuple2<>("A", 1));
        JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(list, 1);
        pairRDD
                // Wrap each value as (value, count = 1)
                .mapValues(integer -> new Tuple2<Integer, Integer>(integer, 1))
                // Add the sums and the counts pairwise per key
                .reduceByKey((t1, t2) -> new Tuple2<Integer, Integer>(t1._1 + t2._1, t1._2 + t2._2))
                // average = sum / count, rounded to two decimal places
                .mapValues(v1 -> Double.parseDouble(String.format("%.2f", v1._1.doubleValue() / v1._2)))
                .collect()
                .forEach(System.out::println);
        // 4. Release resources
        sc.stop();
    }
}
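The (sum, count) trick above works because both the sum and the count are associative, so reduceByKey() can combine them pairwise in any order; only the final division is deferred. The same logic can be verified locally with plain Java (class name is illustrative; int[]{sum, count} stands in for Tuple2):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local verification of the mapValues -> reduceByKey -> mapValues average pattern.
public class AvgSketch {
    static Map<String, Double> averageByKey(List<Map.Entry<String, Integer>> records) {
        Map<String, int[]> sumCount = new HashMap<>();
        for (Map.Entry<String, Integer> e : records) {
            // mapValues(v -> (v, 1)) fused with
            // reduceByKey((t1, t2) -> (t1._1 + t2._1, t1._2 + t2._2))
            sumCount.merge(e.getKey(), new int[]{e.getValue(), 1},
                    (a, b) -> new int[]{a[0] + b[0], a[1] + b[1]});
        }
        // Final mapValues: average = sum / count
        Map<String, Double> avg = new HashMap<>();
        sumCount.forEach((k, sumAndCount) ->
                avg.put(k, (double) sumAndCount[0] / sumAndCount[1]));
        return avg;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> data = Arrays.asList(
                new AbstractMap.SimpleEntry<>("A", 3),
                new AbstractMap.SimpleEntry<>("A", 8),
                new AbstractMap.SimpleEntry<>("B", 6));
        System.out.println(averageByKey(data)); // {A=5.5, B=6.0}
    }
}
```

A plain average, by contrast, is not associative (avg(avg(a, b), c) ≠ avg(a, b, c) in general), which is why the pair of running totals is needed.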
Action operators
- collect(): return the dataset to the driver as an array
- count(): return the number of elements in the RDD
- first(): return the first element of the RDD
- take(n): return an array of the first n elements of the RDD
- countByKey(): count the occurrences of each key
save operators
- saveAsTextFile(path): save as a text file
- saveAsObjectFile(path): serialize the elements and save as an object file
- foreach(): apply a function to every element of the RDD
- foreachPartition(): apply a function to every partition of the RDD
Action operator example:
public class Test01_action {
    public static void main(String[] args) {
        // Write to HDFS as this user
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        // 1. Create the Spark configuration
        SparkConf conf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");
        // 2. Create the SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 3. Application code
        JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("A", "B", "A", "B", "C", "A"));
        ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("A", 1));
        list.add(new Tuple2<>("B", 2));
        list.add(new Tuple2<>("C", 5));
        list.add(new Tuple2<>("B", 3));
        list.add(new Tuple2<>("D", 4));
        list.add(new Tuple2<>("E", 6));
        list.add(new Tuple2<>("A", 1));
        JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(list);
        System.out.println("----------------------count-----------------------");
        System.out.println(javaRDD.count());
        System.out.println(javaPairRDD.count());
        System.out.println("----------------------first-----------------------");
        System.out.println(javaRDD.first());
        System.out.println(javaPairRDD.first());
        System.out.println("----------------------take-----------------------");
        System.out.println(javaRDD.take(3));
        System.out.println(javaPairRDD.take(3));
        System.out.println("----------------------countByKey-----------------------");
        // countByValue() counts each distinct element of a plain RDD;
        // countByKey() counts each key of a pair RDD
        System.out.println(javaRDD.countByValue());
        System.out.println(javaPairRDD.countByKey());
        System.out.println("----------------------saveAsTextFile-----------------------");
        /*
         * Writing to HDFS from Windows: hdfs://hadoop102:8020/output, or put
         *   hdfs-site.xml and core-site.xml on the classpath.
         * Writing to HDFS from Linux: 1. a plain HDFS path, or 2. hdfs://hadoop102:8020/output
         * Writing to the local Linux file system: file:///opt/module/data
         */
        javaRDD.saveAsTextFile("hdfs://hadoop102:8020/output1");
        // javaRDD.saveAsTextFile("file:///opt/data/hdfs");
        System.out.println("----------------------saveAsObjectFile-----------------------");
        javaRDD.saveAsObjectFile("output");
        // 4. Release resources
        sc.stop();
    }
}