1 溢写时机分析
(1)maybeSpill方法
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
//buffer中每插入32条记录就开始检测(或每32次就开始检测),并且当前估算的buffer内存达到了spill的内存阈值。currentMemory:当前估算的buffer或map的内存。myMemoryThreshold:内存溢出阈值。一开始设定为5M,随着检测次数不断增长,该阈值也在增长,有点类似于水位线一样。
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
//从shuffle内存池中获取当前预估内存最多2倍值
val amountToRequest = 2 * currentMemory - myMemoryThreshold//需要申请的内存值
//向 taskMemoryManager申请内存
val granted =
taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
//阈值增长:申请的内存值加上上一次内存阈值
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
//ExternalSorter#spill方法
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
溢写时机分析:
- 当前集合包含的 records 数超过
spark.shuffle.spill.numElementsForceSpillThreshold
指定值,该值默认大小为Long.MaxValue
- 这种情况复杂些,其判断流程如下:
主要是在进行 spill 之前会尝试申请更多的内存来存放 records 来避免 spill。
若需要进行 spill,则首先会进行 spill 操作,然后释放已 spill 的集合对应的内存,释放主要是释放 execution pool 内存以供其他 tasks 使用,并将 myMemoryThreshold
赋值为初始值 spark.shuffle.spill.initialMemoryThreshold
对应值,即初始值
(2)溢出写内存采样算法
buffer.estimateSize方法计算内存(Spark shuffle时内存预测算法)
SizeTracker中的属性:
private val samples = new mutable.Queue[Sample]//一个队列,用于存储对数据的采样样本
private val SAMPLE_GROWTH_RATE = 1.1//采样间隔次数增长率
private var bytesPerUpdate: Double = _//根据samples中最后两个样本计算出的记录内存平均增长率
private var numUpdates: Long = _//更新操作(包括插入和更新)的总次数
private var nextSampleNum: Long = _//下一次采样操作的次数
......
case class Sample(size: Long, numUpdates: Long)
·SAMPLE_GROWTH_RATE:采样间隔次数增长率,固定值1.1。代表下次抽样时候更新的次数应该是这次抽样更新次数的1.1倍,比如上次是更新10000次时候抽样,下次抽样就得是更新11000次时候再抽样,可以避免每次更新都抽样,减少抽样花销。用于计算nextSampleNum的值
·samples:样本队列。最后两个样本将被用于估算。
·bytesPerUpdate:内存平均增长率 计算公式如下:
·numUpdates:更新操作(包括插入和更新)的总次数。当前buffer中的总操作数(一个记录操作的编号,当然你也可以理解为抽样集合中的元素个数)
·nextSampleNum:下次采样时,numUpdates的值,即numUpdates的值增长到与nextSampleNum相同时,才会再次采样。可以理解为:代表下次要抽样的时候集合的个数,就是此次抽样时候的个数*1.1
了解了SizeTracker的属性,我们就可以更容易理解SizeTracker提供的方法了。
1.采集样本takeSample
takeSample方法用于采集样本,其实现如代码清单所示。
private def takeSample(): Unit = {
// 估算集合的大小并作为样本
samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
if (samples.size > 2) { // 保留样本队列的最后两个样本
samples.dequeue()
}
//计算平均增长率
val bytesDelta = samples.toList.reverse match {
case latest :: previous :: tail =>
(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
case _ => 0
}
bytesPerUpdate = math.max(0, bytesDelta) // 计算每次更新的字节数
// 计算下次采样的采样号
nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
}
/**
* Estimate the current size of the collection in bytes. O(1) time.
*/
def estimateSize(): Long = {
assert(samples.nonEmpty)
val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
(samples.last.size + extrapolatedDelta).toLong
}
这个estimateSize 就是上次的size+增长率*增长量。增长率和size就是上次抽样得到的。
2 spill操作
spill 操作
spill 的操作要考虑到之后要对之后生成的 spill 文件做 merge,因为最终一个 Shuffle Map Task 只生成一个输出文件和 Index 文件。
如果是需要做 map 端 combine,spill 时会对 map 中的数据先按 partition id 进行排序,若也提供了 key comparator,则会对属于同一个 partition 的 records 按 key 进行排序。做完排序后,先进行序列化再写入磁盘文件。
如果是不需要做 map 端的 combine,则只需对 buffer 按 partition id 进行排序即可,不需要对同一partition 的 records 按 key 进行排序。排序后,同样先序列化,再写入磁盘文件。
之后做 merge 时,使用 SpillReader 来读取 spill 数据又要先反序列化,再做最终排序,再写入最终文件,这一过程是 shuffle 过程中消耗比较大的一部分。
3 合并数据
合并的核心流程如下,由 ExternalSorter#writePartitionedFile(...)
方法实现
其中,最关键的 merge 流程如下:
- 为每个 spill 出来的文件生成一个 reader: SpillReader,得到
readers: Seq[SpillReader]
(reader 读取 spilled 文件要先反序列化) - 将内存集合进行 buffered,得到 inMemBuffered
- 针对每个 partition p,执行以下操作:
- 取出 inMemBuffered 中属于 partition p 的 records,得到迭代器 inMemIterator
- 使用 readers 取出属于 partition p 的 records 对应的迭代器 Seq 与上一步中得到的 inMemIterator 合并得到最终的迭代器序列
- 如果定义了 map 端聚合,则先对上一步得到的迭代器序列中的 records 进行聚合,若还定义了 key comparator,则使用该 comparator 对 records 进行排序
- 若没有定义 map 端聚合但定义了 key comparator,则不做聚合而直接对 records 进行排序
- 若既没有定义 map 端聚合也没有定义 key comparator,则直接返回总体 records 对应的迭代器