Value 类型
map
传递的函数每个元素执行一次
mapPartitions
传递的函数,每个分区执行一次
map()和mapPartitions()的区别
map()
:每次处理一条数据。mapPartitions()
:每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中该分区的数据才能释放,可能导致 OOM。- 开发指导:当内存空间较大的时候建议使用mapPartitions(),以提高处理效率。
flatMap
glom()
看分区数
groupBy
filter
sample
distinct
coalesce
作用: 缩减分区数到指定的数量,用于大数据集过滤后,提高小数据集的执行效率。
coalesce
一般用来减少分区
默认只能减少分区
coalesce(6,true)
参数2表示是否shuffle,如果是true,就可以增加分区
注意:如果增加分区就一定要shuffle
减少分区一般不要shuffle
repartition
作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络.
新的分区数相比以前可以多, 也可以少
repartition
重新分区
一定shuffle,所以一般用这个算子来增加分区
减少分区不建议使用
coalasce和repartition的区别
coalesce
一般用来减少分区
默认只能减少分区
coalesce(6,true)
参数2表示是否shuffle,如果是true,就可以增加分区
注意:如果增加分区就一定要shuffle
减少分区一般不要shuffle
repartition
重新分区
一定shuffle,所以一般用这个算子来增加分区
减少分区不建议使用
sortBy
pipe
作用: 管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的RDD。一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令.
注意:
脚本要放在 worker 节点可以访问到的位置
步骤1: 创建一个脚本文件pipe.sh
文件内容如下:
echo "hello"
while read line;do
echo ">>>"$line
done
步骤2: 创建只有 1 个分区的RDD
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.pipe("pipe.sh").collect
res0: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)
步骤3: 创建有 2 个分区的 RDD
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd1.pipe("pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)
总结: 每个分区执行一次脚本, 但是每个元素算是标准输入中的一行
双Value类型交互
union(并集)
注意:
union和++是等价的
val rdd3 = rdd1 ++ rdd2
subtract(差集)
intersection(交集)
cartesian(笛卡尔积)
作用: 计算 2 个 RDD 的笛卡尔积. 尽量避免使用
zip(拉链)
拉链:1.对应的分区的元素各个应该一样 2.分区数也要一样
总结:1.总的元素个数相等2.分区数相等
Key-Value 类型
partitionBy
跟partitionBy类似的算子有
coalesce:默认不shuffle(一般用于减少分区)
repartition:一定shuffle(一般用于增加分区)
思考:如何按照value来分区(patitionBy默认按key分区)
思路:将kv调换,分完区之后,调换回来
reduceByKey
没有零值, 并分区内聚合和分区间聚合相同
reduceByKey
1.是一个聚合算子
2.和scala(reduce,foldleft)的不一样 scala最终聚合成1个值
3.spark的这个聚合,是根据key来聚合的:
结果是和key的种类 k1,k2
4.先调整类型为 kv
groupByKey
groupByKey
1.分组,按照key进行分组
2.groupBy(x=>..)按照返回值来分
3.groupByKey只能永辉kv形式的
4.groupBy任意RDD都可以使用
reduceByKey和groupByKey的区别
groupByKey和reduceByKey的区别
https://blog.csdn.net/qq_46548855/article/details/108030239
- reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
- groupByKey:按照key进行分组,直接进行shuffle。
- 开发指导:reduceByKey比groupByKey性能更好,建议使用。但是需要注意是否会影响业务逻辑。
foldByKey
foldByKey
1.折叠,也是聚合,和reduceByKey一样,也有聚合
所有的聚合算子都有预聚合
2.多了一个0值的功能
3.0值到底参与多少次运算
只在分区内聚合(预聚合的)时有效
zeroValue*分区数+计数值=最终结果
zeroValue*分区数+计数值=最终结果
hello:13+4=7
world(只有一个分区):11+1=2
aggregateByKey
aggregateByKey
1.reduceByKey,foldByKey 都有预聚合
分区内预聚合的逻辑和分区间聚合的逻辑是一样的
2.aggregateByKey 实现了分区聚合逻辑和分区间的聚合逻辑不一样
3.零值是写死的
需求:计算每个分区相同key最大值 然后相加
这行代码也可以简化成
val result = rdd.aggregateByKey(Int.MinValue)(_.max(_),_+_)
需求:分区内同时计算最大值和最小值 2.分区间计算最大值的和与最小值的和
需求:计算每个key的平均值 a:每个key的value的值 b:每个key出现的次数
combineKeyBy
combineKeyBy
1.分区内聚合和分区间的聚合逻辑不一样
2.零值不是写死的,零值是根据碰到的每个key的一个value来动态生成
需求:求key的和
需求:每个key在每个分区内的最大值,然后再求出这些最大值的和
简化版
val result = rdd.combineByKey(
+_,
(_:Int).max(_:Int),
(_:Int)+(_:Int)
)
需求:每个key的平均值
sortByKey
sortByKey
1.用来排序
2.只能用来排序kv形式的RDD
3.这个用的比较少
4.sortBy 这个用的比较多
5.即使有很大的数据,也不会出现oom,所以,排序的时候尽量用spark的排序
,避免使用scala的排序
cogroup
join