详解 Spark 核心编程之 RDD 算子

RDD 算子就是 RDD 的方法

一、转换算子

根据数据处理方式的不同可以分为单 Value 类型、双 Value 类型和 Key-Value 类型

1. map

/**
	单 Value 类型算子
	函数签名:def map[U: ClassTag](f: T => U): RDD[U]
	功能:将处理的数据逐条进行映射转换,可以是类型的转换,也可以是值的转换
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        // 案例 1:将集合中的每个元素 * 2
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4))
        
        def mapFunc(num: Int): Int = {
    
    
            num * 2
        }
        
        val mapRdd = rdd.map(mapFunc)
        mapRdd.collect().foreach(println)
        
        // 使用匿名函数和至简原则传参
        val mapRdd2 = rdd.map(_ * 2)
        mapRdd2.collect().foreach(println)
        
        // 案例2:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
        // apache.log 内容格式:ip - - 时间戳 时区 请求方式 URL资源路径
        val rdd2 = sc.textFile("data/apache.log")
        val mapRdd3 = rdd2.map(
        	line => {
    
    
                val dataArr = line.split(" ")
                dataArr(6)
            }
        )
        mapRdd3.collect().foreach(println)
        
        // 案例3:map 的并行计算
        val rdd3 = sc.makeRDD(List(1,2,3,4), 1)
        rdd3.map(num => {
    
    
            println(">>>> " + num)
            num
        }).map(num => {
    
    
            println("@@@@ " + num)
            num
        }).collect() 
        // 结论:同一个分区内的数据 map 执行是有序的,即前一个数据执行完全部操作后再执行下一个
        
        val rdd4 = sc.makeRDD(List(1,2,3,4), 2)
        rdd4.map(num => {
    
    
            println(">>>> " + num)
            num
        }).map(num => {
    
    
            println("@@@@ " + num)
            num
        }).collect()
        // 结论:不同分区内的数据 map 执行是无序
        
    }
}

2. mapPartitions

/**
	单 Value 类型算子
	函数签名:def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
	功能:将待处理的数据以分区为单位发送到计算节点进行处理,处理是指可以进行任意的处理,即使是过滤数据
	缺点:该算子会将分区的整个数据加载到内存中并引用,由于存在引用,所以数据处理完不会被释放掉,在内存较小且数据量较大的情况下会造成内存溢出
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        // 案例 1:将集合中的每个元素 * 2
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val mpRdd = rdd.mapPartitions(iter => {
    
    
            println(">>>>>") // 执行次数与分区数一致
            iter.map(_ * 2)
        })
        
        mpRdd.collect().foreach(println)
        
        // 案例2:获取每个数据分区的最大值
        val mpRdd2 = rdd.mapPartitions(iter => {
    
    
            val maxValue = iter.max()
            List(maxValue).iterator
        })
        mpRdd2.collect().foreach(println)
        
    }
}

3. map VS mapPartitions

  • 数据处理角度:map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作

  • 功能的角度:map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。mapPartitions 算子是传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

  • 性能的角度:map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,可能会导致内存溢出,所以在内存有限的情况下,推荐使用 map 操作

4. mapPartitionsWithIndex

/**
	单 Value 类型算子
	函数签名:def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
	功能:将待处理的数据以分区为单位发送到计算节点进行处理,在处理时同时可以获取当前分区索引
	
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        // 案例 1:输出第二个分区中集合的数据
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        rdd.mapPartitionsWithIndex((index, iter) => {
    
    
            if(index == 1) iter else Nil.iterator
        }).collect().foreach(println)
     	
        // 案例2:将集合中的元素及其分区显示输出
        val rdd2 = sc.makeRDD(List(1,2,3,4))
        rdd2.mapPartitionsWithIndex((index, iter) => {
    
    
            iter.map(num => (index, num))
        }).collect().foreach(println)
        
    }
}

5. flatMap

/**
	单 Value 类型算子
	函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
	功能:将处理的数据进行扁平化后再进行映射处理,也称之为扁平映射
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例 1:将 List(List(1,2),List(3,4))进行扁平化操作
        val rdd: RDD[List[Int]] = sc.makeRDD(List(
        	List(1,2), List(3,4)
        ))
        
        rdd.flatMap(list => list).collect().foreach(println)
        
        // 案例 2:将 List(List(1,2),3,List(4,5))进行扁平化操作
        val rdd2 = sc.makeRDD(List(
            List(1,2), 3, List(4,5)
        ))
        
        rdd2.flatMap(data => {
    
    
            case list: List[_] => list
            case n: Int => List(n)
            case _ =>
        }).collect().foreach(println)
        
    }
}

6. glom

/**
	单 Value 类型算子
	函数签名:def glom(): RDD[Array[T]]
	功能:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变(分区数和数据所在分区不变)
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例 1
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)
        val glomRdd: RDD[Array[Int]] = rdd.glom()
        val glomArr: Array[Array[Int]] = glomRdd.collect()
        glomArr.foreach(println(_.mkString(",")))
        
        // 案例 2:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
        val maxSum: Int = rdd.glom().map(_.max).collect().sum()
        println(maxSum)
        
    }
}

7. groupBy

/**
	单 Value 类型算子
	函数签名:def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
	功能:将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,这样的操作称为 shuffle。极限情况下,数据可能被分在同一个分区中,一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例 1:按奇偶数分组
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        def groupFunc(num: Int): Int = {
    
     // 接收集合元素返回分组 key
            num % 2
        }
        
        val groupRdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunc)
        groupRdd.collect().foreach(println)
        
        // 案例 2:按首字母分组
        val rdd2 = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
        rdd2.groupBy(_.charAt(0)).collect().foreach(println)
        
        // 案例 3:从服务器日志数据 apache.log 中获取每个时间段访问量
        // apache.log 内容格式:ip - - 时间戳 时区 请求方式 URL资源路径
        val rdd3 = sc.textFile("data/apache.log")
        rdd3.map(line => {
    
    
            val time: String = line.split(" ")(3)
            val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
            val date: Date = sdf.parse(time)
            val sdf1 = new SimpleDateFormat("HH")
            val hour: String = sdf1.format(date)
            (hour, 1)
        }).groupBy(_._1).map(data => {
    
    
            (_._1, _._2.size)
        }).collect().foreach(println)
        
    }
}

8. filter

/**
	单 Value 类型算子
	函数签名:def filter(f: T => Boolean): RDD[T]
	功能:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例 1:过滤集合中的奇数
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        rdd.filter(_ % 2 == 0).collect().foreach(println)
        
        // 案例 2:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径
        val rdd2 = sc.textFile("data/apache.log")
        rdd2.filter(data => {
    
    
            val date: String = data.split(" ")(3)
            date.startsWith("17/05/2015")
        }).collect().foreach(println)
        
    }
}

9. sample

/**
	单 Value 类型算子
	函数签名:def sample(
                withReplacement: Boolean,
                fraction: Double,
                seed: Long = Utils.random.nextLong
             ): RDD[T]
	功能:根据指定的规则从数据集中抽取数据,一般用于在分区数据倾斜时抽取数据判断原因
	参数说明:
		1.第一个参数表示数据抽取完后是否放回数据源(true:放回 false:丢弃)
		2.第二个参数:
			2.1 在数据抽取不放回时:表示每个数据被抽取的概率,指定了第三个参数有基准值的概念,每				次抽取的数据都是一样的
			2.2 在数据抽取放回时:表示每个数据可能被抽取的次数,但最终可能小于或大于这个值
		3.第三个参数表示抽取数据时随机算法的种子,默认值是系统时间
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
        
        // 抽取不放回
        // 底层使用伯努利算法,又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
		// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
        val arr1 = rdd.sample(false, 0.4, 1).collect().mkString(",")
        println(arr1)
        
        // 抽取放回
        // 底层使用泊松算法
        val arr2 = rdd.sample(true, 2).collect().mkString(",")
        println(arr2)
    }
}

10. distinct

/**
	单 Value 类型算子
	函数签名:
		def distinct()(implicit ord: Ordering[T] = null): RDD[T]
		def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
	
	功能:将数据集中重复的数据去重
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
        rdd.distinct().collect().foreach(println)
        
        /**
        	List().distinct 的原理:底层使用 HashSet 处理去重
        	rdd.distinct() 的原理:底层实现为 map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
        	1-> 1,2,3,4,1,2,3,4 => (1,null),(2,null),(3,null),(4,null),(1,null),(2,null),(3,null),(4,null)
        	2-> (1,null) (1,null) => (1,null),.....
        	3-> (1,null) => 1
        */
        
    }
}

