应用在进行数据传输时,往往需要使用缓冲区,最常用的缓冲区就是JDK NIO类库提供的java.nio.Buffer。由于JDK原生的Buffer存在一些缺点,Netty提供了自己的ByteBuffer实现(ByteBuf类);
ByteBuf种类繁多,支持的功能特性存在一些差异,如果使用不当,往往会出现功能异常或性能问题。
目录
ByteBuf实现机制
ByteBuf工作原理
ByteBuf引用计数器工作原理分析
ByteBuf工作原理
ByteBuf提供一下几类基本功能:
- 7中Java基础类型,byte数组,ByteBuffer(ByteBuf)等的读写。
- 缓冲区自身的copy和slice等
- 操作位置指针等方法
- 容量自动扩展
- 从ByteBuf到其他数据结构的灵活转换。
不同于JDK ByteBuffer的limit,ByteBuf通过两个位置指针来协助缓冲区的读写操作,读操作使用readerIndex,写操作使用writerIndex。
readerIndex和writerIndex的取值开始都为0,随着数据的写入和读取,writerIndex和readerIndex分别增加。但readerIndex不会超过writerIndex(readerIndex<=writerIndex)。在读取一部分数据以后,0~readerIndex的数据就被视为丢弃的(discard),调用discardReadBytes方法,可以释放这部分空间。readerIndex和writerIndex之间的数据是可读取的,writerIndex和capacity之间的空间是可写的。
两个位置指针将ByteBuf缓冲区划分为三部分:
调用ByteBuf的read操作时,从readerIndex处读取。从readerIndex到writerIndex的空间为可读的字节缓冲区,从writerIndex到capacity的空间为可写的字节缓冲区,从0到readerIndex的空间是已经读取的缓冲区,可以调用discardReadBytes来释放这部分空间,节约空间,防止ByteBuf不断扩充。
写入N个字节之后ByteBuf的位置指针:
读取M(<N)个字节后ByteBuf的位置指针:
调用discardReadBytes()操作后ByteBuf的位置指针:
调用clear()操作后,readerIndex和writerIndex被重置为0:
ByteBuf引用计数器工作原理分析
AbstractReferenceCountedByteBuf实现了对ByteBuf的内存管理,通过引用计数器对ByteBuf的引用情况进行管理,跟踪对象的分配和使用情况,以实现内存的回收、销毁和复用。
继承关系图:
AbstractReferenceCountedByteBuf的refCnt是引用计数器,用于对ByteBuf的申请和释放做引用计数。它的更新通过原子类AtomicIntegerFieldUpdater完成。AtomicIntegerFieldUpdater通过原子类的方式对成员变量进行更新等操作,以实现线程安全,避免加锁,提升性能。
AbstractReferenceCountedByteBuf类:
//AbstractReferenceCountedByteBuf类相关代码
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf{
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class,
"refCnt");
//使用volatile修饰解决多线程并发访问时线程可见性,线程总是获取到最新的refCnt
private volatile int refCnt;
protected AbstractReferenceCountedByteBuf(int maxCapacity){
super(maxCapacity);
refCntUpdater.set(this, 1)
}
}
使用volatile修饰解决多线程并发访问时线程可见性,线程总是获取到最新的refCnt。
每调用一次retain方法,引用计数器就会累加一次,由于可能存在多线程并发调用场景,所以它的累加操作必须是线程安全的,通过自旋对引用计数器进行累加操作,如果执行累加操作后发现引用计数器小于等于0,或者发生了整形范围越界(如0x7fffffff+1),则抛出IllegalReferenceCountException异常,代码如下:
//ByteBuf对象每增加一个引用,调用retain()方法
private ByteBuf retain0(final int increment){
int oldRef = refCntUpdater.getAndAdd(this, increment);
if(oldRef <= 0 || oldRef + increment < oldRef){
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
与retain相对应的,是release方法,通过调用release方法释放引用,如果oldRef与decrement相等,说明对象申请和释放次数相同,refCnt为0,已经没有对象引用该ByteBuf,需要执行释放操作。如果发生重复释放(refCnt < 0)和越界操作,则抛出非法引用异常,代码如下:
//而每减少一个引用,需要调用release()方法
private boolean release0(int decrement){
int oldRef = refCntUpdater.getAndAdd(this, -decrement):
if(oldRef == decrement){
deallocate();
return true;
}else if(oldRef < decrement || oldRef - decrement > oldRef){
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -increment);
}
return false
}
需要注意的是不同的ByteBuf实现,释放策略不同,如内存池模式,则需要将ByteBuf返回到内存池中复用,PooledByteBuf的deallocate()代码实现如下:
protected final void deallocate(){
if(handle >= 0){
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
//==========释放并复用ByteBuf===========//
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
//==========释放并复用ByteBuf===========//
}
}
对于非池模式创建的ByteBuf,则释放ByteBuf相关资源,等待GC回收,以UnpooledDirectByteBuf为例,代码如下:
protected void deallocate(){
ByteBuffer buffer = this.buffer;
if(buffer == null){
return;
}
this.buffer = null;
if(!doNotFree){
freeDirect(buffer);
}
}
ByteBuf的申请和释放会跨Netty的NioEventLoop和业务线程,要注意并发安全和非法引用问题。