内存管理一般来说包括内存分配与内存释放两个方面,我们还是以分配ByteBuf为例,非池化的分配方式很简单,就是每次使用的时候new 一个byte数组,不用了就自然释放了。所以我们主要看下池化的ByteBuf的分配,内存分配包括了堆内存和非堆内存,但是内存分配的核心算法都是类似的,所以我们从常用的堆内存来看下netty是怎样进行内存管理的。
1,内存分配
堆内存的分配主要是通过PooledByteBufAllocator来进行ByteBuf分配的,我们先看下它的构造函数:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);//useCacheForAllThreads=true
this.tinyCacheSize = tinyCacheSize;//默认512
this.smallCacheSize = smallCacheSize;//256
this.normalCacheSize = normalCacheSize;//64
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);//8192*2^11
int pageShifts = validateAndCalculatePageShifts(pageSize);//13
if (nHeapArena > 0) {//默认6
heapArenas = newArenaArray(nHeapArena);//PoolArena<byte[]>[] heapArenas
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
.....
}
以及分配buffer的函数:
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
final ByteBuf buf;
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
仔细阅读上面的代码,我们可以先看下PooledByteBufAllocator类:
该类主包含两个成员,一个是threadCache,顾名思义,他是一个ThreadLocal线程级别的缓存,缓存的对象是PoolThreadCache实例,另一个是PoolArena数组。threadCache在初始化一个PoolThreadCache实例时会从heapArenas中选取一个最少使用的PoolArena对象关联起来,相当于是没一个PoolThreadCache实例中包含了一个PoolArena实例,然后PooledByteBufAllocator将分配内存的操作委托给PoolThreadCache,PoolThreadCache又委托给PoolArena进行内存分配,这个几个类如下图:
在进行内存分配时,需要分配的size都会经过normalizeCapacity方法进行处理,最小分配大小为16字节,当size<=16时,分配16字节,当16<size<=512时,找到离16的倍数最近的值进行分配,比如要分配170个字节,实际分配176个字节,因为176=11*16,当size>512时,size成倍增长来分配,比如要分配513字节,实际分配的是512*2=1024字节,要分配1025字节时实际分配1024*2=2048字节。PoolArena有两种分配内存的方式:
a,PoolChunkList用于分配大于等于8192(pageSize)的内存
- qInit:存储内存利用率0-25%的chunk
- q000:存储内存利用率1-50%的chunk
- q025:存储内存利用率25-75%的chunk
- q050:存储内存利用率50-100%的chunk
- q075:存储内存利用率75-100%个chunk
- q100:存储内存利用率100%chunk
b,PoolSubPage用于分配小于8192的内存
PoolArena中tinySubpagePools数组用于分配小于512的内存,默认长度为32,因为最小分配大小为16,32*16=512,smallSubpagePools默认长度为4,用于分配大于等于512小于8192的内存
下面看下PoolArena是如何进行内存分配的,
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
1,首先默认从PoolThreadCache中进行分配,利用ThreadLocal特性,避免了线程竞争(这里先跳过,后面结合内存释放来具体看这里cache的作用)
2,如果是分配小内存(<8192),则由tinySubpagePools或smallSubpagePools进行分配,如果没有合适subpage,则采用方法allocateNormal分配内存。
3,分配pageSize以上的内存由allocateNormal进行分配
allocateNormal方法如下:
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
首次进行内存分配时,chunkList没有可用的chunk用于分配,这时需要新建一个chunk进行内存分配,并添加到qInit列表中,这里我们先看下一个PoolChunk的结构是怎样的,如图所示,
一个chunk由2018个page组成,一个page=8192,chunk维护了一个PoolSubpage数组,大小为2048,当需要分配小于一页的内存时,从PoolSubPage中取出一页,然后进行分配。在chunk中,维护了一棵平衡二叉树memoryMap,用于内存分配的管理(其实底层就是一个大小为8192*2048的byte数组,PoolChunk按照一定的策略进行了划分,然后对这个大小为8192*2048的连续内存进行管理),所有子节点管理的内存也属于其父节点,如下图所示:
图中节点的值(如2048)为在数组memoryMap的下标。
1、如果需要分配8192的内存,则只需要在第11层,找到第一个可用节点(比如2048)即可。
2、如果需要分配大小16k的内存,则只需要在第10层,找到第一个可用节点(如1024,它管理了2048,2049两个节点,刚好大小为16k)即可。
3、如果节点1024的子节点2048已经被分配了,则该节点不能被分配,如需要分配大小16k的内存,需要重新找到1025这个节点,因为1024这个节点只剩下8k。
poolChunk内部会保证每次分配内存大小为8K*(2^n),为了分配一个大小为size的节点,需要在上面的平衡二叉树中进行搜索,找到一个匹配的节点,那如何进行搜索呢?我们首先看下memoryMap是怎样构造的:
for (int d = 0; d <= maxOrder; ++ d) { //marOder=11
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
// in each level traverse left to right and set value to the depth of subtree
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
memoryMap中每个元素存储的是下标对应的层数,这样其实起到了标记的作用,比如:
1、如果memoryMap[512] = 9,则表示其本身到下面所有的子节点都可以被分配,最大可分配32k;
2、如果memoryMap[512] = 10, 则表示节点512下有子节点已经分配过,但其字节点不是完全被分配完了,还有剩余;
3、如果memoryMap[512] = 12 (即总层数 + 1), 可分配的深度已经大于总层数, 则表示该节点下的所有子节点都已经被分配。
下面我们看下如何向PoolChunk中申请一段内存:
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
}
}
当要分配的内存>=8192时,使用 allocateRun分配内存,否则使用allocateSubPage分配,首先看下allocateRun怎么实现的:
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
这里pageShifts=13,maxOrder=11,如果此时要分配8192大小内存,则d=11,表示在上图中第11层进行分配,如果是要分配8192*2大小的内存,则在第10层进行分配。接下来就是进行实际的分配了,
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark as unusable
updateParentsAlloc(id);
return id;
}
a,首先从根节点开始查找,如果根节点的val(memoryMap[id])超过了最大深度,说明没有可用内存分配了,直接返回-1.如果当前节点的val<d,id<<1进行下一层的匹配
b,如果当前节点val>d,表示字节点已经被分配过,且剩余空间不够,需要在这一层的其他节点上进行查找。
c,分配成功后更新节点为不可用,memoryMap[id] = 12,
d,更新父节点
举个例子说明上面的分配,比如分配节点2048,
至此allocateRun方法已经结束,接下来看下allocateSubpage方法,他是用于分配小于一个pagesize的内存分配方法,主要的分配方式是从chunk中先分配出一个page,然后再将这个page分割成多段,然后进行分配,大致示意图如下:
allocateSubpage方法如下:
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
poolchunk中为了了一个2048大小的PoolSubpage数组,分别对应平衡二叉树的2048个叶子节点,比如本次在2048这个节点上进行分配,则取出subpages的第一个元素,如果为空,进行初始化,并添加到subpages数组中。初始化函数如下:
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
使用了一个bitmap来记录一个page的分配情况,这个bitmap数组默认size为8,因为我们最小的分配内存是16,所以一个page最多可以被分成512个小段,而一个long可以描述64位位图信心,所以只需要8个long就可以进行内存管理描述了。再看下init:
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
addToPool(head);
}
比如当前申请16大小的内存,maxNumElems=numAvail=512,说明一个page被拆成了512个内存段,bitmaplength=512/64=8,然后将当前PoolSubpage加入到链表中。下面看下分配方法:
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
final int bitmapIdx = getNextAvail();
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
a,方法getNextAvail找到当前page中可分配内存段的bitmapIdx,初始时这个值为0
b,q=bitmapIdx/64,确认long数组的位置,
c,bitmapIdx&63确定long的bit位置,然后将该位置值为1
至此,内存分配部分结束,下面看下内存释放。
2,内存释放
从前面内存的分配我们可以看到,初始的时候,内存的分配都是从allocateNormal方法中进行分配,此时线程缓存是不存在的,此时分配的内存也没有放入到线程缓存中,只有在内存释放时才会放入到线程缓存中。入口在PoolArena的free方法中:
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass);
}
}
还记得PoolThreadCache构造的时候创建了tinySubPageHeapCaches(分配小于512的内存),smallSubPageHeapCaches(分配大于等于512小于8192的内存),normalHeapCaches(分配pageSize及以上大小的内存)三个MemoryRegionCache<byte[]>类型的数组,每个MemoryRegionCache中包含了以下几个元素:
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
比如我们现在需要释放16字节大小的内存,内存释放时(比如调用byteBuf.release方法),从tinySubPageHeapCaches取出一个MemoryRegionCache实例,然后将要释放内存对应的chunk等信息封装成一个Entry:
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
添加到队列中,这样,下次再分配时,可以直接从MemoryRegionCache进行线程级别的内存分配,可以大大提高分配效率。
至此,netty的内存管理到此结束,下面我们来总结下:
1,总体来看netty的内存分配借鉴了slab和buddy分配算法,自己实现了对内存的管理,可以大大避免gc影响性能问题
2,分配时首先从线程缓存中分配,如果线程缓存不能分配,再从全局进行分配,可以大大提高分配效率。