Spark调优 数据本地化调优

1. 数据本地化背景

数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码在一起(一个work节点上),那么性能会非常的高。但是如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上,通常来说移动计算比移动数据速度要快,因为代码很小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。

2. 源代码

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApispark

@DeveloperApi
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    // condition:条件 小于等于  constraint:约束
    condition <= constraint
  }
}

/**
  * 什么是NO_PREF?
  *   当Driver应用程序刚刚启动,Driver分配获得的Executor很可能还没有初始化完毕。所以会有一部分任务的本地化级别被设置为NO_PREF,
  * 如果是ShuffleRDD,其本地行始终为NO_PREF,对于这两种本地化级别是NO_PREF的情况,在任务分配时会被优先分配到非本地节点执行,
  * 达到一定的优化效果。
  *
  * PROCESS_LOCAL: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。
  *
  * NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比
  *             PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
  *
  * NO_PREF: 数据从哪里访问都一样快,不需要位置优先
  *
  * RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢
  *
  * ANY: 数据在非同一机架的网络上,速度最慢
  *
  */

3. 图解

这里写图片描述

假设1是要计算的数据,2是Driver会调度一个task去计算1的数据,正常情况下会生成3这个task,然后在本executor中直接获取数据执行。但是如果3在一定等待的阈值内一直获取不到资源去运行,此时恰好另外一个executor是有资源可以运行的,那么就会生成4 task,然后4去跨executor获取数据运行
这里有两个注意点:
1. 一个是阈值设置可以根据我们的情况设置大点
2. 4 task是跨executor取数据了,速度会比3 task要慢一点
其他情况和这个相同

4.Spark中的数据本地化流程图

即某个task 计算节点与其输入数据的位置关系,下面将要挖掘Spark 的调度系统如何产生这个结果,这一过程涉及 RDD、DAGScheduler、TaskScheduler,搞懂了这一过程也就基本搞懂了 Spark 的 PreferredLocations(位置优先策略)

这里写图片描述

  1. 第一步:PROCESS_LOCAL–>TaskScheduler首先根据数据所在的节点发送task,
    如果task在Worker1的Executor1中等待了3s(这个3s是spark的默认等待时间,通过spark.locality.wait来设置,可以在SparkConf()中修改),重试了5次,还是无法执行 TaskScheduler会降低数据本地化的级别,从PROCESS_LOCAL降到NODE_LOCAL
  2. 第二步:NODE_LOCAL–>TaskScheduler重新发送task到Worker1中的Executor2中执行, 如果task在Worker1的Executor2中等待了3s,重试了5次,还是无法执行
    TaskScheduler会降低数据本地化的级别,从NODE_LOCAL降到RACK_LOCAL
  3. 第三步:RACK_LOCAL–>TaskScheduler重新发送task到Worker2中的Executor1中执行。
  4. 第四步:当task分配完成之后,task会通过所在Worker的Executor中的 BlockManager来获取数据,如果BlockManager发现自己没有数据,那么它会调用getRemote方法,通过ConnectionManager与原task所在节点的BlockManager中的ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。

4.优化

/**
    * 优化:spark.locality.wait
    * 用于获取各个本地化级别的等待时间。
    *
    *     变量名称                       描述                           默认值
    *   spark.locality.wait             本地化级别的默认等待时间(全局)   3000
    *   spark.locality.wait.process     本地进程的等待时间               3000
    *   spark.locality.wait.node        本地节点的等待时间               3000
    *   spark.locality.wait.rack        本地机架的等待时间               3000
    *
    *
    *   提示:在运行的任务时间很长而且数量较多的情况下,适当调高这些参数可以显著提高性能。
    *   然而当这些参数值都已经超过任务运行的时长时,需要调小这些参数。
    *
    */
  private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
    val defaultWait = conf.get("spark.locality.wait", "3s")
    val localityWaitKey = level match {
      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
      case _ => null
    }

    if (localityWaitKey != null) {
      conf.getTimeAsMs(localityWaitKey, defaultWait)
    } else {
      0L
    }
  }

5.优化的时机

观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。
日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别

如果大多都是PROCESS_LOCAL,那就不用调节了
如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长
调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志
看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短

但是注意别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了。

猜你喜欢

转载自blog.csdn.net/qq_21383435/article/details/80467920