11. coalesce

/**
	单 Value 类型算子
	函数签名:def coalesce(
				numPartitions: Int, 
				shuffle: Boolean = false, 
				partitionCoalescer: Option[PartitionCoalescer] = Option.empty
			)(implicit ord: Ordering[T] = null): RDD[T]
	
	功能:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率;当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4,5,6), 3)
        rdd.saveAsTextFile("output") // [1,2],[3,4],[5,6]
        
        // 不 shuffle,默认
        rdd.coalesce(2).saveAsTextFile("output1") // [1,2],[3,4,5,6]
        
        // shuffle
        rdd.coalesce(2, true).saveAsTextFile("output2") // [1,4,5],[2,3,6]
        
        // 可以扩大分区,必须设置 shuffle 为 true,否则不起作用
        val rdd2 = sc.makeRDD(List(1,2,3,4,5,6), 2)
        // rdd2.coalesce(3).saveAsTextFile("output3") // 不起作用
        rdd2.coalesce(3, true).saveAsTextFile("output3")
        
    }
}

12. repartition

/**
	单 Value 类型算子
	函数签名:def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
	
	功能:扩大或缩减分区,其底层是调用 coalesce 方法,且参数 shuffle 的默认设置为 true
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
        rdd.repartition(3).saveAsTextFile("output") // 底层调用 coalesce(3,true)
        
    }
}

13. sortBy

/**
	单 Value 类型算子
	函数签名:def sortBy[K](
                f: (T) => K, 
                ascending: Boolean = true,
                numPartitions: Int = this.partitions.length
             )(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
	
	功能:用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(6,3,5,1,2,4), 2)
        rdd.sortBy(_).collect().foreach(println) // 默认升序
        
        val rdd2 = sc.makeRDD(List(("1",1),("1",2),("11",3),("2",2)), 2)
        // 按照 tuple 的第一个元素降序
        rdd2.sortBy(_._1, false).collect().foreach(println)
        // 按照 tuple 升序,先比较第一个元素,相同则比较第二个元素,依次类推
        rdd2.sortBy(_, true).collect().foreach(println)
    }
}

14. intersection

/**
	双 Value 类型算子
	函数签名:def intersection(other: RDD[T]): RDD[T]
	功能:对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
	注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        
        val rdd3: RDD[Int] = rdd1.intersection(rdd2)
        rdd3.collect().foreach(println) // 3, 4
        
    }
}

15. union

/**
	双 Value 类型算子
	函数签名:def union(other: RDD[T]): RDD[T]
	功能:对源 RDD 和参数 RDD 求并集后返回一个新的 RDD
	注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        
        val rdd3: RDD[Int] = rdd1.union(rdd2)
        rdd3.collect().foreach(println) // 1,2,3,4,3,4,5,6
        
    }
}

16. subtract

/**
	双 Value 类型算子
	函数签名:def subtract(other: RDD[T]): RDD[T]
	功能:以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来返回一个新的 RDD
	注意事项:三个 RDD 的数据类型必须一致
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        
        val rdd3: RDD[Int] = rdd1.subtract(rdd2)
        rdd3.collect().foreach(println) // 1,2
        
        val rdd4: RDD[Int] = rdd2.subtract(rdd1)
        rdd4.collect().foreach(println) // 5,6
        
    }
}

17. zip

/**
	双 Value 类型算子
	函数签名:def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
	功能:将两个 RDD 中的元素,以键值对的形式进行合并返回一个新的 RDD。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素
	注意事项:
		1.两个 RDD 的数据类型可以不一致
		2.两个 RDD 的分区数量必须保持一致
		3.两个 RDD 的每个分区中的数据个数必须保持一致
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        
        val rdd3: RDD[(Int,Int)] = rdd1.zip(rdd2)
        rdd3.collect().foreach(println) // (1,3),(2,4),(3,5),(4,6)
        
        val rdd4: RDD[(Int,Int)] = rdd2.zip(rdd1)
        rdd4.collect().foreach(println) // (3,1),(4,2),(5,3),(6,4)
        
        // 错误案例
        val rdd5 = sc.makeRDD(List(1,2,3,4), 2)
        val rdd6 = sc.makeRDD(List(3,4,5,6), 3)
        rdd5.zip(rdd6) // error
        
        val rdd7 = sc.makeRDD(List(1,2,3,4,5,6), 2)
        val rdd8 = sc.makeRDD(List(3,4,5,6), 2)
        rdd7.zip(rdd8) // error
        
    }
}

18. partitionBy

/**
	Key-Value 类型算子
	函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]
	功能:将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
	注意事项:
		1.partitionBy 方法不是 RDD 中的方法,而是 PairRDDFunctions 类的方法
		2.RDD 的伴生对象中有 RDD 转换为 PairRDDFunctions 的隐式函数,所以 RDD 可以调用 partitionBy 方法
		3.partitionBy 不会改变分区的数量(区别于 coalesce 和 repartition),只是将数据所在的分区位置进行重新分配
		4.当 RDD 调用 partitionBy 指定的 partitioner 类型和分区数与该 RDD 的自身的一致,则不会执行
		5.Spark 所有的分区器都继承 Partitioner 抽象类,也可以通过它自定义分区器
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        // rdd.partitionBy() // error,数据源不是 Key-Value 类型
        val mapRdd = rdd.map((_, 1))
        
        // 二次编译通过隐式函数将 mapRdd 转换成 PairRDDFunctions
        mapRdd.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
        // 数据分布从 【1,2】【3,4】 变成了 【(1,1),(3,1)】【(2,1),(4,1)】
        // HashPartitioner 底层分区原理是获取数据的 key 的 hashCode 值与分区数取模得到分区号
        
    }
}

19. reduceByKey

/**
	Key-Value 类型算子
	函数签名:
		def reduceByKey(func: (V, V) => V): RDD[(K, V)]
		def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
	功能:将数据按照相同的 Key 对 Value 进行聚合运算
	注意事项:
		1.reduceByKey 每次是对相同 key 的值进行两两聚合直到得到最终的一个结果
		2.如果数据源中的某个 key 只有一份,则不会参与 reduceByKey 运算
		3.reduceByKey 的分区内和分区间的聚合运算规则是一致的
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(
        	("a",1),("a",2),("a",3),("b",4)	
        ))
        
        val reduceRdd: RDD[(String, Int)] = rdd.reduceByKey((v1, v2) => {
    
    
            println(s"v1=$v1 v2=$v2")
            v1 + v2
        })
        
        reduceRdd.collect().foreach(println)
        
    }
}

20. groupByKey

/**
	Key-Value 类型算子
	函数签名:
		def groupByKey(): RDD[(K, Iterable[V])]
        def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
        def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
	功能:将数据源的数据根据 key 对 value 进行分组
	注意事项:
		1.groupByKey 不需要单独指定 key,返回的是 (key, Iterable(value)) 的二元组
		2.groupBy 可以指定进行分组的 key,返回的是 (key, Iterable((key,value))) 的二元组
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(
        	("a",1),("a",2),("a",3),("b",4)	
        ))
        
        val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        
        groupRdd.collect().foreach(println) 
        // (a,CompactBuffer(1,2,3)), (b,CompactBuffer(4))
        
    }
}

21. reduceByKey VS groupByKey

  • 从性能的角度:

    • reduceByKey 和 groupByKey 由于需要打乱重组数据,所以都存在 shuffle 的操作,在 Spark 中 shuffle 操作必须落盘,不能在内存中等待数据,会造成内存溢出

    • reduceByKey 会在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)操作,这样会减少落盘的数据量,所以性能比较高
      在这里插入图片描述

    • groupByKey 只是进行分组,不会进行预聚合减少数据量,所以性能比较低

      在这里插入图片描述

  • 从功能的角度:reduceByKey 包含分组和聚合的两个操作。groupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合,则使用 groupByKey

22. aggregateByKey

/**
	Key-Value 类型算子
	函数签名:def aggregateByKey[U: ClassTag](zeroValue: U)(
				seqOp: (U, V) => U,
				combOp: (U, U) => U
			 ): RDD[(K, U)]
	功能:将数据源的数据根据 key 按不同的规则进行分区内计算和分区间计算
	解释:
		1.aggregateByKey 方法存在函数柯里化,拥有两个参数列表
		2.第一个参数列表需要传递一个参数,表示初始值,用于分区内和第一个元素 key 的 value 进行计算
		3.第二个参数列表需要传递两个函数类型参数,第一个函数类型参数表示分区内运算规则,第二个函数类型参数表示分区间运算规则
		4.aggregateByKey 方法返回的 value 类型取决于初始值的类型
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例 1:统计每个分区内的最大值,然后求不同分区最大值的和
        val rdd = sc.makeRDD(List(
        	("a",1),("a",2),("a",3),("a",4)	
        ), 2)
        
        /*
        	【("a",1),("a",2)】【("a",3),("a",4)】
        	【("a",2)】【("a",4)】
        	("a", 6)
        */
        
        rdd.aggregateByKey(0)(
        	(x, y) => math.max(x, y),
            (x, y) => x + y
        ).collect().foreach(println) 
        
        // 案例 2:统计每个 key 的 value 的平均值(value和/次数)
        val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
        	("a",1),("a",2),("b",3),
            ("b",4),("b",5),("a",6)	
        ), 2)
        
        // 预期结果:(a, (1+2+6)/(1+1+1)=3),(b, (3+4+5)/(1+1+1)=4)
        val aggRdd: RDD[(String, (Int, Int))] = rdd1.aggregateByKey((0, 0))(
        	(t, v) => (t._1 + v, t._2 + 1),
            (t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)
        )
        
        // aggRdd.map((_._1, _._2._1/_._2._2)).collect().foreach(println)
        val resultRdd: RDD[(String, Int)] = aggRdd.mapValues(_._1/_._2)
        resultRdd.collect().foreach(println)
        
    }
}

