Spark 常见算子操作

package com.yzc.lilei.spark.transformoperate;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
 * Demonstrations of common Spark RDD operators using the Java API.
 *
 * <p>Every public demo method builds a small in-memory data set, applies one
 * operator and prints the resulting elements with {@code foreach}, which is an
 * action and therefore triggers job submission.
 *
 * <p>The print callbacks are deliberately written as lambdas instead of
 * {@code System.out::println}: the bound method reference would capture the
 * {@code PrintStream} instance, which is not serializable, and Spark has to
 * serialize the function passed to {@code foreach}.
 *
 * <p>Created 2018/11/22 14:13.
 *
 * @author lilei
 * @version 1.0
 */
public class TransformOperate {

    /** Line printed between two consecutive demos in {@link #main(String[])}. */
    private static final String SEPARATOR = "=========================================";

    /** Number of partitions used by most demos when parallelizing sample data. */
    private static final int DEFAULT_PARTITIONS = 3;

    /** Distinct sample words shared by the partition-oriented demos. */
    private static final List<String> ANIMALS = Arrays.asList(
            "dog", "dog1", "dog2", "dog3", "salmon", "salmon1", "rat", "elephant");

    /** Sample words containing duplicates, shared by the by-key demos. */
    private static final List<String> REPEATED_WORDS = Arrays.asList(
            "dog1", "dog1", "dog1",
            "tuple2", "tuple2",
            "tuple3",
            "tuple4", "tuple4", "tuple4");

    /** Sample names shared by the {@code coalesce} and {@code repartition} demos. */
    private static final List<String> STAFF = Arrays.asList("张三", "李四", "王二", "麻子",
            "赵六", "王五", "李大个", "王大妞", "小明", "小倩");

    /** Left-hand keys of the {@code join} and {@code cogroup} demos. */
    private static final List<String> LEFT_KEYS = Arrays.asList(
            "dog", "dog", "dog", "dog", "dog",
            "dog1", "dog1", "dog1", "dog1",
            "dog2", "dog2", "dog2",
            "salmon", "salmon1", "elephant");

    /** Right-hand keys of the {@code join} and {@code cogroup} demos. */
    private static final List<String> RIGHT_KEYS = Arrays.asList(
            "dog3", "dog3", "dog3",
            "salmon", "salmon1",
            "rat", "rat",
            "elephant");

    /**
     * Runs every demo in sequence, printing a separator line between them.
     *
     * <p>No master URL is configured here, so it has to be supplied from
     * outside (for example by {@code spark-submit}). The context is opened in a
     * try-with-resources statement so that it is closed even when one of the
     * demos fails.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TransformOperate");

        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            map(jsc);
            printSeparator();
            flatMap(jsc);
            printSeparator();
            mapPartiions(jsc);
            printSeparator();
            glom(jsc);
            printSeparator();
            union(jsc);
            printSeparator();
            mapPartitionsWithIndex(jsc);
            printSeparator();
            sample(jsc);
            printSeparator();
            intersection(jsc);
            printSeparator();
            distinct(jsc);
            printSeparator();
            groupByKey(jsc);
            printSeparator();
            reduceByKey(jsc);
            printSeparator();
            aggregateByKey(jsc);
            printSeparator();
            sortByKey(jsc);
            printSeparator();
            join(jsc);
            printSeparator();
            cogroup(jsc);
            printSeparator();
            cartesian(jsc);
            printSeparator();
            coalesce(jsc);
            printSeparator();
            repartition(jsc);
        }
    }

    /** Prints the line that visually separates the output of two demos. */
    private static void printSeparator() {
        System.out.println(SEPARATOR);
    }

    /**
     * Parallelizes {@code words} into {@link #DEFAULT_PARTITIONS} partitions and
     * pairs every word with the count {@code 1}.
     *
     * @param jsc   context used to create the RDD
     * @param words words that become the keys of the pair RDD
     * @return a pair RDD holding {@code (word, 1)} for every input word
     */
    private static JavaPairRDD<String, Integer> pairWithOne(JavaSparkContext jsc, List<String> words) {
        return jsc.parallelize(words, DEFAULT_PARTITIONS)
                .mapToPair(word -> new Tuple2<>(word, 1));
    }

