spark学习记录(四、算子(函数))

1.Transformations转换算子

Transformations类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

  • filter:过滤符合条件的记录数,true保留,false过滤掉。
  • map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。输入一条,输出一条数据。
  • flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
  • sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
  • reduceByKey:将相同的Key根据相应的逻辑进行处理。
  • sortByKey/sortBy:作用在K,V格式的RDD上,对key进行升序或者降序排序。
  • parallelize/makeRDD:将集合转为RDD格式,并指定分区数

2.Action行动算子

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

  • take(n):返回一个包含数据集前n个元素的集合。
  • count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
  • first:first=take(1),返回数据集中的第一个元素。
  • foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
  • collect:将计算结果回收到Driver端。
  • countByKey:作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。
  • countByValue:根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
  • reduce:根据聚合逻辑聚合数据集中的每个元素。
public class JavaWordCount2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("JavaWordCount");

        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("C:/words.txt");

        JavaRDD<String> filter = lines.filter(new Function<String, Boolean>() {
            public Boolean call(String s) throws Exception {
                System.out.println("***************************");
                return s.equals("hello spark");
            }
        });
//        filter.foreach(new VoidFunction<String>() {
//            public void call(String s) throws Exception {
//                System.out.println(s);
//            }
//        });
        sc.stop();
    }
}

3.持久化算子

持久化算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

  • cache:默认将RDD的数据持久化到内存中。cache是懒执行。
  • persist:可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2”表示有副本数。
  • checkpoint:checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。

注意:

3.1 cache和persist的注意事项:

  1. cache和persist都是懒执行,必须有一个action类算子触发执行。
  2. cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
  3. cache和persist算子后不能立即紧跟action算子。
  4. persist如果数据存在磁盘上,当运行程序执行完后自动回收,checkpoint 则不会回收。

3.2 checkpoint 的执行原理 

  1. 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
  2. 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
  3. Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

3.3 对checkpoint 优化 

对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

4. 连接类型

public class JavaJoinTest {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("Sam", "a"),
                new Tuple2<String, String>("Tom", "b"),
                new Tuple2<String, String>("marry", "c")));
        JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Sam", 100),
                new Tuple2<String, Integer>("Tom", 200),
                new Tuple2<String, Integer>("Jetty", 300)));
        /**
         * join,join要作用在k,v格式的rdd上,按照两个RDD的key关联
         * join后的分区数与父RDD分区数多的那一个相同
         */
//        JavaPairRDD<String, Tuple2<String, Integer>> join1 = rdd1.join(rdd2);
//        JavaPairRDD<String, Tuple2<String, Optional<Integer>>> join2 = rdd1.leftOuterJoin(rdd2);
//        JavaPairRDD<String, Tuple2<Optional<String>, Integer>> join3 = rdd1.rightOuterJoin(rdd2);
        JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> join4 = rdd1.fullOuterJoin(rdd2);
        join4.foreach(new VoidFunction<Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>>>() {
            public void call(Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>> stringTuple2Tuple2) throws Exception {
                System.out.println(stringTuple2Tuple2);
            }
        });
        sc.stop();
    }
}

join: 

 fullOuterJoin:

扫描二维码关注公众号,回复: 4893085 查看本文章

leftOuterJoin:

rightOuterJoin:

