SparkShuffle之Sort Based Shuffle溢写过程分析

1 溢写时机分析


    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
        var shouldSpill = false
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // Claim up to double our current memory from the shuffle memory pool
          val amountToRequest = 2 * currentMemory - myMemoryThreshold//需要申请的内存值
       //向 taskMemoryManager申请内存
      val granted =
            taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
          myMemoryThreshold += granted
          // If we were granted too little memory to grow further (either tryToAcquire returned 0,
          // or we already had more memory than myMemoryThreshold), spill the current collection
          shouldSpill = currentMemory >= myMemoryThreshold
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
        // Actually spill
        if (shouldSpill) {
          _spillCount += 1
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory


  1. 当前集合包含的 records 数超过 spark.shuffle.spill.numElementsForceSpillThreshold 指定值,该值默认大小为 Long.MaxValue
  2. 这种情况复杂些,其判断流程如下:

主要是在进行 spill 之前会尝试申请更多的内存来存放 records 来避免 spill。

若需要进行 spill,则首先会进行 spill 操作,然后释放已 spill 的集合对应的内存,释放主要是释放 execution pool 内存以供其他 tasks 使用,并将 myMemoryThreshold 赋值为初始值 spark.shuffle.spill.initialMemoryThreshold 对应值,即初始值


buffer.estimateSize方法计算内存(Spark shuffle时内存预测算法


    private val samples = new mutable.Queue[Sample]//一个队列,用于存储对数据的采样样本
    private val SAMPLE_GROWTH_RATE = 1.1//采样间隔次数增长率
    private var bytesPerUpdate: Double = _//根据samples中最后两个样本计算出的记录内存平均增长率
    private var numUpdates: Long = _//更新操作(包括插入和更新)的总次数
    private var nextSampleNum: Long = _//下一次采样操作的次数
    case class Sample(size: Long, numUpdates: Long)



·bytesPerUpdate:内存平均增长率   计算公式如下:






private def takeSample(): Unit = {
  // 估算集合的大小并作为样本
  samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
  if (samples.size > 2) { // 保留样本队列的最后两个样本
  val bytesDelta = samples.toList.reverse match {
    case latest :: previous :: tail =>
      (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
    case _ => 0
  bytesPerUpdate = math.max(0, bytesDelta) // 计算每次更新的字节数
  // 计算下次采样的采样号
  nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
     * Estimate the current size of the collection in bytes. O(1) time.
    def estimateSize(): Long = {
      val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
      (samples.last.size + extrapolatedDelta).toLong

这个estimateSize 就是上次的size+增长率*增长量。增长率和size就是上次抽样得到的。

2 spill操作

spill 操作

spill 的操作要考虑到之后要对之后生成的 spill 文件做 merge,因为最终一个 Shuffle Map Task 只生成一个输出文件和 Index 文件。

如果是需要做 map 端 combine,spill 时会对 map 中的数据先按 partition id 进行排序,若也提供了 key comparator,则会对属于同一个 partition 的 records 按 key 进行排序。做完排序后,先进行序列化再写入磁盘文件。

如果是不需要做 map 端的 combine,则只需对 buffer 按 partition id 进行排序即可,不需要对同一partition 的 records 按 key 进行排序。排序后,同样先序列化,再写入磁盘文件。

之后做 merge 时,使用 SpillReader 来读取 spill 数据又要先反序列化,再做最终排序,再写入最终文件,这一过程是 shuffle 过程中消耗比较大的一部分。

3 合并数据

合并的核心流程如下,由 ExternalSorter#writePartitionedFile(...) 方法实现

其中,最关键的 merge 流程如下:

  1. 为每个 spill 出来的文件生成一个 reader: SpillReader,得到 readers: Seq[SpillReader](reader 读取 spilled 文件要先反序列化)
  2. 将内存集合进行 buffered,得到 inMemBuffered
  3. 针对每个 partition p,执行以下操作:
  • 取出 inMemBuffered 中属于 partition p 的 records,得到迭代器 inMemIterator
  • 使用 readers 取出属于 partition p 的 records 对应的迭代器 Seq 与上一步中得到的 inMemIterator 合并得到最终的迭代器序列
  • 如果定义了 map 端聚合,则先对上一步得到的迭代器序列中的 records 进行聚合,若还定义了 key comparator,则使用该 comparator 对 records 进行排序
  • 若没有定义 map 端聚合但定义了 key comparator,则不做聚合而直接对 records 进行排序
  • 若既没有定义 map 端聚合也没有定义 key comparator,则直接返回总体 records 对应的迭代器

