RDD分区
partition概念
在处理大数据时,由于数据量太大,以至于单个节点无法完全存储、计算。所以这些数据需要分割成多个数据块 block,以利用多个集群节点的存储、计算资源。Spark 自动对 RDD 中的大量元素进行分区,并在 worker 节点之间分配分区、计算。记住:分区是逻辑上的而不是物理上的概念。
partition 的相关属性
属性 | 描述 |
---|---|
partitions | 返回包含 RDD 所有分区引用的一个数组 |
partitions.size | 返回 RDD 的分区数量 |
partitioner | 返回一个分区:NONE、HashPartitioner、RangePartitioner、CustomerPartitioner |
spark 使用 partitioner 属性来确定分区算法,以此来确定哪些 worker 需要存储特定的 RDD 记录,如果 partitioner 的值为NONE,表示分区不是基于数据的特性,但是分布是随机的,并且保证了数据在各节点是均匀的。
查看 RDD Partition 信息
textFile 方法有一个可选的预设分区的参数,可以通过 partition.size 来查看。
println(textFileRDD.partitions.size)
注意:使用 textFile 方法读取数据,可以设置 partition 大小。
在集群环境中,读取本地文件 / HDFS 数据,由数据的 Block 个数决定,最小为 2。
特殊情况:如果 local 模式,单线程运行,默认 partitions.size() 为 1。当然在项目中,不会出现此种方式。
注意:每个 partition 会运行一个 task 来处理其数据元素。
RDD 的初始分区
- local:一个线程 -------- sc.defaultParallelism 值为 1
- local[*]:服务器 core 数量 ----- 本地 core 数量,如果 CPU core 为 8 个,那么 sc.defaultParallelism 的值为 8
- local[4]: 4个线程 ----- sc.defaultParallelism 的值为 4
如果 spark.default.parallelism 参数值的说明:
- 如果 spark-default.conf 或 sparkConf 中设置了 spark.default.parallelism 参数值,那么 spark.default.parallelism =设置值;
- 如果 spark-default.conf 或 SparkConf 中没有设置 spark.default.parallelism 参数值,那么:
local 模式:
local --> spark.default.parallelism = 1
local[4] --> spark.default.parallelism = 4
yarn 模式和 standalone 模式:spark.default.parallelism = max (所有executor使用的core总数,2)
由上述规则,确定 spark.default.parallelism 的默认值。
当 spark 程序执行时,会生成 sparkConf 对象,同时会生成以下两个参数值:
sc.defaultParallelism = spark.default.parallelism
sc.defaultMinParallelism = min(spark.default.parallelism, 2)
当sc.defaultParallelism 和 sc.defaultMinPartitions 确认了,就可以推算RDD的分区数了。
在项目中,在 spark-default.conf 文件中,spark.default.parallelism 属性值设置为 executor-cores * executors 个数 * 3。spark.default.parallelism 的设置值为:系统中所有 executCore 的3倍。
transformation 操作对分区的影响
普通 RDD 操作
API 调用 | partition.size | partitioner |
---|---|---|
map | 与父 RDD 相同 | NONE |
flatMap | 与父 RDD 相同 | NONE |
distinct | 与父 RDD 相同 | NONE |
fiter | 与父 RDD 相同 | 与父RDD相同 |
rdd.union(otherRdd) | rdd.partitions.size+otherRdd.partition.size | NONE |
rdd.intersection(otherRdd) | max(rdd.partitions.size, otherRdd.partition.size) | NONE |
rdd.subtract(otherRdd) | rdd.partition.size | NONE |
rdd.cartesian(otherRdd) | rdd.partitions.size * otherRdd.partitions.size | NONE |
key-valueRDD 操作
API 调用 | partition.size | partitioner |
---|---|---|
reduceByKey | 与父 RDD 相同 | HashPartitioner |
foldByKey | 与父 RDD 相同 | HashPartitioner |
combineByKey | 与父 RDD 相同 | HashPartitioner |
groupByKey | 与父 RDD 相同 | HashPartitioner |
sortByKey | 与父 RDD 相同 | RangePartitioner |
mapValues | 与父 RDD 相同 | 父RDD的partitioner |
flatMapValues | 与父 RDD 相同 | 父RDD的partitioner |
cogroup | 取决于所涉及的两个 RDD 的某些输入属性 | HashPartitioner |
join | 取决于所涉及的两个 RDD 的某些输入属性 | HashPartitioner |
leftOuterJoin | 取决于所涉及的两个 RDD 的某些输入属性 | HashPartitioner |
rightOuterJoin | 取决于所涉及的两个 RDD 的某些输入属性 | HashPartitioner |
分区的设置
分区数量太少、太多都有一定的优点和缺点。因此,需要根据集群配置和需求进行明智的分区 core - partition - task。
-
分区太少的缺点
减少并发性 — 缺少并行性的优点,可能存在空闲的 worker 节点
数据倾斜和不恰当的资源利用 — 数据可能在一个分区上倾斜。一个 worker 可能比其他 worker 做的更多,因此
可能出现资源问题。 -
分区太多的缺点
任务调度可能比实际执行时间花费更多的时间。
因此,在分区的数量之间存在权衡。
-
可用 core 数量的 2 ~ 3 倍。Apache Spark 只为 RDD 的每个分区运行一个并发任务,最多可以同时集群中的核心数量个 task,分区数量至少与可用 core 数量相等。可以通过调用 sc.defaultParallelism 获得可用 core 值。单个分区的数据量大小最终取决于执行程序的可用内存。
-
WebUI 上查看任务执行,至少需要 100+ms 时间。如果所用时间少于 100ms,那么应用程序可能会花跟多的时间来调度任务。。此时就是要减少 partition 的数量。
分区器
要使用分区器,首先要创建 PairRDD 类型的 RDD.
spark 有两种类型的分区器。一个是 HashPartitioner,另一个是 RangePartitioner。
HashPartitioner
HashPartitioner 基于 java 的 Object.hashCode() 方法进行分区。
val pairRDD=rdd.map(num =>(num,num))
pairRDD.saveAsTextFile("out/hashPartition4")
// 通过hash创建4个分区来保存输出。
pairRDD.partitionBy(new HashPartitioner(4)).saveAsTextFile("out/hashPartition42")
RangePartitioner
如果有可排序的记录,那么 RangePartitioner 将几乎在相同的范围内划分记录。RangePartitioner 是通过采用传入 RDD 的数据内容来确定的:
首先,RangePartitioner 将根据 Key 对记录进行排序。
然后,根据给定的值将记录划分为若干个分区。
在这里插入代码片
CustomPartitioner
可以通过扩展 Spark 中的默认分区器来定制需要的分区数量和应该存储在这些分区中的内容。
在这里插入代码片