阅读须知
- Netty版本:4.1.14.Final
- 文章中使用/* */注释的方法会做深入分析
正文
当我们进行数据传输的时候,经常需要使用到缓冲区,对于JDK NIO而言,我们主要使用的是ByteBuffer,从功能角度而言,ByteBuffer完全可以满足NIO编程的需要,为什么Netty还要实现自己的ByteBuf呢?这里引用《Netty权威指南》中对这个问题的解释:
- ByteBuffer长度固定,一旦分配完成,它的容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常。
- ByteBuffer只有一个标识位置的指针position,读写的时候需要手工调用flip()和rewind()等,使用者必须小心谨慎地处理这些API,否则很容易导致程序处理失败。
- ByteBuffer的API功能有限,一些高级和使用的特性它不支持,需要使用者自己编程实现。
Netty实现自己的ByteBuf来解决以上问题,但因为要操作JDK NIO原生的API,所以Netty也需要实现两者的相互转换。
我们来看源码实现,AbstractByteBuf继承自ByteBuf,ByteBuf一些公共属性和功能会在AbstractByteBuf中实现,我们先来看读操作:
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
// 校验可读字节数,最小读取长度需要大于等于0、
// 检查缓冲区是否被释放、读索引加上读取长度不能大于写索引
checkReadableBytes(length);
// 子类实现,从读索引开始,复制length个字节到目标字节数组中
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length; // 读索引累加读取长度
return this;
}
子类覆盖的getBytes方法后面我们在介绍到具体的子类时详细分析。这里提到了读索引和写索引,也就是readerIndex和writerIndex,它们的出现就是为了解决上文描述的JDK ByteBuffer的第二个缺陷。
readerIndex和wirterIndex的取值一开始都是0,随着数据的写入writerIndex会增加,读取数据会使readerIndex增加,但是它不会超过writerIndex。在读取之后,0~readerIndex的空间就被视为discard(丢弃)的,调用discardReadBytes方法,可以释放这部分空间,它的作用类似ByteBuffer的compact方法。readerIndex和writerIndex之间的数据是可读取的,等价于ByteBuffer position和limit之间的数据。writerIndex和capacity之间的空间是可写的,等价于ByteBuffer limit和capacity之间的可用空间。
由于写操作不修改readerIndex指针,读操作不修改writerIndex指针,因此读写之间不再需要调整指针位置,这极大的简化了缓冲区的读写操作,避免了由于遗漏或不熟悉flip()操作导致的功能异常。
下面我们来看写操作:
AbstractByteBuf:
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
/* 确认是否可写 */
ensureWritable(length);
// 子类实现,从写索引开始,将指定字节数组从srcIndex下标开始复制length长度到缓冲区中
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length; // 写索引累加长度
return this;
}
AbstractByteBuf:
public ByteBuf ensureWritable(int minWritableBytes) {
// 最小写入长度不能小于0
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
/* 确认是否可写 */
ensureWritable0(minWritableBytes);
return this;
}
AbstractByteBuf:
final void ensureWritable0(int minWritableBytes) {
ensureAccessible(); // 检查缓冲区是否被释放
// 如果要写入的长度不大于当前缓冲区容量减去writerIndex的值
// 说明当前缓冲区的剩余空间足以满足本次写入
if (minWritableBytes <= writableBytes()) {
return;
}
// 如果写入的长度大于可以动态扩展的最大可写字节数
// 说明缓冲区无法满足本次写入所需的空间,抛出异常
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
/* 扩容并将当前容量标准化为2的幂 */
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
capacity(newCapacity); // 子类实现,调整到新的容量
}
扩容的作用就是为了解决上文描述的JDK ByteBuffer的第一个缺陷。
AbstractByteBufAllocator:
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// 所需容量不能小于0
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
}
// 所需容量不能大于缓冲区最大容量
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
final int threshold = CALCULATE_THRESHOLD; // 阈值4M
if (minNewCapacity == threshold) {
return threshold; // 所需容量等于阈值,直接返回
}
// 如果所需容量大于阈值,不翻倍,只是增加阈值
if (minNewCapacity > threshold) {
// 去除余数
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
// 去除余数后的容量大于最大容量减去阈值,则只能使用最大容量
newCapacity = maxCapacity;
} else {
// 否则在去除余数后的容量的基础上加上阈值,标准化为2的幂
newCapacity += threshold;
}
return newCapacity;
}
// 到这里说明没有超过阈值,从64开始双倍增加到4M
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
// 最后取计算后的值和最大容量之间较小的一个返回
return Math.min(newCapacity, maxCapacity);
}
上文我们提到discardReadBytes方法可以释放0和readerIndex之间的字节,它将readerIndex和writerIndex之间的字节移动到0,并分别将readerIndex和writerIndex设置为0和oldWriterIndex - oldReaderIndex。我们来分析这个方法:
AbstractByteBuf:
public ByteBuf discardReadBytes() {
ensureAccessible(); // 检查缓冲区是否被释放
// readerIndex为0不需要释放
if (readerIndex == 0) {
return this;
}
// 读索引不等于写索引的情况
if (readerIndex != writerIndex) {
// 将readerIndex和writerIndex之间的内容移动到从0开始
setBytes(0, this, readerIndex, writerIndex - readerIndex);
// 写索引变为原来的writerIndex - readerIndex
writerIndex -= readerIndex;
/* 调整标记 */
adjustMarkers(readerIndex);
readerIndex = 0; // 释放后将读索引置为0
} else { // 读索引等于写索引的情况,也就是都读完了
/* 调整标记 */
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0; // 将读写索引都置为0
}
return this;
}
AbstractByteBuf:
protected final void adjustMarkers(int decrement) {
int markedReaderIndex = this.markedReaderIndex;
// 如果markedReaderIndex小于等于需要减少的decrement
if (markedReaderIndex <= decrement) {
this.markedReaderIndex = 0; // markedReaderIndex置为0
int markedWriterIndex = this.markedWriterIndex;
// 如果markedWriterIndex小于等于需要减少的decrement
if (markedWriterIndex <= decrement) {
this.markedWriterIndex = 0; // markedWriterIndex置为0
} else {
// 否则将markedWriterIndex置为markedWriterIndex减去decrement的差
this.markedWriterIndex = markedWriterIndex - decrement;
}
} else {
// 将markedReaderIndex置为markedReaderIndex减去decrement的差
this.markedReaderIndex = markedReaderIndex - decrement;
// 将markedWriterIndex置为markedWriterIndex减去decrement的差
markedWriterIndex -= decrement;
}
}
下面我们来看AbstractReferenceCountedByteBuf,它继承了AbstractByteBuf,从命名上可以看出,它的作用主要是对引用进行计数,类似于JVM内存回收的对象引用计数器,用于跟踪对象的分配和销毁,做自动内存回收。
refCnt是AbstractReferenceCountedByteBuf的主要成员变量,用volatile关键字修饰保证多线程之间的可见性,作用是跟踪对象的引用次数,初始值为1,每调用一次retain方法,引用计数就会加1,我们来看方法的实现:
AbstractReferenceCountedByteBuf:
public ByteBuf retain() {
return retain0(1);
}
AbstractReferenceCountedByteBuf:
private ByteBuf retain0(int increment) {
// 自旋CAS操作
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// 确保我们不复活(这意味着refCnt为0)以及我们遇到溢出
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
// CAS更新引用计数
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
}
return this;
}
下面我们来看释放引用的方法:
AbstractReferenceCountedByteBuf:
public boolean release() {
return release0(1);
}
AbstractReferenceCountedByteBuf:
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
// CAS递减引用计数
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
// CAS成功判断refCnt是否等于1,refCnt初始值为1
// 如果等于1说明需要释放
if (refCnt == decrement) {
// 子类实现释放ByteBuf对象
deallocate();
return true;
}
return false;
}
}
}
我们将在后续的文章中继续介绍一些具体的ByteBuf子类实现。