This material draws on the book 《图解Spark核心技术与案例》 — a solid book, worth a look if you want to dig deeper.
Continuing from the last post on the RDD API, where I briefly demonstrated some common RDD interfaces with examples, this article focuses on RDD creation and transformations, again mixing explanation with hands-on code.
There are currently two basic ways to create an RDD: parallelized collections (Parallelized Collections), which take an existing Scala collection and run parallel computations on it; and external storage, where the source can be a local text file, an HDFS file system, or any data source exposed through the Hadoop API.
Creating RDDs from Parallelized Collections
A parallelized collection is created by calling SparkContext's parallelize method on an existing Scala collection (in fact a Seq object). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. Example:
// An RDD created with the parallelize method
scala> val rdd =sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd.partitions.size
res2: Int = 2
scala> var rdd2 =sc.parallelize(1 to 10,4)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd2.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd2.partitions.size
res4: Int = 4
// makeRDD is similar to parallelize, but it can additionally specify preferred locations for each partition.
scala> var collect = Seq((1 to 10,Seq("master","slave1")),(11 to 15,Seq("slave2","slave3")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(master, slave1)), (Range(11, 12, 13, 14, 15),List(slave2, slave3)))
scala> var rdd = sc.makeRDD(collect)
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[2] at makeRDD at <console>:26
scala> rdd.partitions.size
res5: Int = 2
scala> rdd.preferredLocations(rdd.partitions(0))
res6: Seq[String] = List(master, slave1)
scala> rdd.preferredLocations(rdd.partitions(1))
res7: Seq[String] = List(slave2, slave3)
Creating RDDs from External Storage
Spark can turn any storage resource supported by Hadoop into an RDD — local files, HDFS, Cassandra, HBase, S3, and so on.
textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Before the demo I created a sparkLearn.txt file in a local directory and then uploaded it to HDFS.
Contents of sparkLearn.txt:
Everyone haas
a story
in life
smiled and
a young
couple sitting
Everyone haas
a story
in life
smiled and
a young
couple sitting
story life
Everyone haas
a story
in life
smiled and
a young
couple sitting
Upload the local file to HDFS:
[root@C6-hdp01 ~]# hadoop fs -copyFromLocal /root/spark/sparkLearn.txt /aaa/text
// Create two RDDs: one from a local file, one from HDFS
scala> var rdd = sc.textFile("/root/spark/sparkLearn.txt")
rdd: org.apache.spark.rdd.RDD[String] = /root/spark/sparkLearn.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.count
res0: Long = 19
scala> var rdd = sc.textFile("hdfs://C6-hdp01:9000/aaa/text/sparkLearn.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://C6-hdp01:9000/aaa/text/sparkLearn.txt MapPartitionsRDD[7] at textFile at <console>:24
scala> rdd.count
res3: Long = 19
The second argument of textFile (the number of slices) is optional. By default one slice is created per data block; you can ask for more slices than that, but not fewer than the number of HDFS blocks.
Transformations
I'll cover transformations in two parts: basic transformations and key-value transformations. First, the basic transformations.
Basic transformations:
- map[U](f: (T) => U): RDD[U]
- distinct(): RDD[T]
- distinct(numPartitions: Int): RDD[T]
- flatMap[U](f: (T) => TraversableOnce[U]): RDD[U]
map applies a given function to every element of the RDD to produce a new RDD, in which each element of the original RDD has exactly one counterpart. distinct removes duplicate elements, returning an RDD whose elements are all distinct. flatMap is similar to map, except that where map produces exactly one output element per input element, flatMap may produce zero or more elements per input element to build the new RDD.
Example:
scala> var data = sc.textFile("/root/spark/sparkLearn.txt")
data: org.apache.spark.rdd.RDD[String] = /root/spark/sparkLearn.txt MapPartitionsRDD[9] at textFile at <console>:24
scala> data.map(line => line.split("\\s+")).collect
res4: Array[Array[String]] = Array(Array(Everyone, haas), Array(a, story), Array(in, life), Array(smiled, and), Array(a, young), Array(couple, sitting), Array(Everyone, haas), Array(a, story), Array(in, life), Array(smiled, and), Array(a, young), Array(couple, sitting), Array(story, life), Array(Everyone, haas), Array(a, story), Array(in, life), Array(smiled, and), Array(a, young), Array(couple, sitting))
scala> data.flatMap(line => line.split("\\s+")).collect
res5: Array[String] = Array(Everyone, haas, a, story, in, life, smiled, and, a, young, couple, sitting, Everyone, haas, a, story, in, life, smiled, and, a, young, couple, sitting, story, life, Everyone, haas, a, story, in, life, smiled, and, a, young, couple, sitting)
scala> data.flatMap(line => line.split("\\s+")).distinct.collect
res6: Array[String] = Array(sitting, life, couple, smiled, young, a, Everyone, haas, in, story, and)
Repartitioning:
- coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
- repartition(numPartitions: Int): RDD[T]
coalesce and repartition both repartition an RDD. For coalesce, the first parameter is the target number of partitions and the second is whether to perform a shuffle (false by default; when a shuffle is performed, the data is redistributed with a HashPartitioner). repartition is simply coalesce with the shuffle parameter set to true.
scala> var data = sc.textFile("/root/spark/sparkLearn.txt")
data: org.apache.spark.rdd.RDD[String] = /root/spark/sparkLearn.txt MapPartitionsRDD[17] at textFile at <console>:24
scala> data.partitions.size
res7: Int = 2
scala> var datacoa =data.coalesce(1)
datacoa: org.apache.spark.rdd.RDD[String] = CoalescedRDD[18] at coalesce at <console>:25
scala> datacoa.partitions.size
res8: Int = 1
// If the target number of partitions is larger than the current one, shuffle must be set to true; otherwise the partition count does not change.
scala> var datacoa = data.coalesce(4)
datacoa: org.apache.spark.rdd.RDD[String] = CoalescedRDD[19] at coalesce at <console>:25
scala> datacoa.partitions.size
res9: Int = 2
- randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong):Array[RDD[T]]
- glom():RDD[Array[T]]
randomSplit splits an RDD into several RDDs according to the given weights, while glom gathers all elements of type T in each partition into an array, producing an RDD[Array[T]].
Example:
scala> var rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24
scala> var splitRDD = rdd.randomSplit(Array(1.0,2.0,3.0,4.0))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[21] at randomSplit at <console>:25, MapPartitionsRDD[22] at randomSplit at <console>:25, MapPartitionsRDD[23] at randomSplit at <console>:25, MapPartitionsRDD[24] at randomSplit at <console>:25)
// splitRDD is an array of RDDs
scala> splitRDD.size
res10: Int = 4
scala> splitRDD(0).collect
res11: Array[Int] = Array()
scala> splitRDD(1).collect
res12: Array[Int] = Array(2, 5, 6, 7, 9)
scala> splitRDD(2).collect
res13: Array[Int] = Array(3)
scala> splitRDD(3).collect
res14: Array[Int] = Array(1, 4, 8, 10)
scala> var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at makeRDD at <console>:24
scala> rdd.glom().collect
res15: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
- union(other:RDD[T]):RDD[T]
- intersection(other:RDD[T]):RDD[T]
- intersection(other:RDD[T],numPartitions:Int):RDD[T]
- intersection(other:RDD[T],partitioner:Partitioner):RDD[T]
- subtract(other: RDD[T]): RDD[T]
- subtract(other: RDD[T],numPartitions:Int): RDD[T]
- subtract(other: RDD[T],p:Partitioner): RDD[T]
union merges two RDDs, returning their union without deduplicating the elements. intersection is similar to an inner join in SQL: it returns the intersection of the two RDDs, with duplicates removed. For intersection and subtract, numPartitions specifies the number of partitions of the returned RDD, and partitioner specifies the partitioning function.
Example:
scala> var rdd = sc.makeRDD(1 to 2,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at <console>:24
scala> var rdd1 = sc.makeRDD(2 to 3,1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at makeRDD at <console>:24
scala> rdd.union(rdd1).collect
res17: Array[Int] = Array(1, 2, 2, 3)
scala> rdd.intersection(rdd1).collect
res18: Array[Int] = Array(2)
- mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
- mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mapPartitions is similar to map, except that the function is applied to the iterator of each partition of the RDD rather than to each element; the parameter preservesPartitioning indicates whether the partitioner of the parent RDD is preserved. When the mapping has to create expensive auxiliary objects, mapPartitions is far more efficient than map. For example, when writing all the data in an RDD to a database over JDBC, using map could mean creating one connection per element, which is very costly; with mapPartitions, only one connection per partition is needed. mapPartitionsWithIndex is like mapPartitions, except that its function takes an extra partition-index argument.
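The saving described above can be sketched without Spark by comparing the two styles on plain Scala collections. FakeConnection below is a hypothetical stub standing in for an expensive resource such as a JDBC connection; it exists only to count how many connections each style opens (an illustration of the idea, not Spark or JDBC API):

```scala
// A stub standing in for an expensive resource such as a JDBC connection.
class FakeConnection {
  FakeConnection.opened += 1
  def write(x: Int): Unit = ()  // pretend to persist the record
  def close(): Unit = ()
}
object FakeConnection { var opened = 0 }

// map-style: one connection per element.
def writePerElement(data: Seq[Int]): Unit =
  data.foreach { x =>
    val conn = new FakeConnection
    conn.write(x)
    conn.close()
  }

// mapPartitions-style: one connection per partition (modelled here as a group).
def writePerPartition(partitions: Seq[Seq[Int]]): Unit =
  partitions.foreach { part =>
    val conn = new FakeConnection  // one connection for the whole partition
    part.foreach(conn.write)
    conn.close()
  }
```

Over 1 to 5 split into two partitions, the per-element style opens five connections while the per-partition style opens only two — the same economy mapPartitions offers over map.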
Example:
scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at <console>:24
scala> var rdd2 = rdd.mapPartitions{x =>{
| var res = List[Int]()
| var i =0
| while(x.hasNext){
| i += x.next()
| }
| res.::(i).iterator
| }}
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[38] at mapPartitions at <console>:25
scala> rdd2.collect
res20: Array[Int] = Array(3, 12)
scala> rdd2.partitions.size
res21: Int = 2
Using the partition index, each partition's sum is prefixed with its index:
scala> var rdd1 =sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd2 = rdd1.mapPartitionsWithIndex{
| (x,iter)=>{
| var res =List[String]()
| var i =0
| while(iter.hasNext){
| i += iter.next()
| }
| res.::(x+"|"+i).iterator
| }
| }
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:25
scala> rdd2.collect
res0: Array[String] = Array(0|3, 1|12)
- zip[U](other: RDD[U]): RDD[(T, U)]
- zipPartitions[B,V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
- zipPartitions[B,V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
- zipPartitions[B,C,V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
- zipPartitions[B,C,V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V]
- zipPartitions[B,C,D,V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
- zipPartitions[B,C,D,V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
zip combines two RDDs into an RDD of key/value pairs; it assumes the two RDDs have the same number of partitions and the same number of elements per partition, and throws an exception otherwise. zipPartitions combines several RDDs partition by partition into a new RDD; the RDDs must have the same number of partitions, but there is no requirement on the number of elements within each partition.
Example:
scala> var rdd =sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> var rdd1 =sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at makeRDD at <console>:24
// Note: if the two RDDs have different numbers of partitions, zip fails with a partition-count error — try it yourself.
scala> rdd.zip(rdd1).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))
scala> var rdd2 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at makeRDD at <console>:24
scala> rdd2.mapPartitionsWithIndex{
| (x,iter)=>{
| var res = List[String]()
| while(iter.hasNext){
| res.::=("part_"+x+"|"+iter.next())
| }
| res.iterator
| }
| }.collect
res2: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
- zipWithIndex(): RDD[(T,Long)]
- zipWithUniqueId(): RDD[(T,Long)]
zipWithIndex pairs each element of the RDD with its index (ID) in the RDD. zipWithUniqueId pairs each element with a unique ID instead, generated by the following rule:
- the unique ID of the first element in each partition is that partition's index;
- the unique ID of the Nth element in a partition is the previous element's unique ID plus the total number of partitions of the RDD.
zipWithIndex has to launch a Spark job to compute the index offset of each partition, whereas zipWithUniqueId does not.
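The rule can be checked locally with plain Scala: for the k-th element (0-based) of partition p in an RDD with n partitions, the unique ID works out to p + k * n. The helper below is a local sketch of that rule, not Spark code:

```scala
// Compute zipWithUniqueId-style IDs locally from the rule:
// id = partitionIndex + k * numPartitions for the k-th element (0-based) of a partition.
def uniqueIds[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
  val n = partitions.size
  partitions.zipWithIndex.flatMap { case (part, p) =>
    part.zipWithIndex.map { case (elem, k) => (elem, p + k.toLong * n) }
  }
}
```

For two partitions Seq("A","B","C") and Seq("D","E","F"), this yields (A,0), (B,2), (C,4), (D,1), (E,3), (F,5), matching the zipWithUniqueId output in the REPL session that follows.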
Example:
scala> var rdd =sc.makeRDD(Seq("A","B","C","D","E","F"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:24
scala> var rdd1 =sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at makeRDD at <console>:24
scala> rdd1.zipWithIndex().collect
res3: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3), (E,4))
scala> rdd.zipWithUniqueId().collect
res4: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
Key-Value Transformations
- partitionBy(partitioner: Partitioner): RDD[(K, V)]
- mapValues[U](f: (V) => U): RDD[(K, U)]
- flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]
partitionBy repartitions the original RDD according to the given partitioner, producing a new ShuffledRDD. mapValues is like map, except that it applies the function only to the V in each (K, V) pair; likewise, flatMapValues operates only on the values.
Example:
scala> var rdd =sc.makeRDD(Array((1,"a"),(2,"b"),(3,"c"),(4,"d")),2)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[10] at makeRDD at <console>:24
// Inspect the elements of each partition
scala> rdd.mapPartitionsWithIndex{
| (partIdx,iter)=>{
| var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
| while(iter.hasNext){
| var part_name ="part_" + partIdx;
| var elem =iter.next()
| if(part_map.contains(part_name)){
| var elems =part_map(part_name)
| elems ::=elem
| part_map(part_name)=elems
| }else{
| part_map(part_name) =List[(Int,String)]{elem}
| }
| }
| part_map.iterator
| }
| }.collect
res5: Array[(String, List[(Int, String)])] = Array((part_0,List((2,b), (1,a))), (part_1,List((4,d), (3,c))))
// Repartition with partitionBy
scala> var rdd1 =rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[12] at partitionBy at <console>:25
// Inspect the elements of each partition after repartitioning
scala> rdd1.mapPartitionsWithIndex{
| (partIdx,iter)=>{
| var part_map =scala.collection.mutable.Map[String,List[(Int,String)]]()
| while(iter.hasNext){
| var part_name ="part_"+partIdx;
| var elem =iter.next()
| if(part_map.contains(part_name)){
| var elems =part_map(part_name)
| elems ::=elem
| part_map(part_name) =elems
| }else{
| part_map(part_name) = List[(Int,String)]{elem}
| }
| }
| part_map.iterator
| }
| }.collect
res6: Array[(String, List[(Int, String)])] = Array((part_0,List((4,d), (2,b))), (part_1,List((3,c), (1,a))))
// Using mapValues
scala> var rdd = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at makeRDD at <console>:24
scala> rdd.mapValues(x=>x+"_").collect
res7: Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))
- combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
- combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
- combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
- foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
- foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
combineByKey transforms an RDD[(K, V)] into an RDD[(K, C)], where C may or may not be the same type as V. foldByKey folds and merges the values of each key in an RDD[(K, V)]; zeroValue is an initial value that is folded into each key's values (once per key per partition) before func merges the values themselves.
The parameters of combineByKey are:
- createCombiner: combiner-creation function, turns a V into a C
- mergeValue: value-merging function, merges a C and a V into a C
- mergeCombiners: combiner-merging function, merges two Cs into one C
- numPartitions: number of partitions of the result
- partitioner: partitioning function
- mapSideCombine: whether to combine on the map side, true by default
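How the three functions cooperate can be emulated on ordinary Scala collections: within each partition, the first value of a key goes through createCombiner and later values are folded in with mergeValue; the per-partition results are then merged across partitions with mergeCombiners. The sketch below (standard library only, not the Spark implementation) uses this to compute a per-key average with a (sum, count) combiner:

```scala
// Emulate combineByKey on a list of per-partition (K, V) pairs.
def combineByKeyLocal[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Within each partition: the first value for a key goes through createCombiner,
  // later values are folded in with mergeValue.
  val partials = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
    }
  }
  // Across partitions: partial combiners for the same key are merged.
  partials.foldLeft(Map.empty[K, C]) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, c)) =>
      a.updated(k, a.get(k).fold(c)(c0 => mergeCombiners(c0, c)))
    }
  }
}

// Per-key average: the combiner C is a (sum, count) pair, a type different from V.
def averages[K](partitions: Seq[Seq[(K, Int)]]): Map[K, Double] =
  combineByKeyLocal[K, Int, (Int, Int)](
    partitions,
    v => (v, 1),
    (c, v) => (c._1 + v, c._2 + 1),
    (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)
  ).view.mapValues { case (s, n) => s.toDouble / n }.toMap
```

A (sum, count) pair is the classic case where C differs from V: neither the sum nor the count alone survives merging across partitions, but the pair does.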
Example:
scala> var rdd = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at makeRDD at <console>:24
scala> rdd.foldByKey(0)(_+_).collect
res9: Array[(String, Int)] = Array((B,3), (A,3), (C,1))
scala> rdd.foldByKey(5)(_+_).collect
res10: Array[(String, Int)] = Array((B,8), (A,8), (C,6))
reduceByKey and groupByKey
groupByKey collects all the V values for each key K of the RDD into a collection Iterable[V]; reduceByKey instead merges the values of each key with the given function.
Example:
scala> var rdd = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[20] at makeRDD at <console>:24
scala> rdd.groupByKey().collect
res11: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(1, 2)), (A,CompactBuffer(0, 2)), (C,CompactBuffer(1)))
scala> rdd.reduceByKey((x,y)=>x+y).collect
res12: Array[(String, Int)] = Array((B,3), (A,2), (C,1))
// A special case: reduceByKeyLocally returns the result as a local Scala Map rather than an RDD
scala> rdd.reduceByKeyLocally((x,y)=>x+y)
res13: scala.collection.Map[String,Int] = Map(A -> 2, B -> 3, C -> 1)
cogroup
cogroup is like a full outer join in SQL: it returns records from both sides, with empty collections where a key has no match. It accepts one to three other RDDs as arguments, and the number of partitions or the partitioning function can also be specified.
Example:
scala> var rdd = sc.makeRDD(Array(("A",0),("B",2),("C","c")),2)
rdd: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[25] at makeRDD at <console>:24
scala> var rdd1 = sc.makeRDD(Array(("A",0),("B",2),("C",3)),2)
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[26] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array(("A","A"),("B","e")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[27] at makeRDD at <console>:24
scala> var rdd3 = rdd.cogroup(rdd2,rdd1)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Any], Iterable[String], Iterable[Int]))] = MapPartitionsRDD[29] at cogroup at <console>:29
scala> rdd3.collect
res14: Array[(String, (Iterable[Any], Iterable[String], Iterable[Int]))] = Array((B,(CompactBuffer(2),CompactBuffer(e),CompactBuffer(2))), (A,(CompactBuffer(0),CompactBuffer(A),CompactBuffer(0))), (C,(CompactBuffer(c),CompactBuffer(),CompactBuffer(3))))
scala> rdd3.partitions.size
res15: Int = 2
join, fullOuterJoin, leftOuterJoin, rightOuterJoin, subtractByKey
Except for subtractByKey, the operations above join two RDDs on equal keys, corresponding to inner, full outer, left outer, and right outer joins, and all of them are implemented on top of cogroup. subtractByKey, by contrast, returns the pairs from the left RDD whose keys do not appear in the other RDD.
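To illustrate the claim that these joins sit on top of cogroup, here is a local sketch on Scala collections (standard library only, not the Spark implementation): group both sides by key as cogroup does, then emit the cross product of the two value sets for each key — a key with an empty side contributes nothing, which is exactly the inner-join behaviour.

```scala
// Inner join expressed via a cogroup-like step: group both sides by key,
// then emit the cross product of values for keys present on both sides.
def joinViaCogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
  val l = left.groupBy(_._1).view.mapValues(_.map(_._2)).toMap
  val r = right.groupBy(_._1).view.mapValues(_.map(_._2)).toMap
  val keys = l.keySet ++ r.keySet  // cogroup sees every key from either side
  keys.toSeq.flatMap { k =>
    for {
      v <- l.getOrElse(k, Seq.empty)  // an empty side yields no output: inner join
      w <- r.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }
}
```

The outer variants differ only in how an empty side is handled: leftOuterJoin wraps the right values in Option and emits None when the right side is empty, and so on.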
Example:
scala> var rdd = sc.makeRDD(Array(("A",1),("B",2),("C",3)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at makeRDD at <console>:24
scala> var rdd2 = sc.makeRDD(Array(("A","a"),("B","b"),("C","c"),("D","d")),2)
rdd2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[31] at makeRDD at <console>:24
// Inner join
scala> rdd.join(rdd2).collect
res16: Array[(String, (Int, String))] = Array((B,(2,b)), (A,(1,a)), (C,(3,c)))
// Left outer join
scala> rdd.leftOuterJoin(rdd2).collect
res17: Array[(String, (Int, Option[String]))] = Array((B,(2,Some(b))), (A,(1,Some(a))), (C,(3,Some(c))))
// Right outer join
scala> rdd.rightOuterJoin(rdd2).collect
res18: Array[(String, (Option[Int], String))] = Array((B,(Some(2),b)), (D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
That wraps up the basics of creating and transforming RDDs in the Spark programming model. The next part will cover RDD control operations and actions. If you're interested, follow my blog — I post Spark-related material from time to time; let's learn and improve together!