SparkShuffle之Sort Based Shuffle溢写过程分析

1 溢写时机分析

(1)maybeSpill方法

    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
        var shouldSpill = false
        //buffer中每插入32条记录就开始检测(或每32次就开始检测),并且当前估算的buffer内存达到了spill的内存阈值。currentMemory:当前估算的buffer或map的内存。myMemoryThreshold:内存溢出阈值。一开始设定为5M,随着检测次数不断增长,该阈值也在增长,有点类似于水位线一样。
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // Claim up to double our current memory from the shuffle memory pool
        //从shuffle内存池中获取当前预估内存最多2倍值
          val amountToRequest = 2 * currentMemory - myMemoryThreshold//需要申请的内存值
       //向 taskMemoryManager申请内存
      val granted =
            taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
        //阈值增长:申请的内存值加上上一次内存阈值
          myMemoryThreshold += granted
          // If we were granted too little memory to grow further (either tryToAcquire returned 0,
          // or we already had more memory than myMemoryThreshold), spill the current collection
          shouldSpill = currentMemory >= myMemoryThreshold
        }
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
        // Actually spill
        if (shouldSpill) {
          _spillCount += 1
          logSpillage(currentMemory)
          //ExternalSorter#spill方法
          spill(collection)
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory
          releaseMemory()
        }
        shouldSpill
      }

溢写时机分析:

  1. 当前集合包含的 records 数超过 spark.shuffle.spill.numElementsForceSpillThreshold 指定值,该值默认大小为 Long.MaxValue
  2. 这种情况复杂些,其判断流程如下:

主要是在进行 spill 之前会尝试申请更多的内存来存放 records 来避免 spill。

若需要进行 spill,则首先会进行 spill 操作,然后释放已 spill 的集合对应的内存,释放主要是释放 execution pool 内存以供其他 tasks 使用,并将 myMemoryThreshold 赋值为初始值 spark.shuffle.spill.initialMemoryThreshold 对应值,即初始值

(2)溢出写内存采样算法

buffer.estimateSize方法计算内存(Spark shuffle时内存预测算法

SizeTracker中的属性:

    private val samples = new mutable.Queue[Sample]//一个队列,用于存储对数据的采样样本
    private val SAMPLE_GROWTH_RATE = 1.1//采样间隔次数增长率
    private var bytesPerUpdate: Double = _//根据samples中最后两个样本计算出的记录内存平均增长率
    private var numUpdates: Long = _//更新操作(包括插入和更新)的总次数
    private var nextSampleNum: Long = _//下一次采样操作的次数
    ......
    case class Sample(size: Long, numUpdates: Long)

·SAMPLE_GROWTH_RATE:采样间隔次数增长率,固定值1.1。代表下次抽样时候更新的次数应该是这次抽样更新次数的1.1倍,比如上次是更新10000次时候抽样,下次抽样就得是更新11000次时候再抽样,可以避免每次更新都抽样,减少抽样花销。用于计算nextSampleNum的值

·samples:样本队列。最后两个样本将被用于估算。

·bytesPerUpdate:内存平均增长率   计算公式如下:

·numUpdates:更新操作(包括插入和更新)的总次数。当前buffer中的总操作数(一个记录操作的编号,当然你也可以理解为抽样集合中的元素个数

·nextSampleNum:下次采样时,numUpdates的值,即numUpdates的值增长到与nextSampleNum相同时,才会再次采样。可以理解为:代表下次要抽样的时候集合的个数,就是此次抽样时候的个数*1.1

了解了SizeTracker的属性,我们就可以更容易理解SizeTracker提供的方法了。

1.采集样本takeSample

takeSample方法用于采集样本,其实现如代码清单所示。

private def takeSample(): Unit = {
  // 估算集合的大小并作为样本
  samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
  if (samples.size > 2) { // 保留样本队列的最后两个样本
    samples.dequeue()
  }
//计算平均增长率
  val bytesDelta = samples.toList.reverse match {
    case latest :: previous :: tail =>
      (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
    case _ => 0
  }
  bytesPerUpdate = math.max(0, bytesDelta) // 计算每次更新的字节数
  // 计算下次采样的采样号
  nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}
    /**
     * Estimate the current size of the collection in bytes. O(1) time.
     */
    def estimateSize(): Long = {
      assert(samples.nonEmpty)
      val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
      (samples.last.size + extrapolatedDelta).toLong
    }

这个estimateSize 就是上次的size+增长率*增长量。增长率和size就是上次抽样得到的。

2 spill操作

spill 操作

spill 的操作要考虑到之后要对之后生成的 spill 文件做 merge,因为最终一个 Shuffle Map Task 只生成一个输出文件和 Index 文件。

如果是需要做 map 端 combine,spill 时会对 map 中的数据先按 partition id 进行排序,若也提供了 key comparator,则会对属于同一个 partition 的 records 按 key 进行排序。做完排序后,先进行序列化再写入磁盘文件。

如果是不需要做 map 端的 combine,则只需对 buffer 按 partition id 进行排序即可,不需要对同一partition 的 records 按 key 进行排序。排序后,同样先序列化,再写入磁盘文件。

之后做 merge 时,使用 SpillReader 来读取 spill 数据又要先反序列化,再做最终排序,再写入最终文件,这一过程是 shuffle 过程中消耗比较大的一部分。

3 合并数据

合并的核心流程如下,由 ExternalSorter#writePartitionedFile(...) 方法实现

其中,最关键的 merge 流程如下:

  1. 为每个 spill 出来的文件生成一个 reader: SpillReader,得到 readers: Seq[SpillReader](reader 读取 spilled 文件要先反序列化)
  2. 将内存集合进行 buffered,得到 inMemBuffered
  3. 针对每个 partition p,执行以下操作:
  • 取出 inMemBuffered 中属于 partition p 的 records,得到迭代器 inMemIterator
  • 使用 readers 取出属于 partition p 的 records 对应的迭代器 Seq 与上一步中得到的 inMemIterator 合并得到最终的迭代器序列
  • 如果定义了 map 端聚合,则先对上一步得到的迭代器序列中的 records 进行聚合,若还定义了 key comparator,则使用该 comparator 对 records 进行排序
  • 若没有定义 map 端聚合但定义了 key comparator,则不做聚合而直接对 records 进行排序
  • 若既没有定义 map 端聚合也没有定义 key comparator,则直接返回总体 records 对应的迭代器

猜你喜欢

转载自blog.csdn.net/godlovedaniel/article/details/113979588