    /**
     * Prefixes every element with {@code label} and the 1-based index of the
     * partition that currently holds it.
     *
     * @param rdd   RDD whose elements are tagged
     * @param label text placed in front of the partition marker
     * @return a new RDD with one tagged string per input element
     */
    private static JavaRDD<String> tagWithPartition(JavaRDD<String> rdd, String label) {
        return rdd.mapPartitionsWithIndex((index, elements) -> {
            List<String> tagged = new ArrayList<>();
            while (elements.hasNext()) {
                tagged.add(label + "::::分区[" + (index + 1) + "], " + elements.next());
            }
            return tagged.iterator();
        }, true);
    }

    /**
     * {@code repartition} demo: the operator can increase or decrease the
     * number of partitions. The names start in 3 partitions and are
     * redistributed into 6; each element is tagged with its partition before
     * and after so that the movement is visible in the output.
     *
     * @param jsc context used to create the RDD
     */
    public static void repartition(JavaSparkContext jsc) {
        JavaRDD<String> before = tagWithPartition(jsc.parallelize(STAFF, 3), "repartition1");
        before.foreach(line -> System.out.println(line));

        JavaRDD<String> after = tagWithPartition(before.repartition(6), "repartition2");
        after.foreach(line -> System.out.println(line));
    }

    /**
     * {@code coalesce} demo: the operator is used to reduce the number of
     * partitions and gather the data. The names start in 6 partitions and are
     * merged into 3; each element is tagged with its partition before and
     * after so that the movement is visible in the output.
     *
     * @param jsc context used to create the RDD
     */
    public static void coalesce(JavaSparkContext jsc) {
        JavaRDD<String> before = tagWithPartition(jsc.parallelize(STAFF, 6), "coalesce1");
        before.foreach(line -> System.out.println(line));

        JavaRDD<String> after = tagWithPartition(before.coalesce(3), "coalesce2");
        after.foreach(line -> System.out.println(line));
    }

    /**
     * {@code cartesian} demo: prints the Cartesian product of a list of tops
     * and a list of trousers.
     *
     * @param jsc context used to create the RDDs
     */
    public static void cartesian(JavaSparkContext jsc) {
        JavaRDD<String> clothesRDD = jsc.parallelize(Arrays.asList("夹克", "T恤", "皮衣", "风衣"));
        JavaRDD<String> trousersRDD = jsc.parallelize(Arrays.asList("皮裤", "运动裤", "牛仔裤", "休闲裤"));

        JavaPairRDD<String, String> outfits = clothesRDD.cartesian(trousersRDD);

        // Print every (top, trousers) combination.
        outfits.foreach(outfit -> System.out.println("cartesian:" + outfit));
    }

    /**
     * {@code cogroup} demo: groups the values of two pair RDDs by key and
     * prints, for every key, the values found on the left and on the right.
     *
     * @param jsc context used to create the RDDs
     */
    public static void cogroup(JavaSparkContext jsc) {
        JavaPairRDD<String, Integer> left = pairWithOne(jsc, LEFT_KEYS);
        JavaPairRDD<String, Integer> right = pairWithOne(jsc, RIGHT_KEYS);

        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> grouped = left.cogroup(right);

        grouped.foreach(entry -> System.out.println(
                "cogroupRDD:" + entry._1 + ":::[" + entry._2._1 + "=" + entry._2._2 + "]"));
    }

    /**
     * {@code join} demo: joins two pair RDDs on their keys and prints every
     * matched pair of values.
     *
     * @param jsc context used to create the RDDs
     */
    public static void join(JavaSparkContext jsc) {
        JavaPairRDD<String, Integer> left = pairWithOne(jsc, LEFT_KEYS);
        JavaPairRDD<String, Integer> right = pairWithOne(jsc, RIGHT_KEYS);

        JavaPairRDD<String, Tuple2<Integer, Integer>> joined = left.join(right);

        joined.foreach(entry -> System.out.println(
                "joinRDD:" + entry._1 + ":::[" + entry._2._1 + "=" + entry._2._2 + "]"));
    }

    /**
     * {@code sortByKey} demo: counts the occurrences of every word, then sorts
     * the words by that count in descending order.
     *
     * @param jsc context used to create the RDD
     */
    public static void sortByKey(JavaSparkContext jsc) {
        // Map every word to (word, 1) and sum the ones per word.
        JavaPairRDD<String, Integer> counts = pairWithOne(jsc, REPEATED_WORDS).reduceByKey(Integer::sum);

        // sortByKey sorts on the key, so the count has to become the key first.
        JavaPairRDD<Integer, String> byCount = counts.mapToPair(entry -> new Tuple2<>(entry._2, entry._1));

        // Ascending is the default; passing false sorts in descending order.
        JavaPairRDD<Integer, String> sorted = byCount.sortByKey(false);

        // Swap back to (word, count) purely for printing.
        sorted.mapToPair(entry -> new Tuple2<>(entry._2, entry._1))
                .foreach(entry -> System.out.println("sortByKeyRDD:" + entry._1 + "=" + entry._2));
    }

