netty 资源泄漏探测器: http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析: http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器: http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念: http://donald-draper.iteye.com/blog/2394408
引言
上一篇文章,我们看了复合buf概念,先来回顾一下:
复合字节缓冲CompositeByteBuf,内部有一个字节buf数组,用于存放字节buf,每个字节buf添加到复合buf集时,将被包装成一个buf组件,如果添加buf是,复合buf集已满,则将buf集中的所有buf,整合到一个组件buf中,并将原始buf集清空,添加整合后的buf到buf集。复合buf的读写索引为字节buf集的起始索引和size;每个组件buf Component内部记录着字节buf在复合buf中的起始位置和结束位置,及buf可读数据长度。
在netty 默认通道配置初始化( http://donald-draper.iteye.com/blog/2393504)
这篇文章,我们看到字节buf是由字节分配器来分配的,默认字节分配器,在ByteBufUtil中定义,我们先回到
通道配置字节buf分配器的定义:
//字节buf分配器
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
public final class ByteBufUtil { static final ByteBufAllocator DEFAULT_ALLOCATOR;//默认字节分配器 static { String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 64 * 1024); logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE); MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024); logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE); } }
从字节buf工具类,可以看出,如果系统属性io.netty.allocator.type,配置的为unpooled,
则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,
对于Android平台,默认为UnpooledByteBufAllocator。
//UnpooledByteBufAllocator
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric(); private final boolean disableLeakDetector; /** * Default instance which uses leak-detection for direct buffers. */ public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); ... }
//PooledByteBufAllocator
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { public static final PooledByteBufAllocator DEFAULT = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); ... }
由于上面两个方法继承了AbstractByteBufAllocator,我们先来看一下抽象字节buf分配器,
/** * Skeletal {@link ByteBufAllocator} implementation to extend. */ public abstract class AbstractByteBufAllocator implements ByteBufAllocator { static final int DEFAULT_INITIAL_CAPACITY = 256;//初始化容量 static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;//最大容量 static final int DEFAULT_MAX_COMPONENTS = 16;//复合buf最大组件buf数量 static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page 容量调整阈值 private final boolean directByDefault;//分配buf是否为direct类型 private final ByteBuf emptyBuf; /** * Instance use heap buffers by default */ protected AbstractByteBufAllocator() { this(false); } /** * Create new instance * * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than * a heap buffer */ protected AbstractByteBufAllocator(boolean preferDirect) { directByDefault = preferDirect && PlatformDependent.hasUnsafe(); emptyBuf = new EmptyByteBuf(this); } }
来看创建字节buf:
@Override public ByteBuf buffer() { if (directByDefault) { return directBuffer(); } return heapBuffer(); } @Override public ByteBuf buffer(int initialCapacity) { if (directByDefault) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); } @Override public ByteBuf buffer(int initialCapacity, int maxCapacity) { if (directByDefault) { return directBuffer(initialCapacity, maxCapacity); } return heapBuffer(initialCapacity, maxCapacity); }
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型
先来看分配direct类型buf
@Override public ByteBuf directBuffer() { return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY); } @Override public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); } @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); } /** * Create a direct {@link ByteBuf} with the given initialCapacity and maxCapacity. 待子类扩展 */ protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
再来看分配heap类型buf:
@Override public ByteBuf heapBuffer() { return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY); } @Override public ByteBuf heapBuffer(int initialCapacity) { return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); } @Override public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); } /** * Create a heap {@link ByteBuf} with the given initialCapacity and maxCapacity. 待子类扩展 */ protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
从上面可以看出创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。
来看创建ioBufer
@Override public ByteBuf ioBuffer() { if (PlatformDependent.hasUnsafe()) { return directBuffer(DEFAULT_INITIAL_CAPACITY); } return heapBuffer(DEFAULT_INITIAL_CAPACITY); } @Override public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); } @Override public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity, maxCapacity); } return heapBuffer(initialCapacity, maxCapacity); }
从上面可以看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型
再来看创建复合buf:
@Override public CompositeByteBuf compositeBuffer() { if (directByDefault) { return compositeDirectBuffer(); } return compositeHeapBuffer(); } @Override public CompositeByteBuf compositeBuffer(int maxNumComponents) { if (directByDefault) { return compositeDirectBuffer(maxNumComponents); } return compositeHeapBuffer(maxNumComponents); }
从上面可以看出创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;
来看创建heap类型复合buf
@Override public CompositeByteBuf compositeHeapBuffer() { return compositeHeapBuffer(DEFAULT_MAX_COMPONENTS); } @Override public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { return toLeakAwareBuffer(new CompositeByteBuf(this, false, maxNumComponents)); }
//追踪复合buf内存泄漏 protected static CompositeByteBuf toLeakAwareBuffer(CompositeByteBuf buf) { ResourceLeakTracker<ByteBuf> leak; switch (ResourceLeakDetector.getLevel()) { case SIMPLE: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { buf = new SimpleLeakAwareCompositeByteBuf(buf, leak); } break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { buf = new AdvancedLeakAwareCompositeByteBuf(buf, leak); } break; default: break; } return buf; }
//SimpleLeakAwareCompositeByteBuf class SimpleLeakAwareCompositeByteBuf extends WrappedCompositeByteBuf { final ResourceLeakTracker<ByteBuf> leak;//内部资源泄漏探测器 SimpleLeakAwareCompositeByteBuf(CompositeByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) { super(wrapped); this.leak = ObjectUtil.checkNotNull(leak, "leak"); } ... }
//追踪字节buf内存泄漏 protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) { ResourceLeakTracker<ByteBuf> leak; switch (ResourceLeakDetector.getLevel()) { case SIMPLE: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { buf = new SimpleLeakAwareByteBuf(buf, leak); } break; case ADVANCED: case PARANOID: leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) { buf = new AdvancedLeakAwareByteBuf(buf, leak); } break; default: break; } return buf; }
再来看创建direct类型复合buf
@Override public CompositeByteBuf compositeDirectBuffer() { return compositeDirectBuffer(DEFAULT_MAX_COMPONENTS); } @Override public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { return toLeakAwareBuffer(new CompositeByteBuf(this, true, maxNumComponents)); }
从上面可以看出创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
再来看一下计算新容量
@Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. //如果新容量大于阈值, if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) {//新容量+阈值大于最大容量 newCapacity = maxCapacity;//则容量为最大容量 } else { newCapacity += threshold;//否则增量为阈值 } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. //否则从容量64开始,每次扩展一倍,直至大于最小新容量 int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
再来看字节buf分配器度量提供者ByteBufAllocatorMetricProvider
public interface ByteBufAllocatorMetricProvider { /** * Returns a {@link ByteBufAllocatorMetric} for a {@link ByteBufAllocator}. 返回一个字节buf分配器度量器 */ ByteBufAllocatorMetric metric(); }
public interface ByteBufAllocatorMetric { /** * Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown. 返回字节buf分配使用的堆内存 */ long usedHeapMemory(); /** * Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown. 返回字节buf分配使用的direct内存 */ long usedDirectMemory(); }
总结:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。