1.Transformations转换算子
Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
- filter:过滤符合条件的记录数,true保留,false过滤掉。
- map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。输入一条,输出一条数据。
- flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
- sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
- reduceByKey:将相同的Key根据相应的逻辑进行处理。
- sortByKey/sortBy:作用在K,V格式的RDD上,对key进行升序或者降序排序。
- parallelize/makeRDD:将集合转为RDD格式,并指定分区数
2.Action行动算子
Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
- take(n):返回一个包含数据集前n个元素的集合。
- count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
- first:first=take(1),返回数据集中的第一个元素。
- foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
- collect:将计算结果回收到Driver端。
- countByKey:作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。
- countByValue:根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
- reduce:根据聚合逻辑聚合数据集中的每个元素。
public class JavaWordCount2 {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("JavaWordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C:/words.txt");
JavaRDD<String> filter = lines.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {
System.out.println("***************************");
return s.equals("hello spark");
}
});
// filter.foreach(new VoidFunction<String>() {
// public void call(String s) throws Exception {
// System.out.println(s);
// }
// });
sc.stop();
}
}
3.持久化算子
持久化算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
- cache:默认将RDD的数据持久化到内存中。cache是懒执行。
- persist:可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
- checkpoint:checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
注意:
3.1 cache和persist的注意事项:
- cache和persist都是懒执行,必须有一个action类算子触发执行。
- cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
- cache和persist算子后不能立即紧跟action算子。
- persist如果数据存在磁盘上,当运行程序执行完后自动回收,checkpoint 则不会回收。
3.2 checkpoint 的执行原理
- 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
- 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
- Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
3.3 对checkpoint 优化
对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
4. 连接类型
public class JavaJoinTest {
public static void main(final String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Sam", "a"),
new Tuple2<String, String>("Tom", "b"),
new Tuple2<String, String>("marry", "c")));
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Sam", 100),
new Tuple2<String, Integer>("Tom", 200),
new Tuple2<String, Integer>("Jetty", 300)));
/**
* join,join要作用在k,v格式的rdd上,按照两个RDD的key关联
* join后的分区数与父RDD分区数多的那一个相同
*/
// JavaPairRDD<String, Tuple2<String, Integer>> join1 = rdd1.join(rdd2);
// JavaPairRDD<String, Tuple2<String, Optional<Integer>>> join2 = rdd1.leftOuterJoin(rdd2);
// JavaPairRDD<String, Tuple2<Optional<String>, Integer>> join3 = rdd1.rightOuterJoin(rdd2);
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> join4 = rdd1.fullOuterJoin(rdd2);
join4.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>>>() {
public void call(Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>> stringTuple2Tuple2) throws Exception {
System.out.println(stringTuple2Tuple2);
}
});
sc.stop();
}
}
join:
fullOuterJoin:
扫描二维码关注公众号,回复:
4893085 查看本文章
leftOuterJoin:
rightOuterJoin:
public class JavaJoinTest {
public static void main(final String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "a", "b", "b", "c", "d", "e"));
/**
* 去重,distinct相当于map+reduceByKey+map操作
*/
JavaRDD<String> distinct = rdd.distinct();
distinct.foreach(new VoidFunction<String>() {
public void call(String s) throws Exception {
System.out.println(s);
}
});
sc.stop();
}
}
public class JavaJoinTest {
public static void main(final String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Sam", "a"),
new Tuple2<String, String>("Tom", "b"),
new Tuple2<String, String>("marry", "c")));
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Sam", 100),
new Tuple2<String, Integer>("Tom", 200),
new Tuple2<String, Integer>("Jetty", 300)));
JavaPairRDD<String, String> rdd3 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Sam", "a"),
new Tuple2<String, String>("Tom", "b"),
new Tuple2<String, String>("Jetty", "300")));
/**
* union 合并RDD,类型要一致
* 返回新的RDD的分区数是合并RDD分区数的总和
*/
JavaPairRDD<String, String> union = rdd1.union(rdd3);
/**
* intersection取两个数据集的交集
*/
JavaPairRDD<String, String> intersection = rdd1.intersection(rdd3);
/**
* subtract 取两个数据集的差集
*/
JavaPairRDD<String, String> subtract = rdd1.subtract(rdd3);
/**
* cogroup 当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
*/
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> cogroup = rdd1.cogroup(rdd3);
/**
* map是一条条处理数据,mapPartitions是一个个分区处理数据。mapPartitions可以完全代替map
*/
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e", "f"),2);
parallelize.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
public Iterator<String> call(Iterator<String> stringIterator) throws Exception {
List<String> list = new ArrayList<String>();
System.out.println("创建数据库连接。。。");
while (stringIterator.hasNext()){
String s = stringIterator.next();
System.out.println("拼接数据。。。"+s);
list.add(s);
}
System.out.println("关闭数据库。。。");
return list.iterator();
}
}).collect();
/**
* foreach是一条条处理数据,foreachPartition是一个个分区处理数据
* 如果数据操作完要生成rdd则用mapPartitions,否则用foreachPartition
*/
parallelize.foreachPartition(new VoidFunction<Iterator<String>>() {
public void call(Iterator<String> stringIterator) throws Exception {
List<String> list = new ArrayList<String>();
System.out.println("创建数据库连接。。。");
while (stringIterator.hasNext()){
String s = stringIterator.next();
System.out.println("拼接数据。。。"+s);
list.add(s);
}
System.out.println("关闭数据库。。。");
}
});
sc.stop();
}
}
5.补充算子
public class JavaExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("JavaExample");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("love1","love2","love3","love4",
"love5","love6","love7","love8",
"love9","love10","love11","love12"),3);
/**
* mapPartitionWithIndex:会将RDD中的partition索引下标带出来,index是partition的索引下标
*/
JavaRDD<String> mapPartitionsWithIndex = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (iterator.hasNext()) {
String one = iterator.next();
System.out.println("partition index = " + index + ", value = " + one);
list.add(one);
}
return list.iterator();
}
}, true);
/**
* repartition:有shuffle的算子,对RDD进行重新分区,可以增加分区或减少分区
*/
JavaRDD<String> rdd2 = mapPartitionsWithIndex.repartition(4);
JavaRDD<String> repartition = rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (iterator.hasNext()) {
String one = iterator.next();
System.out.println("partition index = " + index + ", value = " + one);
list.add(one);
}
return list.iterator();
}
}, true);
/**
* coalesce
* coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
* true为产生shuffle,false不产生shuffle。默认是false。
* 如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,
* 如果设置成true,效果和repartition一样。
*/
JavaRDD<String> rdd3 = mapPartitionsWithIndex.coalesce(2, true);
JavaRDD<String> coalesce = rdd3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
List<String> list = new ArrayList<String>();
while (iterator.hasNext()) {
String one = iterator.next();
System.out.println("partition index = " + index + ", value = " + one);
list.add(one);
}
return list.iterator();
}
}, true);
coalesce.collect();
sc.stop();
}
}
public class JavaExample {
public static void main(final String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("JavaExample");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, String>("Sam", "18"),
new Tuple2<String, String>("Sam", "180"),
new Tuple2<String, String>("Jetty", "19"),
new Tuple2<String, String>("Jetty", "190"),
new Tuple2<String, String>("Tom", "100"),
new Tuple2<String, String>("Tom", "200")),2);
JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
new Tuple2<String, Integer>("Sam", 100),
new Tuple2<String, Integer>("Sam", 200),
new Tuple2<String, Integer>("Jetty", 300),
new Tuple2<String, Integer>("Jetty", 400),
new Tuple2<String, Integer>("Tom", 500),
new Tuple2<String, Integer>("Tom", 600)));
/**
* groupByKey:作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)
*/
JavaPairRDD<String, Iterable<String>> groupByKey = rdd1.groupByKey();
/**
* zip:将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同
*/
JavaPairRDD<Tuple2<String, String>, Tuple2<String, Integer>> zip = rdd1.zip(rdd2);
/**
* zipWithIndex:该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
*/
JavaPairRDD<Tuple2<String, String>, Long> zipWithIndex = rdd1.zipWithIndex();
zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String, String>, Long>>() {
public void call(Tuple2<Tuple2<String, String>, Long> tuple2LongTuple2) throws Exception {
System.out.println(tuple2LongTuple2);
}
});
sc.stop();
}
}