    /**
     * {@code aggregateByKey} demo: counts the occurrences of every word.
     *
     * <p>The operator takes three arguments: the initial value, the function
     * that merges values locally inside a partition, and the function that
     * merges the per-partition results across partitions (this last step
     * triggers a shuffle).
     *
     * @param jsc context used to create the RDD
     */
    public static void aggregateByKey(JavaSparkContext jsc) {
        JavaPairRDD<String, Integer> ones = pairWithOne(jsc, REPEATED_WORDS);

        JavaPairRDD<String, Integer> totals = ones.aggregateByKey(
                0,
                (partial, value) -> partial + value,
                (leftPartial, rightPartial) -> leftPartial + rightPartial);

        totals.foreach(entry -> System.out.println("aggregateByKeyRDD:" + entry._1 + "=" + entry._2));
    }

    /**
     * {@code map} demo: maps every word to its length and prints the word
     * together with that length.
     *
     * @param jsc context used to create the RDD
     */
    public static void map(JavaSparkContext jsc) {
        List<String> words = Arrays.asList("dog", "salmon", "salmon111", "rat", "elephant");
        JavaRDD<String> wordsRDD = jsc.parallelize(words, DEFAULT_PARTITIONS);

        JavaRDD<Integer> lengthsRDD = wordsRDD.map(String::length);

        wordsRDD.zip(lengthsRDD)
                .foreach(entry -> System.out.println("map:" + entry._1 + "=" + entry._2));
    }

    /**
     * {@code flatMap} demo: splits every line on a single space and prints the
     * resulting tokens.
     *
     * @param jsc context used to create the RDD
     */
    public static void flatMap(JavaSparkContext jsc) {
        List<String> lines = Arrays.asList("do g", "sal mon", "sal mon1", "ra t", "ele phant");
        JavaRDD<String> linesRDD = jsc.parallelize(lines, DEFAULT_PARTITIONS);

        JavaRDD<String> tokensRDD = linesRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // foreach is an action: it triggers the job and prints every element.
        tokensRDD.foreach(token -> System.out.println("flatMap:" + token));
    }

    /**
     * {@code mapPartitions} demo: processes each partition as a whole and keeps
     * only the words longer than five characters.
     *
     * <p>The misspelled method name is kept as is because it is part of the
     * public interface of this class.
     *
     * @param jsc context used to create the RDD
     */
    public static void mapPartiions(JavaSparkContext jsc) {
        JavaRDD<String> wordsRDD = jsc.parallelize(ANIMALS, DEFAULT_PARTITIONS);

        JavaRDD<String> longWordsRDD = wordsRDD.mapPartitions(words -> {
            List<String> kept = new ArrayList<>();
            while (words.hasNext()) {
                String word = words.next();
                if (word.length() > 5) {
                    kept.add(word);
                }
            }
            return kept.iterator();
        });

        // foreach is an action: it triggers the job and prints every element.
        longWordsRDD.foreach(word -> System.out.println("mapPartiions:" + word));
    }

    /**
     * {@code mapPartitionsWithIndex} demo: prefixes every word with the
     * zero-based index of the partition that holds it.
     *
     * @param jsc context used to create the RDD
     */
    public static void mapPartitionsWithIndex(JavaSparkContext jsc) {
        JavaRDD<String> wordsRDD = jsc.parallelize(ANIMALS, DEFAULT_PARTITIONS);

        JavaRDD<String> indexedRDD = wordsRDD.mapPartitionsWithIndex((index, words) -> {
            List<String> indexed = new ArrayList<>();
            while (words.hasNext()) {
                indexed.add(index + "::" + words.next());
            }
            return indexed.iterator();
        }, true);

        // foreach is an action: it triggers the job and prints every element.
        indexedRDD.foreach(line -> System.out.println("mapPartitionsWithIndex:" + line));
    }

