spark2.x-ShuffleManager

概述

大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。但是也必须提醒大家的是,影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。因此大家务必把握住调优的基本原则,千万不要舍本逐末。下面我们就给大家详细讲解shuffle的原理,以及相关参数的说明,同时给出各个参数的调优建议。

shuffle的定义
Spark的运行主要分为2部分:

  一部分是驱动程序,其核心是SparkContext;

  另一部分是Worker节点上Task,它是运行实际任务的。程序运行的时候,Driver和Executor进程相互交互:运行什么任务,即Driver会分配Task到Executor,Driver 跟 Executor 进行网络传输; 任务数据从哪儿获取,即Task要从 Driver 抓取其他上游的 Task 的数据结果,所以有这个过程中就不断的产生网络结果。其中,下一个 Stage 向上一个 Stage 要数据这个过程,我们就称之为 Shuffle。

shuffle write阶段
主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

shuffle read阶段
shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

ShuffleManager发展概述

用于shuffle的可插拔接口。ShuffleManager是在SparkContext上创建的驱动程序。
*在每个executor上,基于spark.shuffle.manager设置。Driver使用它注册shuffle,
executors(或者本地运行在driver上的tasks)可以请求读写数据。

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

HashShuffleManager(spark 2.x弃用)

未经优化的HashShuffleManager
HashShuffleManager01

优化后的HashShuffleManager
HashShuffleManager02

优化,是指我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

SortShuffleManager

SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

这里写图片描述
SortShuffleManager

在基于排序的shuffle中,根据目标分区ID对传入的记录进行排序,然后写入一个单一的map output file 。reducers 拉取该文件的相邻区域,以便读取他们的map output 部分。在map output数据太大而无法加载内存的情况下,已排序的输出子集可以溢出到磁盘,而磁盘文件上的子集被合并,生成最终输出文件。

基于sort的洗牌有两种不同的生成路径来产生它的map output 文件:
1、Serialized sorting
当以下三个条件都成立时使用:
shuffle依赖项不指定聚集或输出排序;
支持序列化值的重新定位(当前支持KryoSerializer 和 Spark SQL’s custom serializers);
shuffle产生少于16777216个输出分区
2、Deserialized sorting
用于处理所有其他情况。

Serialized sorting mode序列化排序模式
在序列化排序模式中,传入的记录只要传递到shuffle写入器,并在排序过程中以串行形式缓冲,做了几处优化:

1、排序操作的是二进制序列化数据而不是java对象,从而降低了内存消耗和GC开销。
这种优化要求记录序列化器具有一定的属性允许序列化记录被重新排序,而不需要反序列化。

2、它使用一种专门的缓存高效排序器(ShuffleExternalSorter)排序,压缩记录指针和分区ID的数组。在排序数组中,每条记录只使用8字节的空间,这可以将更多的数组放入缓存中。

3、 溢出合并过程对属于同一个分区的序列化记录块进行操作。在合并过程中不需要反序列化记录。

4、当溢出压缩编解码器支持压缩数据的级联时,溢出合并简单地串联序列化和压缩溢出分区以生成最终输出分区。
这允许使用高效的数据复制方法,如NIO的“transferTo”。为了避免在合并过程中分配解压或复制缓冲区。

ByPassSortShuffleManager

该类实现基于排序的 shuffle hash-style shuffle fallback 路径。写入路径将传入记录写入单独的文件,每个reduce分区一个文件,然后将这些文件串联起来。每个分区文件形成单个输出文件,其中的区域被提供给reducers.记录不缓存在内存中,基本和HashShuffleWriter类似,
除了除了它以格式写入输出,可以通过IndexShuffleBlockResolver服务或消费,这个写入路径对于大量的reduce分区是无效的,因为
同时为所有分区打开单独的序列化程序和文件流。

启动bypass机制的条件:

no Ordering is specified,
no Aggregator is specific
the number of partitions is less than spark.shuffle.sort.bypassMergeThreshold

参考资料:
https://www.cnblogs.com/qingyunzong/p/8954552.html
spark-2.3.0 源码

猜你喜欢

转载自blog.csdn.net/qq_16038125/article/details/80314696