目录
基本概念
行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发触发作业(Job)的执行。其底层代码调用的就是runJob的方法,底层会创建ActiveJob,并提交执行。
算子介绍
1. reduce
函数定义
def reduce(f: (T, T) => T): T
说明
聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
2. collect
函数定义
def collect(): Array[T]
说明
在驱动程序中,以数组Array 的形式返回数据集的所有元素 。
3. count
函数定义
def count(): Long
说明
返回RDD 中元素的个数。
4. first
函数定义
def first(): T
说明
返回RDD 中的第一个元素 。
5. take
函数定义
def take(num: Int): Array[T]
说明
返回一个由RDD 的前 n 个元素组成的数组。
6. takeOrdered
函数定义
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
说明
返回该RDD 排序后的前 n 个元素组成的数组。
案例实操1-6
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{
SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//TODO - 行动算子
//reduce
// val i = rdd.reduce(_ + _)
//
// println(i)
//collect:方法会讲不同分区的数据按照分区顺序采集到Driver端内存中,形成数组
// val ints: Array[Int] = rdd.collect()
//
// println(ints.mkString(","))
//统计数据源中的数据的个数
val cnt = rdd.count()
println(cnt)
//first:获取数据源中第一个数据
val first = rdd.first()
println(first)
//take:获取N个数据
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
val rdd1 = sc.makeRDD(List(4,2,3,1))
//takeOrdered:排序后,获取N个数据
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
7. aggregate
函数定义
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
8. fold
函数定义
def fold(zeroValue: T)(op: (T, T) => T): T
说明
折叠操作,aggregate 的简化版操作。
案例实操7-8
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{
SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
//TODO - 行动算子
//aggregateByKey:初始值只会参与分区内的计算
//aggregate:初始值会参与分区内的计算,并且参与分区间的计算
// val result: Int = rdd.aggregate(10)(_ + _, _ + _)
val result: Int = rdd.fold(10)(_ + _)
println(result)
sc.stop()
}
}
9. countByKey
函数定义
def countByKey(): Map[K, Long]
说明
统计每种key 的个数。
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{
SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,1,3,4),2)
val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))
//TODO - 行动算子
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
val stringToLong: collection.Map[String, Long] = rdd1.countByKey()
println(stringToLong)
sc.stop()
}
}
案例实操
10. save相关算子
函数定义
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
说明
将数据保存到不同格式的文件中。
案例实操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{
SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
//TODO - 行动算子
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
//saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
11. foreach
函数定义
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
说明
分布式遍历RDD 中的每一个元素,调用指定函数。
案例实操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{
SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4) )
//foreach其实Driver端内存集合循环遍历的方法
rdd.collect().foreach(println)
println("*************")
//foreach其实是Executor端内存数据打印
rdd.foreach(println)
//算子:Operator(算子)
//RDD的方法和Scala集合对象的方法不一样
//集合对象的方法都是在同一个节点的内存中完成的
//RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
//为了区分不同的处理效果,所以将RDD的方法称之为算子
//RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在EXecutor端执行
sc.stop()
}
}