RDD作为spark核心的数据抽象,有关RDD的源码可看spark源码《一》RDD,有大量的api,也就是算子
之前写过两篇算子源码,一篇触发runJob()的算子,一篇是基于combineByKey()算子的源码,有兴趣的可以去看下。
目录
take()&&takeOrdered()&&top()&&first()
union()&&intersection()&&subtract()
zipWithUniqueId()&&zipWithIndex()
map()&&flatMap()
返回一个新的RDD
val data=sc.parallelize(0 to 4,2)
val m=data.map(x=>x to 3)
m.foreach(println)
println(m.count())//RDD元素数量
val fm=data.flatMap(x=>x to 3)
fm.foreach(println)
println(fm.count())
map结果:Range(0, 1, 2, 3),Range(1, 2, 3),Range(2, 3),Range(3),Range() RDD元素数量5
flatMap结果:0 1 2 3 1 2 3 2 3 3 RDD元素数量10
可以看到map()是将一个元素生成一个元素,而flatMap()是将一个元素生成N(也可能0个)个元素。
val data=sc.parallelize(List("tom tom","jerry hehe","hehe"),2)
val m=data.map(x=>x.split(" "))
m.foreach(println)
println(m.count())
val fm=data.flatMap(x=>x.split(" "))
fm.foreach(println)
println(fm.count())
map结果:java.lang.String;@226d19fb,java.lang.String;@ffdbd08,java.lang.String;@3d58c9e5 元素数量3
flatMap结果:tom tom jerry hehe hehe 元素数量5
可以看到map将每个元素转为数组,新RDD的元素类型为数组,而faltMap是将所有数组中元素取出并连接起来。
map()&&mapPartitions()
返回一个新的RDD
map()与mapPartitions()源码:
class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)//分区中每个元素执行f方法
}
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = f(prev.iterator(split))//每个分区的数据作为一个整体执行f
}
下面看示例:
val data = sc.parallelize(1 to 10,3)
data.mapPartitions(x=>mapParrtitionsF(x)).count()
data.map(x=>mapF(x)).count()
def mapF(x:Int):Int={
println("调用了mapF方法")
x
}
def mapParrtitionsF(x:Iterator[Int]):Iterator[Int]={
println("调用了mapParrtitionsF方法")
x
}
map结果:调了10次方法,RDD有多少元素执行多少次方法
flatMap结果:调了3次方法,RDD有多少分区执行多少次方法
mapPartitionsWithIndex()
返回一个新的RDD
val data = sc.parallelize(1 to 5,3)
data.mapPartitionsWithIndex((x,iter)=>{
var result=List[String]()
while (iter.hasNext){
result ::=(x+"-"+iter.next())
}
result.toIterator
}).foreach(println)
结果为:0-1,1-2,1-3,2-4,2-5
mapPartitionsWithIndex()传入的方法需要两个参数,一个为分区Id,另一个为分区数据,该方法可用来查看各分区的数据
filter()
返回一个新的RDD
val data = sc.parallelize(1 to 5,3)
data.filter(x=>x%2==0).foreach(println)
结果为: 2 4
filter()接收一个返回值为布尔类型的方法,过滤掉不符合条件的元素,符合条件的元素组成一个新的RDD
take()&&takeOrdered()&&top()&&first()
前三个返回一个数组,最后一个返回一个值
val data=sc.parallelize(List(6,3,2,0,11,45,1),3)
for(e<-data.take(2)){
println(e)
}
for(e<-data.takeOrdered(2)){
println(e)
}
for(e<-data.top(2)){
println(e)
}
println(data.first())
take(2)结果为:6 3 ,取前两个元素
takeOrdered(2)结果为:0 1 ,升序排列后,取前两元素
top(2)结果为:45 11, 降序排列后,取前两元素
first()结果为:6 取第一个元素
sample()&&takeSample()
一个返回新的RDD 一个返回数组
val data = sc.parallelize(1 to 10000,3)
println(data.sample(true,0.5).count())
println(data.takeSample(true,5000).length)
sample()结果为:4976
takeSample()结果为:5000
这两算子都是随机抽取元素,可以传三个参数,第一个为布尔值,表示是否重复抽取元素,第二个sample()为抽取比例,takeSample()为具体数值,第三个为随机种子,java的随机数是伪随机数,通过计算随机种子得到某些数,可用默认值。
从结果可以看出,抽取比例为0.5时是抽不到5000个元素的,如果想抽取具体多少元素,可用takeSample()
union()&&intersection()&&subtract()
返回一个新的RDD
val d4=sc.parallelize(List(1,3,4,6,3,8,1),2)
val d5=sc.parallelize(List(2,3,4,5,3,8,9),3)
d4.union(d5).foreach(println)
println(d4.union(d5).partitions.length)//新RDD分区数
d4.intersection(d5).foreach(println)
println(d4.intersection(d5).partitions.length)
println(d4.intersection(d5,6).partitions.length)//指定新RDD分区数
d4.subtract(d5).foreach(println)
println(d5.subtract(d4).partitions.length)
println(d4.subtract(d5).partitions.length)
union()结果为:1,3,4,6,3,8,1,2,3,4,5,3,8,9 分区数为2+3=5
intersection()结果为:8,3,4 分区数为:3 6
subtract()结果为:1,6,1 分区数为:3 2
这三个算子分别求:并集,交集,差集。其中intersection()交集去重,别的union(),subtract()均不去重,union()分区数为两个RDD分区数之和,intersection()分区默认为max(rdd1分区,rdd2分区),可以手动指定分区数,subtract()默认分区数为调该方法的RDD的分区数,也可以指定。
reduce()&&fold()
返回一个值
val data = sc.parallelize(1 to 5,3)
println(data.reduce(_+_))
println(data.fold(5)(_+_))
reduce()结果为:15
fold()结果为:35
关于reduce()与fold()源码可看我之前的博客。
reduce()的计算过程:(((1+2)+3)+4)+5,而fold()的计算过程是先计算各分区与初值的结果,最后计算各分区结果与初值,
假设data的数据分布是:(1),(2,3),(4,5)
则fold()的计算结果是:(1+5)+(2+3+5)+(4+5+5)+5,
所以reduce()与fold()的区别就是一个有初值,一个无初值。
fold()&&aggregate()
返回一个值
val data = sc.parallelize(1 to 5,3)
println(data.fold(5)(_+_))
println(data.aggregate(5)(_+_,_*_))
fold()结果为:35
aggregate()结果为:4200
aggregate()需要传入两个方法,第一个方法计算各分区内数据,第二个方法计算各分区之前结果数据。
aggregate()计算过程:(1+5)*(2+3+5)*(4+5+5)*5=4200
而fold()传一个方法,计算各分区内数据不仅用这个方法,计算各分区之前结果数据也用该方法。
所以fold()与aggregate()区别是:一个传一个方法,一个传两个方法。
zipWithUniqueId()&&zipWithIndex()
返回一个新的RDD
val data = sc.parallelize(1 to 5,3)
data.zipWithUniqueId().foreach(println)
data.zipWithIndex().foreach(println)
zipWithUniqueId()结果为:(2,1),(3,4),(4,2),(5,5),(1,0)
zipWithIndex()结果为:(4,3),(2,1),(3,2),(5,4),(1,0)
可以看出,这两种方法都是返回一个键值对的RDD,键为元素,zipWithIndex()值为下标,从0到RDD元素数-1,而zipWithUniqueId()值为唯一的Id,不受最大值为RDD元素数-1的约束
zip()&&zipPartitions()
返回一个新的RDD
val d1=sc.parallelize(List("tom","jerry","hehe","zlq","wnn","nm","sl"),2)
val d3=sc.parallelize(List(6,3,2,0,11,45,1),2)
d1.zip(d3).foreach(println)
d1.zipPartitions(d3)((iter1,iter2)=>{//方法的参数为迭代器
var result=List[String]()
while (iter1.hasNext&&iter2.hasNext){
result ::=(iter1.next()+"-"+iter2.next())
}
result.iterator
}).foreach(println)
zip()结果为:(zlq,0),(tom,6),(wnn,11),(nm,45),(sl,1),(jerry,3),(hehe,2)
zipPartitions()结果为:hehe-2,jerry-3,tom-6,sl-1,nm-45,wnn-11,zlq-0
zip()要求两个RDD分区数与元素数必须相等,而zipPartitions()只要求分区数相同。生成新RDD与原RDD分区数相同
未完待续........