23. foldByKey

/**
	Key-Value 类型算子
	函数签名:def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
	功能:当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(
        	("a",1),("a",2),("b",3),
            ("b",4),("b",5),("a",6)	
        ), 2)
        
        rdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
        rdd.foldByKey(0)(_ + _).collect().foreach(println)
        
    }
}

24. combineByKey

/**
	Key-Value 类型算子
	函数签名:def combineByKey[C](
                createCombiner: V => C,
                mergeValue: (C, V) => C,
                mergeCombiners: (C, C) => C
             ): RDD[(K, C)]
	功能:最通用的对 key-value 型 rdd 进行聚集操作的聚集函数
	解释:
		1.第一个参数表示对相同 key 的第一个 value 进行转换的操作
		2.第二个参数表示分区内运算规则
		3.第三个参数表示分区间运算规则
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        // 案例:统计每个 key 的 value 的平均值(value和/次数)
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(
        	("a",1),("a",2),("b",3),
            ("b",4),("b",5),("a",6)	
        ), 2)
        
        // 预期结果:(a, (1+2+6)/(1+1+1)=3),(b, (3+4+5)/(1+1+1)=4)
        val aggRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(
            v => (v, 1),
        	(t: (Int, Int), v) => (t._1 + v, t._2 + 1),
            (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)
        )
        
        // aggRdd.map((_._1, _._2._1/_._2._2)).collect().foreach(println)
        val resultRdd: RDD[(String, Int)] = aggRdd.mapValues(_._1/_._2)
        resultRdd.collect().foreach(println)
        
    }
}

25. reduceByKey/foldByKey/aggregateByKey/combineByKey 的区别

  • 四个方法底层最终调用的都是 combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] 方法
  • reduceByKey:相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • foldByKey:相同 key 的第一个数据和初始值按分区内规则进行计算,分区内和分区间计算规则相同
  • aggregateByKey:相同 key 的第一个数据和初始值按分区内规则进行计算,分区内和分区间计算规则可以不相同
  • combineByKey:相同 key 的第一个数据可以进行结构转换,分区内和分区间计算规则可以不相同

26. join

/**
	Key-Value 类型算子
	函数签名:def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
	功能:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W)) 的 RDD,类似于 SQL 的 join 连接
	说明:
		1.如果两个数据集中的 key 没有匹配上,则这部分数据会被丢弃
		2.如果两个数据集中存在多个相同的 key,则每个 key 会与其他 key 依次匹配,产生笛卡尔积,造成数据量几何性增长,造成内存溢出
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(
        	("a", 2), ("b", 2), ("c", 3), ("d", 4)
        ), 2)
        
        val rdd2 = sc.makeRDD(List(
        	("a", 1), ("a", 7), ("c", 5), ("e", 6)
        ), 2)
        
        val joinRdd: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
        joinRdd.collect().foreach(println)
        
    }
}

27. leftOuterJoin/rightOuterJoin

/**
	Key-Value 类型算子
	函数签名:
		def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
		def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
		
	功能:类似于 SQL 语句的左外连接和右外连接 
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(
        	("a", 2), ("b", 2), ("c", 3), ("d", 4)
        ), 2)
        
        val rdd2 = sc.makeRDD(List(
        	("a", 1), ("b", 7), ("c", 5), ("e", 6)
        ), 2)
        
        val leftJoinRdd: RDD[(String, (Int, Option(Int))] = rdd1.leftOuterJoin(rdd2)
        leftJoinRdd.collect().foreach(println)
        
        val rightJoinRdd: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
        rightJoinRdd.collect().foreach(println)
        
    }
}

28. cogroup

/**
	Key-Value 类型算子
	函数签名:def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
	功能:在类型为(K,V)和 1~3 个(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD,相同的 key 连接
	说明:cogroup -> connect + group,即分组后再连接
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd1 = sc.makeRDD(List(
        	("a", 1), ("b", 2), ("c", 3)
        ), 2)
        
        val rdd2 = sc.makeRDD(List(
        	("a", 4), ("b", 5), ("c", 6), ("c", 7),("d", 8)
        ), 2)
        
        val cgRdd: RDD[(String, (Iterable[Int], Iterable[Int])] = rdd1.cogroup(rdd2)
        cgRdd.collect().foreach(println)
        
    }
}

29. sortByKey

/**
	Key-Value 类型算子
	函数签名:def sortByKey(
				ascending: Boolean = true, 
				numPartitions: Int = self.partitions.length
			): RDD[(K, V)]
	功能:在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序
的 RDD
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(("1",1),("11",3),("2",2)), 2)
        
        // rdd.sortBy(_._1, false).collect().foreach(println) // 降序
        rdd.sortByKey(false).collect().foreach(println) // 降序,默认是升序
        
    }
}

30. 案例实操

  • 需求:统计出每一个省份每个广告被点击数量排行的 Top3

    agent.log 文件格式:
    时间戳 省份 城市 用户 广告
    1516609143867 6 7 64 16
    1516609143869 9 4 75 18
    1516609143869 1 7 87 12
    
    
  • 实现:

    object TestAdvClick {
          
          
        def main(args: Array[String]): Unit = {
          
          
            // 1.创建 spark 连接
            val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Adv")
            val sc = new SparkContext(sparkConf)
            
            // 2.读取文件数据
            val dataRdd: RDD[String] = sc.textFile("data/agent.log")
            
            // 3.将一行的字符串数据转换为 ((省份, 广告), 1) 元组
            val mapRdd: RDD[((String, String), Int)] = dataRdd.map(line => {
          
          
                val datas = line.split(" ")
                ((datas(1), datas(4)), 1)
            })
            
            // 4.分组聚合求出每个省份每个广告的点击次数
            val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)
            
            // 5.将数据结构由 ((省份, 广告), sum) 转换为 (省份, (广告, sum))
            val newMapRdd: RDD[(String, (String, Int))] = reduceRdd.map {
          
          
                case ((province, adv), sum) => (province, (adv, sum))
            }
            
            // 6.按照省份分组
            val groupRdd: RDD[(String, Iterable[(String, Int)])] = newMapRdd.groupByKey()
            
            // 7.对每个省份的广告点击次数进行降序并取 top 3
            val resultRdd: RDD[(String, Iterable[(String, Int)])] = groupRdd.mapValues(iter => {
          
          
                iter.toList().sortBy(_._2)(Ordering.Int.reverse).take(3)
            })
            
            // 8.输出结果
            resultRdd.collect().foreach(println)
            
        }
    }
    
    

二、行动算子

只有行动算子会触发作业的执行

1. reduce

/**
	函数签名:def reduce(f: (T, T) => T): T
	功能:按照规则聚合 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val result: Int = rdd.reduce(_ + _)
        println(result)
        
        sc.stop()
    }
}

2. collect

/**
	函数签名:def collect(): Array[T]
	功能:将不同分区中的数据按照分区顺序采集到的 Driver 端的内存中形成数组
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val arr: Array[Int] = rdd.collect()
        println(arr.mkString(","))
        
        sc.stop()
    }
}

3. count

/**
	函数签名:def count(): Long
	功能:返回 RDD 中元素的个数
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val num: Long = rdd.count()
        println(num)
        
        sc.stop()
    }
}

4. first

/**
	函数签名:def first(): T
	功能:返回 RDD 中的第一个元素
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val elem: Int = rdd.first()
        println(elem)
        
        sc.stop()
    }
}

5. take

/**
	函数签名:def take(num: Int): Array[T]
	功能:返回一个由 RDD 的前 n 个元素组成的数组
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val takeArr: Array[Int] = rdd.take(3)
        println(takeArr.mkString(","))
        
        sc.stop()
    }
}

6. takeOrdered

/**
	函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
	功能:返回该 RDD 排序后的前 n 个元素组成的数组,默认是升序
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(4,2,3,1), 2)
        
        val toArr: Array[Int] = rdd.takeOrdered(3)(Ordering[Int].reverse)
        println(toArr.mkString(","))
        
        sc.stop()
    }
}

7. aggregate

/**
	函数签名:def aggregate[U: ClassTag](zeroValue: U)(
				seqOp: (U, T) => U, 
				combOp: (U, U) => U
			 ): U
	功能:每个分区的数据通过初始值和分区内的数据进行聚合,然后初始值再和所有分区间的数据聚合
	对比:
		1.aggregateByKey 只能用于 key-value 类型数据,aggregate 无限制
		2.aggregateByKey 的初始值只参与分区内的计算,而 aggregate 的初始值会同时参与分区内和分区间的计算
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val result: Int = rdd.aggregate(10)(_ + _, _ + _)
        println(result) // 40
        
        sc.stop()
    }
}

8. fold

/**
	函数签名:def fold(zeroValue: T)(op: (T, T) => T): T
	功能:分区内和分区间的计算规则一致时替代 aggregate
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val result: Int = rdd.fold(10)(_ + _)
        println(result)
        
        sc.stop()
    }
}

9. countByKey/countByValue

/**
	函数签名:
		def countByKey(): Map[K, Long]
		def countByValue(): Map[V, Long]
		
	功能:统计每种 key 的个数/统计每种 value 的个数
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,1,1,4), 2)
        
        val valMap: Map[Int, Long] = rdd.countByValue()
        println(valMap) // Map(1 -> 3, 4 -> 1)
        
        val rdd2 = sc.makeRDD(List(
        	"a" -> 1, "a" -> 2, "a" -> 3,
            "b" -> 4
        ), 2)
        
        val keyMap: Map[String, Long] = rdd2.countByKey()
        println(keyMap) // Map(a -> 3, b -> 1)
        
        sc.stop()
    }
}

10. save 相关算子

/**
	函数签名:
		def saveAsTextFile(path: String): Unit
        def saveAsObjectFile(path: String): Unit
        def saveAsSequenceFile(
        	path: String,
        	codec: Option[Class[_ <: CompressionCodec]] = None
        ): Unit
		
	功能:将数据保存到不同格式的文件中:text文件/序列化成对象保存到文件/保存成 Sequencefile 文件
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(
        	("a", 1), ("b", 2), ("c", 3),("d", 4)
        ), 2)
        
        rdd.saveAsTextFile("output")
        rdd.saveAsObjectFile("output1")
        
        // saveAsSequenceFile 要求数据必须为 key-value 类型
        rdd.saveAsSequenceFile("output2")
        
        sc.stop()
    }
}

11. foreach

/**
	函数签名:def foreach(f: T => Unit): Unit = withScope {
                val cleanF = sc.clean(f)
                sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
            }
		
	功能:分布式(在不同的 Executor 端)遍历 RDD 中的每一个元素,调用指定函数
	说明:
		1.Scala集合的方法是在同一个节点内存中执行的
		2.RDD的方法可以将计算逻辑发送到分布式不同的节点执行
		3.为了区分,一般将 RDD 的方法称为算子,算子的外部调用是在 Driver 端执行,而内部的具体计算逻辑是在 Executor 端执行
*/
object TestRDDOperator {
    
    
    def main(args: Array[String]): Unit = {
    
    
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("op")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        // 此处的 foreach 是调用的 scala 集合中 Array 的方法,是在 Driver 端的内存中执行
        // 数据的打印是按顺序的
        rdd.collect().foreach(println)
        
        println("===================")
        // 此处的 foreach 是调用的 RDD 的算子,是在不同的 Executor 端的内存中执行
        // 数据的打印的顺序是不确定的
        rdd.foreach(println)
        
        sc.stop()
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_44480009/article/details/139331810