前面已经给大家讲过RDD原理以及常用的转换算子,今天就再给大家说说RDD的动作算子有哪些,以便大家更能全面的理解和掌握。
对于动作算子来说,本质上动作算子是通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行;所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算。
常用动作算子
count
- 返回数据集中的元素的个数
案例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.count
//6
countByKey
- 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
案例:
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByKey
//scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
countByValue
- 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
案例:
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd.countByValue
//scala.collection.Map[(Int, Int),Long] = Map((3,6) -> 1, (1,4) -> 1, (1,3) -> 1, (2,3) -> 1, (1,2) -> 1, (3,8) -> 1)
collect
- 以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用
- 在上篇文章写转换算子应注意到,前面所有转换操作都结合了collect动作算子进行计算输出
案例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.collect
//Array[Int] = Array(1, 2, 3, 4, 5, 6)
collectAsMap
- collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素
top
- 自定义一个排序规则(倒序),返回最大的n个数组成的数组
案例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.top(3)
// Array[Int] = Array(6, 5, 4)
take
- 返回当前RDD中的前n个元素
案例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.take(3)
// Array[Int] = Array(1, 2, 3)
takeOrdered(n)
- 返回该RDD排序后的前n个元素组成的数组
案例:
val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd.takeOrdered(3)
// Array[Int] = Array(2, 3, 4)
takeSample(withReplacement,num,seed)
- withReplacement:结果中是否可重复
- num:取多少个
- seed:随机种子
- 返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
- takeSample()函数和sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行collect(),返回结果的集合为单机的数组
案例:
val rdd1 = sc.makeRDD(1 to 10)
rdd1.takeSample(true,4,10)
//Array[Int] = Array(10, 10, 2, 3)
first
- 返回当前RDD的第一个元素
案例:
val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd.first
// 1
reduce
- 根据指定函数,对RDD中的元素进行两两计算,返回计算结果
案例:
val a=sc.parallelize(1 to 100)
a.reduce((x,y)=>x+y)
或
a.reduce(_+_) //与上面意思一样
// 5050
val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1)))
b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)})
// (AABBC,6)
foreach
- 对RDD中的每个元素都使用指定函数,做循环,无返回值
案例:
val rd=sc.parallelize(1 to 10)
rd.foreach(println)
//1 2 3 4 5 6 7 8 9 10
foreachPartition
- 遍历的数据是每个partition的数据。
lookup
- 用于PairRDD,返回K对应的所有V值
案例:
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.lookup('a')
//Array(1, 2)
max
- 取RDD中元素的最大值
案例:
val y=sc.parallelize(10 to 30)
y.max
//30
min
- 取RDD中元素的最小值
案例:
val y=sc.parallelize(10 to 30)
y.min
//10
saveAsTextFile(path)
- 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
案例:
val rdd=sc.parallelize(1 to 10,2)
rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")
//会把RDD的结果分成2个区的文件保存到hdfs上
saveAsSequenceFile(path)
- 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
- 用于将RDD中的元素序列化成对象,存储到文件中。
aggregate
- aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
案例:
var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.aggregate(0)(_+_,_+_)
//Int = 55
fold(num)(func)
- 折叠操作,aggregate的简化操作,将每个分区里面的元素通过seqOp和初始值进行聚合
案例:
var rdd1 = sc.makeRDD(1 to 10,2)
//将该RDD所有元素相加得到结果
rdd.fold(0)(_+_)
//Int = 55