public class JavaJoinTest {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "a", "b", "b", "c", "d", "e"));

        /**
         * 去重,distinct相当于map+reduceByKey+map操作
         */
        JavaRDD<String> distinct = rdd.distinct();
        distinct.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        sc.stop();
    }
}
public class JavaJoinTest {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("test");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("Sam", "a"),
                new Tuple2<String, String>("Tom", "b"),
                new Tuple2<String, String>("marry", "c")));
        JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Sam", 100),
                new Tuple2<String, Integer>("Tom", 200),
                new Tuple2<String, Integer>("Jetty", 300)));
        JavaPairRDD<String, String> rdd3 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("Sam", "a"),
                new Tuple2<String, String>("Tom", "b"),
                new Tuple2<String, String>("Jetty", "300")));

        /**
         * union 合并RDD,类型要一致
         * 返回新的RDD的分区数是合并RDD分区数的总和
         */
        JavaPairRDD<String, String> union = rdd1.union(rdd3);
        /**
         * intersection取两个数据集的交集
         */
        JavaPairRDD<String, String> intersection = rdd1.intersection(rdd3);
        /**
         * subtract 取两个数据集的差集
         */
        JavaPairRDD<String, String> subtract = rdd1.subtract(rdd3);
        /**
         * cogroup 当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
         */
        JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> cogroup = rdd1.cogroup(rdd3);
        /**
         * map是一条条处理数据,mapPartitions是一个个分区处理数据。mapPartitions可以完全代替map
         */
        JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e", "f"),2);
        parallelize.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            public Iterator<String> call(Iterator<String> stringIterator) throws Exception {
                List<String> list = new ArrayList<String>();
                System.out.println("创建数据库连接。。。");
                while (stringIterator.hasNext()){
                    String s = stringIterator.next();
                    System.out.println("拼接数据。。。"+s);
                    list.add(s);
                }
                System.out.println("关闭数据库。。。");
                return list.iterator();
            }
        }).collect();
        /**
         * foreach是一条条处理数据,foreachPartition是一个个分区处理数据
         * 如果数据操作完要生成rdd则用mapPartitions,否则用foreachPartition
         */
        parallelize.foreachPartition(new VoidFunction<Iterator<String>>() {
            public void call(Iterator<String> stringIterator) throws Exception {
                List<String> list = new ArrayList<String>();
                System.out.println("创建数据库连接。。。");
                while (stringIterator.hasNext()){
                    String s = stringIterator.next();
                    System.out.println("拼接数据。。。"+s);
                    list.add(s);
                }
                System.out.println("关闭数据库。。。");
            }
        });

        sc.stop();
    }
}

5.补充算子 

public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"),3);

        /**
         * mapPartitionWithIndex:会将RDD中的partition索引下标带出来,index是partition的索引下标
         */
        JavaRDD<String> mapPartitionsWithIndex = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> list = new ArrayList<String>();

                while (iterator.hasNext()) {
                    String one = iterator.next();
                    System.out.println("partition index = " + index + ", value = " + one);
                    list.add(one);
                }
                return list.iterator();
            }
        }, true);

        /**
         * repartition:有shuffle的算子,对RDD进行重新分区,可以增加分区或减少分区
         */
        JavaRDD<String> rdd2 = mapPartitionsWithIndex.repartition(4);
        JavaRDD<String> repartition = rdd2.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> list = new ArrayList<String>();

                while (iterator.hasNext()) {
                    String one = iterator.next();
                    System.out.println("partition index = " + index + ", value = " + one);
                    list.add(one);
                }
                return list.iterator();
            }
        }, true);

        /**
         * coalesce
         * coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
         * true为产生shuffle,false不产生shuffle。默认是false。
         * 如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,
         * 如果设置成true,效果和repartition一样。
         */
        JavaRDD<String> rdd3 = mapPartitionsWithIndex.coalesce(2, true);
        JavaRDD<String> coalesce = rdd3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
                List<String> list = new ArrayList<String>();

                while (iterator.hasNext()) {
                    String one = iterator.next();
                    System.out.println("partition index = " + index + ", value = " + one);
                    list.add(one);
                }
                return list.iterator();
            }
        }, true);

        coalesce.collect();
        sc.stop();
    }
}
public class JavaExample {
    public static void main(final String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, String>("Sam", "18"),
                new Tuple2<String, String>("Sam", "180"),
                new Tuple2<String, String>("Jetty", "19"),
                new Tuple2<String, String>("Jetty", "190"),
                new Tuple2<String, String>("Tom", "100"),
                new Tuple2<String, String>("Tom", "200")),2);
        JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Sam", 100),
                new Tuple2<String, Integer>("Sam", 200),
                new Tuple2<String, Integer>("Jetty", 300),
                new Tuple2<String, Integer>("Jetty", 400),
                new Tuple2<String, Integer>("Tom", 500),
                new Tuple2<String, Integer>("Tom", 600)));
        /**
         * groupByKey:作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)
         */
        JavaPairRDD<String, Iterable<String>> groupByKey = rdd1.groupByKey();
        /**
         * zip:将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同
         */
        JavaPairRDD<Tuple2<String, String>, Tuple2<String, Integer>> zip = rdd1.zip(rdd2);
        /**
         * zipWithIndex:该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
         */
        JavaPairRDD<Tuple2<String, String>, Long> zipWithIndex = rdd1.zipWithIndex();

        zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String, String>, Long>>() {
            public void call(Tuple2<Tuple2<String, String>, Long> tuple2LongTuple2) throws Exception {
                System.out.println(tuple2LongTuple2);
            }
        });
        sc.stop();
    }
}

猜你喜欢

转载自blog.csdn.net/qq_33283652/article/details/85338204