SparkCore系列(二)rdd聚合操作,rdd之间聚合操作

一:rdd聚合操作

count

            val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
            val sc = new JavaSparkContext(conf).sc

            val dataLength = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).count()//相当于数组的length

            println(dataLength)

countByValue

            val initialScores1: Array[(String, Double)] =
            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)
            println(data1.countByValue) // 以当前值作为key计数

reduce

            val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
            val sc = new JavaSparkContext(conf).sc

            val dataLength = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>x.toInt).reduce((x,y)=>x+y)//相当于数组的sum

            println(dataLength)

reduceByKey

            val avg = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1))
                        .reduceByKey((x,y)=>x+y).collect().map(x=>println(x)) //reduceByKey现在map端进行聚合,在真正开发过程中也常用

sortByKey

            val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
            val sc = new JavaSparkContext(conf).sc

            val data = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1)).sortByKey(true)//true正序,false倒序

            println(data.collect().map(x=>println(x)))

countByKey生产一般不用

            val data = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1))
                        .countByKey() //map结构 key->key value->的个数

            println(data)

collectAsMap生产一般不用

            val data = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1))
                        .collectAsMap() //map结构   

            println(data)

flod

            val data = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>x.toInt)
                        .fold(100)((x,y)=>x+y)//带初始值的聚合

            println(data)

groupByKey

            val avg = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1))
                        .groupByKey().collect().map(x=>println(x))//value 是一个数组,需要循环value时候使用

aggregate

            //自定义聚合函数
            //第一个参数 两个函数都会以2为参数算一遍
            //第二个参数 文件内部行与行之间操作
            //第三个参数 文件结果 操作
            val sum = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>x.toInt)
                        .aggregate(2)(pfun1,pfun2)
                        println(sum)

            def pfun1(p1: Int, p2: Int): Int = {//行与行之间操作
                        println("p1"+p1+" p2:"+p2)
                        p1 * p2
            }
            def pfun2(p3: Int, p4: Int): Int = {//文件之间结果操作
                        p3 + p4
            }                           //sum

            def pfun1(p1:Tuple2[Int,Int], p2: Int): Tuple2[Int,Int] = {//行与行之间操作
                        (p1._1 + 1,p1._2 + p2)
            }
            def pfun2(p1:Tuple2[Int,Int], p2: Tuple2[Int,Int]): Tuple2[Int,Int] = {//文件之间结果操作
                        (p1._1 + p2._1,p1._2 + p2._2)
            }
            val avg = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>x.toInt)
                        .aggregate(0,0)(pfun1,pfun2)
            println(avg._2/avg._1)                           //avg

combineByKey

            type MVType = (Int, Int)
            val avg = sc.textFile("/software/java/idea/data")
                        .flatMap(x=>x.split("\\|")).map(x=>(x.toInt,1))
                        .combineByKey(
                                    score => (score,1), //创建元素
                                    (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), //处理已经遇到的键
                                    (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) //处理已经未遇到的键
                        ).collect().map(x=>println(x))//value 是一个数组,需要循环value时候使用

            //aggregate功能很类似

二:rdd聚合操作

union

            val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.union(data2).collect().map(x=> println(x)) //SQL中UNION

intersection

            val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.intersection(data2).collect().map(x=> println(x)) //SQL INNER JOIN

join

            val initialScores1: Array[(String, Double)] =
            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.join(data2).collect().map(x=>println(x))
            //SQL INNER JOIN

subtract

            val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),(
            val data1 = sc.parallelize(initialScores1)
            
            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.subtract(data2).collect().map(x=> println(x)) //LEFT ANTI

subtractByKey

            val initialScores1: Array[(String, Double)] =
            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.subtractByKey(data2).collect().map(x=>println(x))
            //删掉rdd1中与rdd2的key相同的元素 相当于subtract

rightOuterJoin

            val initialScores1: Array[(String, Double)] =
                        Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.rightOuterJoin(data2).collect().map(x=>println(x))
            //右外连接

leftOuterJoin

            val initialScores1: Array[(String, Double)] =
            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.leftOuterJoin(data2).collect().map(x=>println(x))
            //左外连接

cartesian

            val initialScores1 = Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D",
            val data1 = sc.parallelize(initialScores1)
            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)
            data1.cartesian(data2).collect().map(x=> println(x)) // key不相同的笛卡尔积

cogroup

            val initialScores1: Array[(String, Double)] =
            Array(("A", 88.0), ("B", 95.0), ("C", 91.0),("D", 93.0))
            val data1 = sc.parallelize(initialScores1)

            val initialScores2 = Array(("D", 93.0), ("E", 95.0), ("F", 98.0))
            val data2 = sc.parallelize(initialScores2)

            data1.cogroup(data2).collect().map(x=>println(x))
            //key 相同的笛卡尔积

猜你喜欢

转载自www.cnblogs.com/wuxiaolong4/p/12046673.html