Spark 调优
避免创建重复RDD
- 同一份数据只创建一个RDD
- 避免重复计算
尽量复用同一个RDD
- 数据存在包含关系或者重复的情况下尽量复用RDD
- 避免重复计算
对多次使用的RDD进行持久化
- 由于RDD是惰性计算,执行RDD需要从源头算起,因此可以对多次重复使用的RDD进行Persist或者Cache,将当前RDD保存到内存或者磁盘中
- 避免重复计算
- 持久化级别后缀_2(如MEMORY_ONLY_2)表示将每个持久化数据复制1份duplicate
- 选取持久化策略:具体根据内存估计去选择
尽量避免使用shuffle类算子
- shuffle类算子有:
- 去重:
- distinct
- 聚合
- reduceByKey
- groupBy
- cogroup
- aggregateByKey
- combineByKey
- 排序
- sortByKey
- sortBy
- 重分区
- coalesce
- repartition
- 集合或表操作
- intersection
- substract
- subtractByKey
- join
- leftOuterJoin
- 去重:
- shuffle简单来说会将分布在多个节点上的数据拉去到同一节点进行聚合,会引发大量网络传输,且如果某个节点key值过多而不够内存则会溢写到磁盘文件造成IO,以上两者是shuffle性能较差的主要原因。
使用map-side预聚合
- 类似于MR的本地combiner
- 通过本地节点预先进行根据key值聚合,大大减少shuffle时需要拉去的数据量从而减小磁盘IO和网络传输开销
- 通常建议使用reduceByKey和aggregateByKey来替代groupByKey,因为reduceByKey和aggregateByKey会使用用户自定义函数进行节点的本地预聚合,而groupByKey是不会的
mapPartition替代普通map
- 使用一次函数处理一个partition所有的数据相对于一次函数一条一条处理性能会高一些,但有时候一次性处理整个partition的数据容易造成OOM。
- Warning:预估内存大小
使用foreachPartition替代foreach
- 原理类似上一条
- 在需要对RDD中的数据进行数据库读写时尤其推荐foreachPartition,如果使用foreach,可能会每一条数据都创建一个数据库连接,这样会频繁地创建和小会数据库连接,性能会非常差,而使用foreachPartition则可以使用一个链接处理整个分区的数据,性能相对高很多。
filter后进行coalesce操作
- filter过滤数据后建议使用coalesce减少分区,能够减少后续stage处理的task的数量进而提升性能
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
- 官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
广播大变量
- Map-side Join
- 大表join小表时: 将小表广播(Broadcast)到各个Executor,大表RDD使用Map手动进行join
- 在算子函数中使用到外部变量时
- 默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本,一个Executor就会有多份变量。使用Spark的广播功能后,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
使用使用Kryo优化序列化性能
- 在Spark中,主要有三个地方涉及到了序列化: * 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。 * 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。 * 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Stage划分
- Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。
总结自:
美团技术团队:Spark优化-基础篇