内存分配
GitHub地址:https://github.com/RobertoHuang
线程私有分配
在介绍PoolArena内存分配结构分析的时候提到内存分配会先从线程缓存里分配,这个线程缓存其实就是PoolThreadCache
PoolThreadCache
成员变量
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
PoolThreadCache中共有6个MemoryRegionCache类型的数组(Heap和Direct分别为3个)而每个又分为Tiny/Small/Normal,我们大概都可以猜出是为了存放不同大小的内存,PoolThreadCache中还持有Arena,Arena在这里并不是用来分配的,主要是用到了其内部的一些属性(最重要的一个就是每次PoolThreadCache和一个Arena关联起来的时候,Arena的线程持有数会+1)
构造方法
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena, int tinyCacheSize, int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
if (maxCachedBufferCapacity < 0) {
throw new IllegalArgumentException("maxCachedBufferCapacity: " + maxCachedBufferCapacity + " (expected: >= 0)");
}
// 初始化参数
this.heapArena = heapArena;
this.directArena = directArena;
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
if (directArena != null) {
// 初始化缓存数组
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
// 每次将Arena分配给PoolThreadCache,numThreadCaches都会加一
directArena.numThreadCaches.getAndIncrement();
} else {
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// 初始化缓存数组
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
// 每次将Arena分配给PoolThreadCache,numThreadCaches都会加一
heapArena.numThreadCaches.getAndIncrement();
} else {
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
if (tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null) {
// Only check freeSweepAllocationThreshold when there are caches in use.
if (freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)");
}
freeTask = new Runnable() {
@Override
public void run() {
free0();
}
};
deathWatchThread = Thread.currentThread();
// 观察线程每隔一个周期检测当前线程是否存活
ThreadDeathWatcher.watch(deathWatchThread, freeTask);
} else {
freeTask = null;
deathWatchThread = null;
}
}
在PoolThreadCache的构造方法中最重要的逻辑为初始化缓存数组,关于这几个数组的Size是在PooledByteBufAllocator初始化的时候被赋值的,关于这三个数组初始化大小如下
tiny块个数:32 计算方式:512 >>> 4(tiny块size为512)
small块个数:4 计算方式:small块的个数有pageSize来决定,它的计算公式是:pageShifts - 9(small块size为256)
normal块个数:3 计算方式:Math.max(1, log2(max / area.pageSize) + 1)。其中maxCachedBufferCapacity(设置io.netty.allocator.maxCachedBufferCapacity系统变量传入,默认是32 * 1024),pageSize默认为8192(normal块size为64)
分配内存
在分配时通过请求的内存大小计算内存块的索引 对于tiny计算方式如下
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
// 计算出tinyInx
int idx = PoolArena.tinyIdx(normCapacity);
// 根据tinyInx从数组中获取对应的MemoryRegionCache
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
static int tinyIdx(int normCapacity) {
// 因为tiny块的大小只能是16的倍数
return normCapacity >>> 4;
}
在分配时通过请求的内存大小计算内存块的索引 对于small计算方式如下
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
// 计算出smallIdx
int idx = PoolArena.smallIdx(normCapacity);
// 根据smallIdx从数组中获取对应的MemoryRegionCache
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
}
return cache(smallSubPageHeapCaches, idx);
}
static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
因为small块的大小是512的2次幂倍数,代码函数中计算512计算出0,1024计算出1,2048计算出2,4096计算出3等等
在分配时通过请求的内存大小计算内存块的索引 对于normal计算方式如下
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
// 计算出nomalIdx
int idx = log2(normCapacity >> numShiftsNormalDirect);
// 根据nomalIdx从数组中获取对应的MemoryRegionCache
return cache(normalDirectCaches, idx);
}
// 计算出nomalIdx
int idx = log2(normCapacity >> numShiftsNormalHeap);
// 根据nomalIdx从数组中获取对应的MemoryRegionCache
return cache(normalHeapCaches, idx);
}
numShiftsNormalHeap是pageSize的对数,pageSize为8192时numShiftsNormalHeap的值是13,normal块的size都是大于pageSize的,所以它的值也是512的2次幂倍数,当normCapacity小于等于8192时计算出的idx是0,8192*2时为1等
通过需要分配内存大小的不同获取到对应的MemoryRegionCache,最后使用MemoryRegionCache进行内存分配
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
// cache是通过大小从不同数组不同下标中获取到的元素
if (cache == null) {
return false;
}
// 调用MemoryRegionCache的分配方法
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
添加Chunk到线程缓存
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
// 获取对应的MemoryRegionCache 并使用add方法加入到队列中
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
在PoolThreadCache中引入了一个新的数据类型MemoryRegionCache,接下来我们对MemoryRegionCache进行剖析
MemoryRegionCache
成员变量
// 队列长度
private final int size;
// 实体队列
private final Queue<Entry<T>> queue;
// 维护类型(Tiny/Small/Normal)
private final SizeClass sizeClass;
// 分配次数
private int allocations;
构造方法
MemoryRegionCache(int size, SizeClass sizeClass) {
// 初始化对应数据
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
在构造方法中主要是对成员变量进行初始化,这里使用了一个MPSC(Multiple Producer Single Consumer)队列即多个生产者单一消费者队列,之所以使用这种类型的队列是因为:ByteBuf的分配和释放可能在不同的线程中,这里的多生产者即多个不同的释放线程,这样才能保证多个释放线程同时释放ByteBuf时所占空间正确添加到队列中
添加元素
public final boolean add(PoolChunk<T> chunk, long handle) {
// 通过chunk和handle构建一个Entry元素
Entry<T> entry = newEntry(chunk, handle);
// 将entry放入队列中
boolean queued = queue.offer(entry);
if (!queued) {
// 队列已满不缓存
// 立即回收entry对象进行下一次分配
entry.recycle();
}
return queued;
}
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
分配内存
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
// 从队列获取元素
Entry<T> entry = queue.poll();
// 为空返回分配失败
if (entry == null) {
return false;
}
// 初始化PooledByteBuf
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
// 断开chunk引用 handle设置为-1
entry.recycle();
++ allocations;
return true;
}
释放队列
public final int free() {
return free(Integer.MAX_VALUE);
}
private int free(int max) {
int numFreed = 0;
// 遍历队列释放所有节点
for (; numFreed < max; numFreed++) {
Entry<T> entry = queue.poll();
if (entry != null) {
freeEntry(entry);
} else {
// 队列中所有节点都被释放
return numFreed;
}
}
return numFreed;
}
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
// 回收entry对象
entry.recycle();
// 释放实际的内存空间
chunk.arena.freeChunk(chunk, handle, sizeClass);
}
MemoryRegionCache内部实现是一个队列,当PoolThreadCache获取到对应类型的MemoryRegionCache(tiny/small/normal)进行分配的时候会从队列中Poll一个元素(该元素与一个Chunk关联),那么从队列中获取成功之后就可以直接使用其分配。在释放的时候会将该Chunk加入到该队列中,下次就可以从队列中使用