多个RDD合并
RDD的合并,按RDD内数据结构的是否相同分为两类。
1、合并的多个RDD结构相同
涉及的Spark函数有union、intersection、subtract
1.1 union
def union(other: RDD[T]): RDD[T]
该函数比较简单,就是将两个RDD进行合并,不去重。
1.2 intersection
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
该函数返回两个RDD的交集,并且去重。
参数numPartitions指定返回的RDD的分区数。
1.3 subtract
def subtract(other: RDD[T]): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
该函数类似于intersection,但返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
参数含义同intersection
scala> var rdd1 = sc.makeRDD(Seq(1,2,2,3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at makeRDD at :21 scala> rdd1.collect res48: Array[Int] = Array(1, 2, 2, 3) scala> var rdd2 = sc.makeRDD(3 to 4) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[67] at makeRDD at :21 scala> rdd2.collect res49: Array[Int] = Array(3, 4) scala> rdd1.subtract(rdd2).collect res50: Array[Int] = Array(1, 2, 2)
RDD之间的join
cogroup
cogroup相当于SQL中的全外关联full outer join,返回左右RDD中的记录,关联不上的为空。
参数numPartitions用于指定结果的分区数。
参数partitioner用于指定分区函数。
##参数为1个RDD的例子
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> var rdd3 = rdd1.cogroup(rdd2) rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[12] at cogroup at :25 scala> rdd3.partitions.size res3: Int = 2 scala> rdd3.collect res1: Array[(String, (Iterable[String], Iterable[String]))] = Array( (B,(CompactBuffer(2),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d))), (A,(CompactBuffer(1),CompactBuffer(a))), (C,(CompactBuffer(3),CompactBuffer(c))) ) scala> var rdd4 = rdd1.cogroup(rdd2,3) rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String]))] = MapPartitionsRDD[14] at cogroup at :25 scala> rdd4.partitions.size res5: Int = 3 scala> rdd4.collect res6: Array[(String, (Iterable[String], Iterable[String]))] = Array( (B,(CompactBuffer(2),CompactBuffer())), (C,(CompactBuffer(3),CompactBuffer(c))), (A,(CompactBuffer(1),CompactBuffer(a))), (D,(CompactBuffer(),CompactBuffer(d))))
##参数为2个RDD的例子
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) var rdd3 = sc.makeRDD(Array(("A","A"),("E","E")),2) scala> var rdd4 = rdd1.cogroup(rdd2,rdd3) rdd4: org.apache.spark.rdd.RDD[(String, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[17] at cogroup at :27 scala> rdd4.partitions.size res7: Int = 2 scala> rdd4.collect res9: Array[(String, (Iterable[String], Iterable[String], Iterable[String]))] = Array( (B,(CompactBuffer(2),CompactBuffer(),CompactBuffer())), (D,(CompactBuffer(),CompactBuffer(d),CompactBuffer())), (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(A))), (C,(CompactBuffer(3),CompactBuffer(c),CompactBuffer())), (E,(CompactBuffer(),CompactBuffer(),CompactBuffer(E))))
join
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.join(rdd2).collect res10: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
leftOuterJoin
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.leftOuterJoin(rdd2).collect res11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
rightOuterJoin
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.rightOuterJoin(rdd2).collect res12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))
subtractByKey
def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]
subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.subtractByKey(rdd2).collect res13: Array[(String, String)] = Array((B,2))
Action操作
聚合
aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
var rdd1 = sc.makeRDD(1 to 10,2) rdd1.mapPartitionsWithIndex{ (partIdx,iter) => { var part_map = scala.collection.mutable.Map[String,List[Int]]() while(iter.hasNext){ var part_name = "part_" + partIdx; var elem = iter.next() if(part_map.contains(part_name)) { var elems = part_map(part_name) elems ::= elem part_map(part_name) = elems } else { part_map(part_name) = List[Int]{elem} } } part_map.iterator } }.collect res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6
scala> rdd1.aggregate(1)( | {(x : Int,y : Int) => x + y}, | {(a : Int,b : Int) => a + b} | ) res17: Int = 58
结果为什么是58,看下面的计算过程:
##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
再比如:
scala> rdd1.aggregate(2)( | {(x : Int,y : Int) => x + y}, | {(a : Int,b : Int) => a * b} | ) res18: Int = 1428
##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
scala> rdd1.fold(1)( | (x,y) => x + y | ) res19: Int = 58 ##结果同上面使用aggregate的第一个例子一样,即: scala> rdd1.aggregate(1)( | {(x,y) => x + y}, | {(a,b) => a + b} | ) res20: Int = 58
拿元素
lookup
def lookup(key: K): Seq[V]
lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21 scala> rdd1.lookup("A") res0: Seq[Int] = WrappedArray(0, 2) scala> rdd1.lookup("B") res1: Seq[Int] = WrappedArray(1, 2)
take
def take(num: Int): Array[T]
take用于获取RDD中从0到num-1下标的元素,不排序。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.take(1) res0: Array[Int] = Array(10) scala> rdd1.take(2) res1: Array[Int] = Array(10, 4)
top
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.top(1) res2: Array[Int] = Array(12) scala> rdd1.top(2) res3: Array[Int] = Array(12, 10) //指定排序规则 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef scala> rdd1.top(1) res4: Array[Int] = Array(2) scala> rdd1.top(2) res5: Array[Int] = Array(2, 3)
takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
takeOrdered和top类似,只不过以和top相反的顺序返回元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.top(1) res4: Array[Int] = Array(2) scala> rdd1.top(2) res5: Array[Int] = Array(2, 3) scala> rdd1.takeOrdered(1) res6: Array[Int] = Array(12) scala> rdd1.takeOrdered(2) res7: Array[Int] = Array(12, 10)
save
saveAsTextFile、saveAsSequenceFile、saveAsObjectFile、saveAsHadoopFile、saveAsHadoopDataset、saveAsNewAPIHadoopFile、saveAsNewAPIHadoopDataset
参考:http://lxw1234.com/archives/category/spark