版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_18377515/article/details/82982557
说明:spark 算子分为两类:一类是Transformation算子,一类是Action算子,其中Transformation算子不会触发作业提交,Action算子会触发作业提交。
Transformation算子
- map(输入分区与输出分区一对一)
例子:
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

// Sample input: two triples of ints.
val tuples: Seq[(Int, Int, Int)] = Seq((1, 2, 3), (4, 5, 6))
val rdd1: RDD[(Int, Int, Int)] = spark.sparkContext.parallelize(tuples)
// map is a one-to-one element transform: increment every component.
val rdd2: RDD[(Int, Int, Int)] = rdd1.map { case (a, b, c) => (a + 1, b + 1, c + 1) }
// Print each triple as "a::b::c" (runs on the executors).
rdd2.foreach(f => println(f._1 + "::" + f._2 + "::" + f._3))
spark.stop()
解释:左边的框代表rdd1 通过 map 进行元素加一操作 变换成rdd2
2. flatMap(输入分区与输出分区一对一)
例子:
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

// Each input line is a comma-separated list of tokens.
val tuples: Seq[String] = Seq("v1,v2,v3", "u1,u2,u3")
val rdd1: RDD[String] = spark.sparkContext.parallelize(tuples)
// flatMap: split each line on ',' and pair every token with its source line;
// the per-line arrays are flattened into one RDD of pairs.
val rdd2: RDD[(String, String)] = rdd1.flatMap(line => line.split(",").map(token => (token, line)))
println("==============rdd1==============")
rdd1.foreach(println)
println("==============rdd2==============")
rdd2.foreach(println)
spark.stop()
解释:rdd1中有两个集合,通过flatmap(按照逗号切分,分别和每个集合中的所有数据组合)算子进行拆散,形成rdd2
3. mapPartitionsWithIndex/mapPartitions(输入分区与输出分区一对一)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8, 12, -12, -5)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples)
// mapPartitionsWithIndex processes one whole partition (as an iterator) at a
// time; `index` is the partition number. Keep only values >= 3 and tag each
// survivor with the index of the partition it came from.
val rdd2: RDD[(Int, Int)] = rdd1.mapPartitionsWithIndex { (index, it) =>
  it.filter(_ >= 3).map(v => (index, v))
}
rdd2.foreach(f => println(f._1 + ";;" + f._2))
spark.stop()
解释:mapPartitions和mapPartitionsWithIndex是通过分区迭代器对整个分区元素进行操作。图中一个方块代表一个rdd分区,通过filter过滤之后,形成右侧的rdd
4. glom(输入分区与输出分区一对一)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8, 12, -12, -5)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples)
// glom collapses each partition into a single Array — one array per partition.
val rdd2: RDD[Array[Int]] = rdd1.glom()
rdd2.foreach(part => println(part.mkString(";;")))
spark.stop()
解释:将每个分区形成一个数组
5. union(输入分区与输出分区多对一)
例子
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, String)] = Seq(
  ("key1", "123a"), ("key2", "123b"), ("key3", "123c"), ("key4", "123d")
)
val tuples2: Seq[(String, String)] = Seq(
  ("key1", "abc1"), ("key2", "abc2"), ("key3", "abc3"), ("key4", "abc4")
)
val rdd1: RDD[(String, String)] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[(String, String)] = spark.sparkContext.parallelize(tuples2)
// union concatenates the two RDDs; element types must match and duplicates
// are NOT removed.
val rdd3: RDD[(String, String)] = rdd1.union(rdd2)
rdd3.foreach(println)
spark.stop()
解释:左侧的两个大方块代表两个rdd,右侧的代表合并之后的rdd,使用union函数时需要保证两个rdd元素的数据类型相同,返回的rdd数据类型和被合并的rdd元素数据类型相同,并不进行去重操作。
6. cartesian 笛卡尔积(输入分区与输出分区多对一)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, String)] = Seq(
  ("key1", "123a"), ("key2", "123b"), ("key3", "123c"), ("key4", "123d")
)
val tuples2: Seq[(String, String)] = Seq(
  ("key1", "abc1"), ("key2", "abc2"), ("key3", "abc3"), ("key4", "abc4")
)
val rdd1: RDD[(String, String)] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[(String, String)] = spark.sparkContext.parallelize(tuples2)
// cartesian pairs every element of rdd1 with every element of rdd2
// (4 x 4 = 16 result pairs here).
val rdd3: RDD[((String, String), (String, String))] = rdd1.cartesian(rdd2)
rdd3.foreach(println)
spark.stop()
// result (order is nondeterministic)
// ((key1,123a),(key2,abc2))
// ((key1,123a),(key1,abc1))
// ((key1,123a),(key4,abc4))
// ((key1,123a),(key3,abc3))
// ((key2,123b),(key1,abc1))
// ((key2,123b),(key2,abc2))
// ((key2,123b),(key3,abc3))
// ((key3,123c),(key2,abc2))
// ((key3,123c),(key1,abc1))
// ((key3,123c),(key3,abc3))
// ((key2,123b),(key4,abc4))
// ((key3,123c),(key4,abc4))
// ((key4,123d),(key2,abc2))
// ((key4,123d),(key1,abc1))
// ((key4,123d),(key3,abc3))
// ((key4,123d),(key4,abc4))
- groupByKey(输入分区与输出分区多对多)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, String)] = Seq(
  ("key1", "123a"), ("key1", "123e"), ("key1", "123m"),
  ("key2", "123b"), ("key3", "123c"), ("key4", "123d")
)
val rdd1: RDD[(String, String)] = spark.sparkContext.parallelize(tuples1)
// groupByKey gathers all values that share a key into one Iterable (shuffle).
val rdd2: RDD[(String, Iterable[String])] = rdd1.groupByKey()
rdd2.foreach { case (k, vs) => println(k + ":" + vs.toList.mkString(";;")) }
spark.stop()
// result
// key3:123c
// key4:123d
// key1:123a;;123e;;123m
// key2:123b
- filter(输出分区为输入分区的子集)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[Int] = Seq(1, 2, 3, 4, 5)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples1)
// filter keeps only the elements satisfying the predicate (here: >= 4).
val rdd2: RDD[Int] = rdd1.filter(x => x >= 4)
rdd2.foreach(println)
spark.stop()
// result
// 4
// 5
- distinct(输出分区为输入分区的子集)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[Int] = Seq(1, 2, 2, 3, 3, 4, 5)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples1)
// distinct removes duplicate elements (implemented via a shuffle, so the
// output order is not the input order).
val rdd2: RDD[Int] = rdd1.distinct
rdd2.foreach(println)
spark.stop()
// result
// 4
// 2
// 3
// 1
// 5
- subtract(输出分区为输入分区的子集)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[Int] = Seq(1, 2, 2, 3, 3, 4, 5)
val tuples2: Seq[Int] = Seq(3, 5, 6)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[Int] = spark.sparkContext.parallelize(tuples2)
// subtract removes from rdd1 every element that also occurs in rdd2.
val rdd3: RDD[Int] = rdd1.subtract(rdd2)
rdd3.foreach(println)
spark.stop()
// result 2 2 4 1
- sample(输出分区为输入分区的子集)
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(tuples1)
// sample(withReplacement = true, fraction = 0.5, seed = 9): with replacement
// the fraction is the EXPECTED number of times each element is drawn, so the
// sample size is only approximately 50% and elements may repeat.
val rdd2: RDD[Int] = rdd1.sample(true, 0.5, 9)
rdd2.foreach(println)
spark.stop()
// result
// 1
// 9
// 3
// 3
// 4
// 5
// 7
// 1
- cache
将RDD缓存到内存,底层是使用persist(StorageLevel.MEMORY_ONLY)函数实现。
- persist
对rdd进行缓存操作。通过参数StorageLevel枚举类型决定数据缓存到什么地方。
// StorageLevel constructor flags: (useDisk, useMemory, useOffHeap, deserialized[, replication]).
// No persistence at all.
val NONE = new StorageLevel(false, false, false, false)
// Persist to disk only.
val DISK_ONLY = new StorageLevel(true, false, false, false)
// Persist each partition to disk, replicated on 2 different cluster nodes.
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
/* Default persistence strategy: store the RDD as deserialized Java objects in
 * JVM heap memory. If memory cannot hold all RDD partitions, some partitions
 * are simply not cached and must be recomputed each time they are reused. */
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
// Same as MEMORY_ONLY, but each partition is replicated on 2 cluster nodes.
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
/* Only difference from MEMORY_ONLY: the RDD is stored SERIALIZED (one byte
 * array per partition) in JVM memory. Trade-off: lower memory footprint, but
 * deserialization costs CPU on access. */
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
// Serialized in memory, each partition replicated on 2 different nodes.
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
// Deserialized objects in memory; partitions that do not fit spill to disk and
// are subsequently read back from memory or disk as needed.
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
// As above, with each partition replicated on 2 different cluster nodes.
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
// Like MEMORY_ONLY_SER, but partitions that do not fit in memory spill to
// disk, avoiding recomputation when they are needed again.
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
// As above, with each partition replicated on 2 cluster nodes.
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
// Off-heap storage: reduces garbage-collection overhead and allows smaller
// executors sharing a memory pool.
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
- mapValues(输入分区与输出分区一对一)
针对(key,value)型数据中的value进行map操作。
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(("k1", 1), ("k2", 2), ("k3", 3), ("k4", 4))
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// mapValues transforms only the value of each (key, value) pair; the keys
// (and any partitioning) are left untouched.
val rdd2: RDD[(String, Int)] = rdd1.mapValues(_ + 2)
rdd2.foreach(println)
spark.stop()
// result
// (k3,5)
// (k1,3)
// (k4,6)
// (k2,4)
- combineByKey(单个rdd聚集)
细节可参考相关博文。
- reduceByKey(单个rdd聚集)
将相同key的值进行合并
// Local Spark session: 4 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4))
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// reduceByKey merges all values of the same key with the binary function
// (here: sum), combining map-side before the shuffle.
val rdd2 = rdd1.reduceByKey(_ + _)
rdd2.foreach(println)
spark.stop()
// result (k1,3) (k2,3) (k3,4)
- partitionBy(单个rdd聚集)
对rdd进行分区操作,如果原有的rdd的分区器和现有的分区器一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的shuffledrdd
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// Tag every element with the index of the partition it currently lives in
// (2 partitions, since the parallelism is local[2]).
val rdd2 = rdd1.mapPartitionsWithIndex { (idx, it) => it.map(e => (idx, e)) }
println("====================rdd2==================")
rdd2.foreach(println)
// Re-partition by key hash into 4 partitions; this builds a ShuffledRDD
// because the partitioner differs from the original.
val rdd3: RDD[(String, Int)] = rdd1.partitionBy(new HashPartitioner(4))
val rdd4: RDD[(Int, (String, Int))] = rdd3.mapPartitionsWithIndex { (idx, it) =>
  it.map(e => (idx, e))
}
println("====================rdd4==================")
rdd4.foreach(println)
spark.stop()
// result
// ====================rdd2==================
// (0,(k1,1))
// (0,(k1,2))
// (0,(k2,3))
// (0,(k3,4))
// (1,(k4,1))
// (1,(k5,2))
// (1,(k6,3))
// (1,(k7,4))
// ====================rdd4==================
// (0,(k3,4))
// (0,(k7,4))
// (1,(k4,1))
// (2,(k1,1))
// (2,(k1,2))
// (2,(k5,2))
// (3,(k2,3))
// (3,(k6,3))
- cogroup(对两个rdd聚集)
将两个rdd进行协同划分,每个rdd相同的key元素分别聚合为一个集合,并且返回两个rdd中对应key的元素集合的迭代器
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val tuples2: Seq[(String, Int)] = Seq(
  ("m1", 1), ("m1", 2), ("m2", 3), ("k3", 4),
  ("m4", 1), ("k5", 2), ("m6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples2)
// cogroup: for every key present in either RDD, produce the pair of value
// collections — one Iterable per side (empty when the key is missing there).
val rdd3: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
rdd3.foreach { case (k, (left, right)) =>
  println(k + ";;" + left + ";;" + right)
}
spark.stop()
// result
// m4;;CompactBuffer();;CompactBuffer(1)
// k5;;CompactBuffer(2);;CompactBuffer(2)
// k6;;CompactBuffer(3);;CompactBuffer()
// k3;;CompactBuffer(4);;CompactBuffer(4)
// k2;;CompactBuffer(3);;CompactBuffer()
// k7;;CompactBuffer(4);;CompactBuffer(4)
// m2;;CompactBuffer();;CompactBuffer(3)
// k1;;CompactBuffer(1, 2);;CompactBuffer()
// m6;;CompactBuffer();;CompactBuffer(3)
// m1;;CompactBuffer();;CompactBuffer(1, 2)
// k4;;CompactBuffer(1);;CompactBuffer()
- join
// Internal implementation (quoted from Spark source): cogroup the two RDDs
// first, then take the Cartesian product of the two value collections under
// each key and flatten the result, yielding RDD[(K, (V, W))] — an inner join:
// keys missing from either side produce no output.
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
例子:
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val tuples2: Seq[(String, Int)] = Seq(
  ("m1", 1), ("m1", 2), ("m2", 3), ("k3", 4),
  ("m4", 1), ("k5", 2), ("m6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples2)
// Inner join: only keys present in BOTH RDDs survive (k3, k5, k7 here).
val rdd3: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
rdd3.foreach(f => {
  println("key:" + f._1 + ",value:" + f._2._1 + ";;" + f._2._2)
})
spark.stop()
// result (corrected to match the println format above)
// key:k5,value:2;;2
// key:k3,value:4;;4
// key:k7,value:4;;4
- leftOuterJoin(左外连接)和rightOuterJoin(右外连接)
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val tuples2: Seq[(String, Int)] = Seq(
  ("m1", 1), ("m1", 2), ("m2", 3), ("k3", 4),
  ("m4", 1), ("k5", 2), ("m6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
val rdd2: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples2)
// leftOuterJoin keeps every key of rdd1; the right-hand value is an Option
// (None when the key is absent from rdd2, printed here as 0).
val rdd3: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
rdd3.foreach { case (k, (v, wOpt)) =>
  println("key:" + k + ",value:" + v + ";;" + wOpt.getOrElse(0))
}
spark.stop()
// result
// key:k6,value:3;;0
// key:k5,value:2;;2
// key:k2,value:3;;0
// key:k3,value:4;;4
// key:k7,value:4;;4
// key:k4,value:1;;0
// key:k1,value:1;;0
// key:k1,value:2;;0
Actions算子
说明:actions算子是通过sparkContext执行runJob操作进行提交作业,触发rdd的DAG执行。
- foreach
对rdd中每个元素都应用f函数操作,不返回RDD和Array,而是返回Unit
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// foreach runs the function on every element for its side effect only; it is
// an action and returns Unit (the printing happens on the executors).
rdd1.foreach(println)
spark.stop()
- saveAsTextFile 将数据输出到指定的HDFS目录
- saveAsObjectFile 将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile 的格式。
- collect 相当于toArray,toArray已经过时不推荐使用,collect将分布式的Rdd返回一个单机的Scala Array数组。
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// collect materialises the whole distributed RDD as one local Array on the
// driver — only safe when the data fits in driver memory.
val tuples: Array[(String, Int)] = rdd1.collect()
spark.stop()
- collectAsMap 对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元素,后面的元素覆盖前面的元素。
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// collectAsMap brings the pair RDD back to the driver as a Map; for duplicate
// keys the later value wins (hence k1 -> 2 below).
val stringToInt: collection.Map[String, Int] = rdd1.collectAsMap()
stringToInt.foreach { case (k, v) => println(k + ";;" + v) }
spark.stop()
// result
// k2;;3
// k5;;2
// k4;;1
// k7;;4
// k1;;2
// k3;;4
// k6;;3
- reduceByKeyLocally
先reduce再collectAsmap,先对Rdd的整体进行reduce操作,然后再收集所有的结果返回为一个HashMap.
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// reduceByKeyLocally reduces by key AND collects the result to the driver as
// a local Map in one action (reduce, then collectAsMap).
val stringToInt: collection.Map[String, Int] = rdd1.reduceByKeyLocally(_ + _)
stringToInt.foreach { case (k, v) => println(k + ";;" + v) }
spark.stop()
// result
// k1;;3
// k2;;3
// k3;;4
// k4;;1
// k5;;2
// k6;;3
// k7;;4
- lookup
对(key,value)型的rdd操作,返回指定key对应的元素形成的seq。这个函数处理优化的部分在于,如果这个rdd包含分区器,则只会对应处理k所在的分区,然后返回由(k,v)形成的seq.如果rdd不包含分区器,则需要对全rdd元素进行暴力扫描处理,搜索指定k对应的元素。
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// lookup returns all values for the given key as a Seq. If the RDD has a
// partitioner, only the partition owning the key is scanned; otherwise the
// whole RDD is searched.
val ints: Seq[Int] = rdd1.lookup("k1")
println(ints.length)
spark.stop()
- count 返回整个rdd元素的个数
- top take takeOrdered first
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// top(k): the k LARGEST elements by the implicit ordering -> (k7,4) (k6,3)
val top: Array[(String, Int)] = rdd1.top(2)
// take(k): the FIRST k elements in partition order — no sorting involved
// (NOT the smallest k) -> (k1,1) (k1,2)
val take: Array[(String, Int)] = rdd1.take(2)
// takeOrdered(k): the k SMALLEST elements, returned in ascending order
// -> (k1,1) (k1,2)
val takeOrder: Array[(String, Int)] = rdd1.takeOrdered(2)
// first(): the first element — equivalent to take(1)(0), NOT top(1)
val first: (String, Int) = rdd1.first()
top.foreach(f => println(f._1 + ";;" + f._2))
take.foreach(f => println(f._1 + ";;" + f._2))
takeOrder.foreach(f => println(f._1 + ";;" + f._2))
println(first._1 + ";;" + first._2)
spark.stop()
- reduce
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// reduce combines all elements pairwise: concatenate the keys with '@' and
// sum the values.
val rdd2: (String, Int) = rdd1.reduce { (t1, t2) =>
  (t1._1 + "@" + t2._1, t1._2 + t2._2)
}
println(rdd2._1 + "||" + rdd2._2)
spark.stop()
// result k1@k1@k2@k3@k4@k5@k6@k7||20
- fold
fold和reduce的原理相同,但是与reduce不同的是,迭代器针对每个分区第一个取的元素是zeroValue。
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// fold works like reduce but seeds each partition (and the final merge) with
// zeroValue. FIX: fold takes ONE zero-value argument, so a tuple must be
// passed explicitly as (("k0", 10)); the original `fold("k0", 10)` relied on
// argument auto-tupling, which is deprecated/removed in current Scala.
val rdd2: (String, Int) = rdd1.fold(("k0", 10))((x, y) => {
  (x._1 + "@" + y._1, x._2 + y._2)
})
println(rdd2._1 + ";;" + rdd2._2)
spark.stop()
// result k0@k0@k4@k5@k6@k7@k0@k1@k1@k2@k3;;50
// result k0@k0@k4@k5@k6@k7@k0@k1@k1@k2@k3;;50
- aggregate
aggregate 先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。与fold和reduce不同在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。而fold和reduce函数需要在每个分区中进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。
// Local Spark session: 2 worker threads, Hive support enabled.
val spark: SparkSession = SparkSession.builder()
  .master("local[2]")
  .enableHiveSupport()
  .appName(getClass.getName)
  .getOrCreate()

val tuples1: Seq[(String, Int)] = Seq(
  ("k1", 1), ("k1", 2), ("k2", 3), ("k3", 4),
  ("k4", 1), ("k5", 2), ("k6", 3), ("k7", 4)
)
val rdd1: RDD[(String, Int)] = spark.sparkContext.parallelize(tuples1)
// Signature: aggregate(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U).
// seqOp folds the elements of each partition; combOp merges the per-partition
// results. FIX: the zero value must be passed as ONE tuple argument
// (("v0", 1)); the original `aggregate("v0", 1)` relied on deprecated
// argument auto-tupling.
val rdd2: (String, Int) = rdd1.aggregate(("v0", 1))((u, t) => {
  (u._1 + "@" + t._1, u._2 + t._2)
}, (u1, u2) => {
  (u1._1 + "@" + u2._1, u1._2 + u2._2)
})
println(rdd2._1 + ";;" + rdd2._2)
spark.stop()