在spark中,在shuffle过程中内存放的数据将会通过TaskMemoryManager来进行管理,以ShuffleExternalSorter为例子。
final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
final Object recordPage = taskMemoryManager.getPage(recordPointer);
final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
while (dataRemaining > 0) {
final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
Platform.copyMemory(
recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
writer.write(writeBuffer, 0, toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
}
writer.recordWritten();
Shuffle的数据会被存放在TaskMemoryManager中,当需要写入到硬盘中进行持久化的时候将会在TaskMemoryManager中根据内存的指针获取对应的内存页,从内存页以及对应的偏移量获取具体的数据分段写入到BlockManager中,以便下游rdd在具体执行时得到。
上文提到的内存页,为TaskMemoryManager中管理内存的一个单位,其中,具体的实现类为MemoryBlock。
protected Object obj;
protected long offset;
protected long length;
一个MemoryBlock具体 由下文三个成员组成,当其为堆外内存的时候,只涉及到offset和length,其中offset为内存的起始地址,length则为堆外内存的具体长度。
而在堆内内存的实现中,obj则为存放具体数据的long数组,offset则为long array下,存放具体内存的首个元素偏移量,size则为该MemoryBlock的逻辑大小。
除了堆外内存实现OffHeapMemoryBlock,堆内内存实现OnHeapMemoryBlock之外,还有专门通过byte数组实现的ByteArrayMemoryBlock。
而TaskMemoryManager则是专门管理上述的MemoryBlock的。
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
在TaskMemoryManager中,维护了一个8192大小的MemoryBlock数组pageTable,用来存放具体的MemoryBlock。
外部内存使用者通过allocatePage()方法来申请相应的内存资源,在这个方法中,将会根据内存类型的使用模式,来选择相应的MemoryAllocator的allocate()方法实现来申请对应的MemoryBlock。
UnsafeMemoryAllocator的实现并不复杂,直接通过unsafe申请获得对应大小的内存,并将地址传回,作为OffHeapMemoryBlock的成员构造返回。
而HeapMemoryAllocator的实现较为复杂。
public MemoryBlock allocate(long size) throws OutOfMemoryError {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
final long[] array = arrayReference.get();
if (array != null) {
assert (array.length * 8L >= size);
MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
}
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[numWords];
MemoryBlock memory = OnHeapMemoryBlock.fromArray(array, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
}
return memory;
}
首先,会将申请大小根据8的倍数来进行内存对齐。
之后根据申请的内存大小是否过大来判断是否需要直接根据现有已经释放但还未回收的大MemoryBlock来进行复用来减少大内存块的申请,最后根据内存大小构造long array,来生成返回OnHeapMemoryBlock。
这样,如果需要访问一段内存空间,在堆外内存的情况下,可以直接通过堆外内存地址获取,但是堆内内存比较复杂,需要TaskMemoryManager的pageNum也就是pageTable的具体下标,和内存在下标对应long array的偏移量。
针对堆内内存的情况,TaskMemoryManager提供了编码方法encodePageNumberAndOffset()方法,统一将上述信息编码为8字节,与堆外内存统一,堆外内存也将会统一根据此编码。
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
pageNum最大8191,为13位,编码高13位存放pageNum,而后51位存放偏移量。当根据此编码寻址时,只需要相应解码即可获取内存的具体位置。
当通过TaskMemoryManager的allocatePage()方法申请内存时,如果内存已经不足,将会通过spill()方法从已存在的内存持有者中尝试释放一部分内存,以便尽量满足所申请的内存大小。
if (got < required) {
// Call spill() on other consumers to release memory
// Sort the consumers according their memory usage. So we avoid spilling the same consumer
// which is just spilled in last few times and re-spilling on it will produce many small
// spill files.
TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
for (MemoryConsumer c: consumers) {
if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
long key = c.getUsed();
List<MemoryConsumer> list =
sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
list.add(c);
}
}
while (!sortedConsumers.isEmpty()) {
// Get the consumer using the least memory more than the remaining required memory.
Map.Entry<Long, List<MemoryConsumer>> currentEntry =
sortedConsumers.ceilingEntry(required - got);
// No consumer has used memory more than the remaining required memory.
// Get the consumer of largest used memory.
if (currentEntry == null) {
currentEntry = sortedConsumers.lastEntry();
}
List<MemoryConsumer> cList = currentEntry.getValue();
MemoryConsumer c = cList.get(cList.size() - 1);
try {
long released = c.spill(required - got, consumer);
if (released > 0) {
logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
} else {
cList.remove(cList.size() - 1);
if (cList.isEmpty()) {
sortedConsumers.remove(currentEntry.getKey());
}
}
} catch (ClosedByInterruptException e) {
// This called by user to kill a task (e.g: speculative task).
logger.error("error while calling spill() on " + c, e);
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
logger.error("error while calling spill() on " + c, e);
throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
+ e.getMessage());
}
}
}
在这里,会从当前所有内存持有者MemoryConsumer根据其所持有的内存大小排序,获得所有持有内存大于所剩余不足未申请到内存的消费者,并不断尝试通过spill()方法将内存释放,直到遍历完所有内存持有者或者满足内存要求。