netty 资源泄漏探测器: http://donald-draper.iteye.com/blog/2393940
netty 抽象字节buf解析: http://donald-draper.iteye.com/blog/2394078
netty 抽象字节buf引用计数器: http://donald-draper.iteye.com/blog/2394109
netty 复合buf概念: http://donald-draper.iteye.com/blog/2394408
netty 抽象字节buf分配器: http://donald-draper.iteye.com/blog/2394419
引言
上一篇我们看了抽象字节buf分配器,先来回顾一下:
创建字节buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建direct和heap buf实际通过newDirectBuffer和newHeapBuffer方法,待子类扩展。看出ioBuffer方法创建的字节buf,优先为direct类型,当系统平台不支持Unsafe时,才为heap类型;创建复合buf主要根据字节buf分配器的directByDefault属性,来决定分配buf是否为direct类型还是heap类型;创建复合buf时,如果资源泄漏探测功能开启,则追踪复合buf内存泄漏情况。
今天我们来看抽象字节buf分配器一个具体实现非池类字节buf分配器UnpooledByteBufAllocator
/** * Simplistic {@link ByteBufAllocator} implementation that does not pool anything. */ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { //字节buf分配器Metric private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric(); private final boolean disableLeakDetector;//是否关闭资源泄漏探测 /** * Default instance which uses leak-detection for direct buffers. 默认非池类字节buf分配器 */ public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred()); /** * Create a new instance which uses leak-detection for direct buffers. * * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than * a heap buffer */ public UnpooledByteBufAllocator(boolean preferDirect) { this(preferDirect, false); } /** * Create a new instance * * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than * a heap buffer * @param disableLeakDetector {@code true} if the leak-detection should be disabled completely for this * allocator. This can be useful if the user just want to depend on the GC to handle * direct buffers when not explicit released. */ public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) { super(preferDirect); this.disableLeakDetector = disableLeakDetector; } }
来看一下度量器
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric { final LongCounter directCounter = PlatformDependent.newLongCounter();//direct内存使用计数器 final LongCounter heapCounter = PlatformDependent.newLongCounter();//Heap内存使用计数器 @Override public long usedHeapMemory() { return heapCounter.value(); } @Override public long usedDirectMemory() { return directCounter.value(); } @Override public String toString() { return StringUtil.simpleClassName(this) + "(usedHeapMemory: " + usedHeapMemory() + "; usedDirectMemory: " + usedDirectMemory() + ')'; } }
//LongCounter Long计数器
/** * Counter for long. */ public interface LongCounter { void add(long delta); void increment(); void decrement(); long value(); }
来看创建堆buf
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { return PlatformDependent.hasUnsafe() ? new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
堆buf有两种,我们分别来看
先看InstrumentedUnpooledUnsafeHeapByteBuf
1.
//InstrumentedUnpooledUnsafeHeapByteBuf
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf { InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override byte[] allocateArray(int initialCapacity) { byte[] bytes = super.allocateArray(initialCapacity); //更新buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length); return bytes; } @Override void freeArray(byte[] array) { int length = array.length; super.freeArray(array); //释放buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementHeap(length); } }
//UnpooledUnsafeHeapByteBuf
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf { /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } ... }
2.
再来看另一种堆buf InstrumentedUnpooledHeapByteBuf
//InstrumentedUnpooledHeapByteBuf
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf { InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override byte[] allocateArray(int initialCapacity) { byte[] bytes = super.allocateArray(initialCapacity); //更新buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementHeap(bytes.length); return bytes; } @Override void freeArray(byte[] array) { int length = array.length; super.freeArray(array); //释放buf分配器Heap内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementHeap(length); } }
我们来看一下Unpooled堆字节buf:
/** * Big endian Java heap buffer implementation. */ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; byte[] array;//存储数据的字节数组 private ByteBuffer tmpNioBuf;//临时nio 字节部分 /** * Creates a new heap buffer with a newly allocated byte array. * * @param initialCapacity the initial capacity of the underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setArray(allocateArray(initialCapacity)); setIndex(0, 0); } /** * Creates a new heap buffer with an existing byte array. * * @param initialArray the initial underlying byte array * @param maxCapacity the max capacity of the underlying byte array */ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) { super(maxCapacity); checkNotNull(alloc, "alloc"); checkNotNull(initialArray, "initialArray"); if (initialArray.length > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity)); } this.alloc = alloc; setArray(initialArray); setIndex(0, initialArray.length); } //分配initialCapacity容量的字节数组 byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } ... @Override public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { checkDstIndex(index, length, dstIndex, dst.length); System.arraycopy(array, index, dst, dstIndex, length); return this; } .... @Override public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.capacity()); if (src.hasMemoryAddress()) { PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length); } else if (src.hasArray()) { setBytes(index, src.array(), src.arrayOffset() + srcIndex, length); } else { src.getBytes(srcIndex, array, index, length); } return this; } @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { checkSrcIndex(index, length, srcIndex, src.length); System.arraycopy(src, srcIndex, array, index, length); return this; } ... @Override public ByteBuffer nioBuffer(int index, int length) { ensureAccessible(); return ByteBuffer.wrap(array, index, length).slice(); } @Override public ByteBuffer[] nioBuffers(int index, int length) { return new ByteBuffer[] { nioBuffer(index, length) }; } @Override public ByteBuffer internalNioBuffer(int index, int length) { checkIndex(index, length); return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length); } ... private ByteBuffer internalNioBuffer() { ByteBuffer tmpNioBuf = this.tmpNioBuf; if (tmpNioBuf == null) { this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array); } return tmpNioBuf; } ... @Override protected void deallocate() { freeArray(array); array = null; } //待子类扩展 void freeArray(byte[] array) { // NOOP } }
从上面可以看出,非池类堆字节buf,实际为一个字节数组。
//堆内存使用更新与释放,委托给内部度量器
void incrementHeap(int amount) { metric.heapCounter.add(amount); } void decrementHeap(int amount) { metric.heapCounter.add(-amount); }
再来看分配direct buf:
@Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { final ByteBuf buf; if (PlatformDependent.hasUnsafe()) { buf = PlatformDependent.useDirectBufferNoCleaner() ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
从上面来看direct buf有三种,分别为InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf,
InstrumentedUnpooledUnsafeDirectByteBuf,InstrumentedUnpooledDirectByteBuf
我们分别来看这三种:
1.
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeNoCleanerDirectByteBuf { InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) { int capacity = oldBuffer.capacity(); ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity() - capacity); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledUnsafeNoCleanerDirectByteBuf
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf { UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } ... }
再来看第二种:
2.
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf { InstrumentedUnpooledUnsafeDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledUnsafeDirectByteBuf
/** * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the * constructor explicitly. */ public class UnpooledUnsafeDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private ByteBuffer tmpNioBuf; private int capacity;//容量 private boolean doNotFree;//是否需要释放内存 ByteBuffer buffer;//内部nio 字节buf long memoryAddress; /** * Creates a new direct buffer. * * @param initialCapacity the initial capacity of the underlying direct buffer * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(allocateDirect(initialCapacity), false); } /** * Creates a new direct buffer by wrapping the specified initial buffer. * * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) { // We never try to free the buffer if it was provided by the end-user as we not know if this is an duplicate or // an slice. This is done to prevent an IllegalArgumentException when using Java9 as Unsafe.invokeCleaner(...) // will check if the given buffer is either an duplicate or slice and in this case throw an // IllegalArgumentException. // // See http://hg.openjdk.java.net/jdk9/hs-demo/jdk/file/0d2ab72ba600/src/jdk.unsupported/share/classes/ // sun/misc/Unsafe.java#l1250 // // We also call slice() explicitly here to preserve behaviour with previous netty releases. this(alloc, initialBuffer.slice(), maxCapacity, false); } UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity, boolean doFree) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialBuffer == null) { throw new NullPointerException("initialBuffer"); } if (!initialBuffer.isDirect()) { throw new IllegalArgumentException("initialBuffer is not a direct buffer."); } if (initialBuffer.isReadOnly()) { throw new IllegalArgumentException("initialBuffer is a read-only buffer."); } int initialCapacity = initialBuffer.remaining(); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; doNotFree = !doFree; setByteBuffer(initialBuffer.order(ByteOrder.BIG_ENDIAN), false); writerIndex(initialCapacity); } /** * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. 分配initialCapacity容量的direct buf,实际委托给内部的nio 字节buf */ protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } ... }
从上面我们可以看出,非池类Direct buf,实际为一个nio 字节buf。
再来看第三种:
3.
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf { InstrumentedUnpooledDirectByteBuf( UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(alloc, initialCapacity, maxCapacity); } @Override protected ByteBuffer allocateDirect(int initialCapacity) { ByteBuffer buffer = super.allocateDirect(initialCapacity); //更新buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity()); return buffer; } @Override protected void freeDirect(ByteBuffer buffer) { int capacity = buffer.capacity(); super.freeDirect(buffer); //释放buf分配器direct内存使用量 ((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity); } }
//UnpooledDirectByteBuf
/** * A NIO {@link ByteBuffer} based buffer. It is recommended to use {@link Unpooled#directBuffer(int)} * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the * constructor explicitly. */ public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf { private final ByteBufAllocator alloc; private ByteBuffer buffer;//内部字节buf private ByteBuffer tmpNioBuf; private int capacity;//容量 private boolean doNotFree;//是否需要释放内存 /** * Creates a new direct buffer. * * @param initialCapacity the initial capacity of the underlying direct buffer * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); } /** * Creates a new direct buffer by wrapping the specified initial buffer. * * @param maxCapacity the maximum capacity of the underlying direct buffer */ protected UnpooledDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialBuffer == null) { throw new NullPointerException("initialBuffer"); } if (!initialBuffer.isDirect()) { throw new IllegalArgumentException("initialBuffer is not a direct buffer."); } if (initialBuffer.isReadOnly()) { throw new IllegalArgumentException("initialBuffer is a read-only buffer."); } int initialCapacity = initialBuffer.remaining(); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; doNotFree = true; setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN)); writerIndex(initialCapacity); } /** * Allocate a new direct {@link ByteBuffer} with the given initialCapacity. 分配initialCapacity容量的direct buf,实际委托给内部的nio 字节buf */ protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } ... }
再来看更新direct 内存使用量,更新操作,直接委托给内部度量器:
void incrementDirect(int amount) { metric.directCounter.add(amount); } void decrementDirect(int amount) { metric.directCounter.add(-amount); }
再来看复合buf
@Override public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { CompositeByteBuf buf = new CompositeByteBuf(this, false, maxNumComponents); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); } @Override public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { CompositeByteBuf buf = new CompositeByteBuf(this, true, maxNumComponents); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
总结:
非池类堆字节buf,实际为一个字节数组,直接在Java虚拟机堆内存中,分配字节缓存;非池类Direct buf,实际为一个nio 字节buf,从操作系统实际物理内存中,分配字节缓存。Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
附:
我们再来看一下Unpooled:
/** * Creates a new {@link ByteBuf} by allocating new space or by wrapping * or copying existing byte arrays, byte buffers and a string. *Unpooled用于创建一个新的字节buf,包装和拷贝一个已经存在的字节数组,字节buf或String * <h3>Use static import</h3> * This classes is intended to be used with Java 5 static import statement: * * <pre> * import static io.netty.buffer.{@link Unpooled}.*; * * {@link ByteBuf} heapBuffer = buffer(128); * {@link ByteBuf} directBuffer = directBuffer(256); * {@link ByteBuf} wrappedBuffer = wrappedBuffer(new byte[128], new byte[256]); * {@link ByteBuf} copiedBuffer = copiedBuffer({@link ByteBuffer}.allocate(128)); * </pre> * * <h3>Allocating a new buffer</h3> * * Three buffer types are provided out of the box. * * [list] * [*]{@link #buffer(int)} allocates a new fixed-capacity heap buffer. * [*]{@link #directBuffer(int)} allocates a new fixed-capacity direct buffer. * [/list] * * <h3>Creating a wrapped buffer</h3> *包装buf,为底层buf的视图,所有底层buf的改变,包装buf都可以看到 * Wrapped buffer is a buffer which is a view of one or more existing * byte arrays and byte buffers. Any changes in the content of the original * array or buffer will be visible in the wrapped buffer. Various wrapper * methods are provided and their name is all {@code wrappedBuffer()}. * You might want to take a look at the methods that accept varargs closely if * you want to create a buffer which is composed of more than one array to * reduce the number of memory copy. * * <h3>Creating a copied buffer</h3> *拷贝buf,并不会共享数据,底层buf数据的改变,拷贝buf看不到 * Copied buffer is a deep copy of one or more existing byte arrays, byte * buffers or a string. Unlike a wrapped buffer, there's no shared data * between the original data and the copied buffer. Various copy methods are * provided and their name is all {@code copiedBuffer()}. It is also convenient * to use this operation to merge multiple buffers into one buffer. */ public final class Unpooled { //字节buf分配器,默认为UnpooledByteBufAllocator.DEFAULT private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT; /** * Big endian byte order. */ public static final ByteOrder BIG_ENDIAN = ByteOrder.BIG_ENDIAN; /** * Little endian byte order. */ public static final ByteOrder LITTLE_ENDIAN = ByteOrder.LITTLE_ENDIAN; /** * A buffer whose capacity is {@code 0}. */ public static final ByteBuf EMPTY_BUFFER = ALLOC.buffer(0, 0); static { assert EMPTY_BUFFER instanceof EmptyByteBuf: "EMPTY_BUFFER must be an EmptyByteBuf."; } }
再来看分配buf:
/** * Creates a new big-endian Java heap buffer with reasonably small initial capacity, which * expands its capacity boundlessly on demand. */ public static ByteBuf buffer() { return ALLOC.heapBuffer(); } /** * Creates a new big-endian direct buffer with reasonably small initial capacity, which * expands its capacity boundlessly on demand. */ public static ByteBuf directBuffer() { return ALLOC.directBuffer(); } /** * Creates a new big-endian Java heap buffer with the specified {@code capacity}, which * expands its capacity boundlessly on demand. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0}. */ public static ByteBuf buffer(int initialCapacity) { return ALLOC.heapBuffer(initialCapacity); } /** * Creates a new big-endian direct buffer with the specified {@code capacity}, which * expands its capacity boundlessly on demand. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0}. */ public static ByteBuf directBuffer(int initialCapacity) { return ALLOC.directBuffer(initialCapacity); } /** * Returns a new big-endian composite buffer with no components. */ public static CompositeByteBuf compositeBuffer() { return compositeBuffer(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS); } /** * Returns a new big-endian composite buffer with no components. */ public static CompositeByteBuf compositeBuffer(int maxNumComponents) { return new CompositeByteBuf(ALLOC, false, maxNumComponents); }
从上面来看Unpooled创建字节buf,实际委托给内部字节分配器UnpooledByteBufAllocator。
包装buf相关的方法:
/** * Creates a new big-endian buffer which wraps the specified {@code array}. * A modification on the specified array's content will be visible to the * returned buffer. */ public static ByteBuf wrappedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return new UnpooledHeapByteBuf(ALLOC, array, array.length); } /** * Creates a new buffer which wraps the specified buffer's readable bytes. * A modification on the specified buffer's content will be visible to the * returned buffer. * @param buffer The buffer to wrap. Reference count ownership of this variable is transfered to this method. * @return The readable portion of the {@code buffer}, or an empty buffer if there is no readable portion. * The caller is responsible for releasing this buffer. */ public static ByteBuf wrappedBuffer(ByteBuf buffer) { if (buffer.isReadable()) { return buffer.slice(); } else { buffer.release(); return EMPTY_BUFFER; } } /** * Creates a new big-endian composite buffer which wraps the readable bytes of the * specified buffers without copying them. A modification on the content * of the specified buffers will be visible to the returned buffer. * @param maxNumComponents Advisement as to how many independent buffers are allowed to exist before * consolidation occurs. * @param buffers The buffers to wrap. Reference count ownership of all variables is transfered to this method. * @return The readable portion of the {@code buffers}. The caller is responsible for releasing this buffer. */ public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) { switch (buffers.length) { case 0: break; case 1: ByteBuf buffer = buffers[0]; if (buffer.isReadable()) { return wrappedBuffer(buffer.order(BIG_ENDIAN)); } else { buffer.release(); } break; default: for (int i = 0; i < buffers.length; i++) { ByteBuf buf = buffers[i]; if (buf.isReadable()) { return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i, buffers.length); } buf.release(); } break; } return EMPTY_BUFFER; }
...
拷贝buf相关的方法:
** * Creates a new big-endian buffer whose content is a copy of the * specified {@code array}. The new buffer's {@code readerIndex} and * {@code writerIndex} are {@code 0} and {@code array.length} respectively. */ public static ByteBuf copiedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return wrappedBuffer(array.clone()); } /** * Creates a new buffer whose content is a copy of the specified * {@code buffer}'s readable bytes. The new buffer's {@code readerIndex} * and {@code writerIndex} are {@code 0} and {@code buffer.readableBytes} * respectively. */ public static ByteBuf copiedBuffer(ByteBuf buffer) { int readable = buffer.readableBytes(); if (readable > 0) { ByteBuf copy = buffer(readable); copy.writeBytes(buffer, buffer.readerIndex(), readable); return copy; } else { return EMPTY_BUFFER; } }
/** * Return a unreleasable view on the given {@link ByteBuf} which will just ignore release and retain calls. 返回一个不可释放的buf */ public static ByteBuf unreleasableBuffer(ByteBuf buf) { return new UnreleasableByteBuf(buf); }
//UnreleasableByteBuf
final class UnreleasableByteBuf extends WrappedByteBuf { private SwappedByteBuf swappedBuf; ... @Override public ByteBuf retain(int increment) { return this; } @Override public ByteBuf retain() { return this; } @Override public ByteBuf touch() { return this; } @Override public ByteBuf touch(Object hint) { return this; } @Override public boolean release() { return false; } @Override public boolean release(int decrement) { return false; } }
//WrappedByteBuf,理解为字节静态代理
class WrappedByteBuf extends ByteBuf { protected final ByteBuf buf; @Override public final int readerIndex() { return buf.readerIndex(); } @Override public final ByteBuf readerIndex(int readerIndex) { buf.readerIndex(readerIndex); return this; } @Override public final int writerIndex() { return buf.writerIndex(); } @Override public final ByteBuf writerIndex(int writerIndex) { buf.writerIndex(writerIndex); return this; } ... }
/** * Wrap the given {@link ByteBuf}s in an unmodifiable {@link ByteBuf}. Be aware the returned {@link ByteBuf} will * not try to slice the given {@link ByteBuf}s to reduce GC-Pressure. 包装buf为不可修改字节buf */ public static ByteBuf unmodifiableBuffer(ByteBuf... buffers) { return new FixedCompositeByteBuf(ALLOC, buffers); }
//FixedCompositeByteBuf
/** * {@link ByteBuf} implementation which allows to wrap an array of {@link ByteBuf} in a read-only mode. * This is useful to write an array of {@link ByteBuf}s. */ final class FixedCompositeByteBuf extends AbstractReferenceCountedByteBuf { private static final ByteBuf[] EMPTY = { Unpooled.EMPTY_BUFFER }; private final int nioBufferCount; private final int capacity; private final ByteBufAllocator allocator; private final ByteOrder order; private final Object[] buffers; private final boolean direct; ... }
//Unpooled