    /**
     * {@code sample} demo: draws a random sample of the words.
     *
     * <p>Variants tried by the original author:
     * <ul>
     *   <li>{@code sample(true, 0.6)} - with replacement; in testing the same
     *       element could show up more than once;</li>
     *   <li>{@code sample(false, 0.6)} - without replacement; in testing no
     *       element was repeated;</li>
     *   <li>{@code sample(false, 0.6, 9)} - without replacement and with an
     *       explicit seed; this is the variant that is executed.</li>
     * </ul>
     *
     * @param jsc context used to create the RDD
     */
    public static void sample(JavaSparkContext jsc) {
        JavaRDD<String> wordsRDD = jsc.parallelize(ANIMALS, DEFAULT_PARTITIONS);

        JavaRDD<String> sampledRDD = wordsRDD.sample(false, 0.6, 9);

        // foreach is an action: it triggers the job and prints every element.
        sampledRDD.foreach(word -> System.out.println("sample:" + word));
    }

    /**
     * {@code glom} demo: turns every partition into a single list and prints
     * one list per partition.
     *
     * @param jsc context used to create the RDD
     */
    public static void glom(JavaSparkContext jsc) {
        JavaRDD<String> wordsRDD = jsc.parallelize(ANIMALS, DEFAULT_PARTITIONS);

        wordsRDD.glom().foreach(partition -> System.out.println("glom" + partition));
    }

    /**
     * {@code union} demo: concatenates two RDDs and prints all elements.
     *
     * @param jsc context used to create the RDDs
     */
    public static void union(JavaSparkContext jsc) {
        JavaRDD<String> first = jsc.parallelize(
                Arrays.asList("dog", "dog1", "dog2"), DEFAULT_PARTITIONS);
        JavaRDD<String> second = jsc.parallelize(
                Arrays.asList("dog3", "salmon", "salmon1", "rat", "elephant"), DEFAULT_PARTITIONS);

        JavaRDD<String> combined = first.union(second);

        combined.foreach(word -> System.out.println("union:" + word));
    }

    /**
     * {@code intersection} demo: prints the elements present in both RDDs.
     *
     * @param jsc context used to create the RDDs
     */
    public static void intersection(JavaSparkContext jsc) {
        JavaRDD<String> first = jsc.parallelize(
                Arrays.asList("dog", "dog1", "dog2"), DEFAULT_PARTITIONS);
        JavaRDD<String> second = jsc.parallelize(
                Arrays.asList("dog", "salmon", "salmon1", "rat", "elephant"), DEFAULT_PARTITIONS);

        JavaRDD<String> common = first.intersection(second);

        common.foreach(word -> System.out.println("intersectionR:" + word));
    }

    /**
     * {@code distinct} demo: removes duplicate elements and prints the rest.
     *
     * @param jsc context used to create the RDD
     */
    public static void distinct(JavaSparkContext jsc) {
        List<String> words = Arrays.asList(
                "dog", "dog", "dog",
                "dog1", "dog1", "dog1",
                "dog2", "dog2", "dog2");
        JavaRDD<String> wordsRDD = jsc.parallelize(words, DEFAULT_PARTITIONS);

        // distinct also accepts an optional numPartitions argument that sets
        // the number of partitions of the result; it is omitted here.
        JavaRDD<String> uniqueRDD = wordsRDD.distinct();

        uniqueRDD.foreach(word -> System.out.println("distinct:" + word));
    }

    /**
     * {@code groupByKey} demo: groups the {@code 1}s emitted for every word
     * under that word and prints each group.
     *
     * @param jsc context used to create the RDD
     */
    public static void groupByKey(JavaSparkContext jsc) {
        JavaPairRDD<String, Integer> ones = pairWithOne(jsc, REPEATED_WORDS);

        // groupByKey also accepts an optional number of partitions; omitted here.
        JavaPairRDD<String, Iterable<Integer>> grouped = ones.groupByKey();

        grouped.foreach(entry -> System.out.println("groupByKeyRDD:" + entry._1 + ":::" + entry._2));
    }

    /**
     * {@code reduceByKey} demo: sums the {@code 1}s emitted for every word and
     * prints the resulting count per word.
     *
     * @param jsc context used to create the RDD
     */
    public static void reduceByKey(JavaSparkContext jsc) {
        JavaPairRDD<String, Integer> ones = pairWithOne(jsc, REPEATED_WORDS);

        // reduceByKey also accepts an optional number of partitions; omitted here.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(Integer::sum);

        counts.foreach(entry -> System.out.println("reduceByKeyPairRDD:" + entry._1 + ":::" + entry._2));
    }
}

猜你喜欢

转载自 https://blog.csdn.net/u013164612/article/details/84348444