文章目录
前言
这篇文章算是上篇文章 Netty解码器源码分析 的扩展篇 ,这里单独开一篇讲解这个特殊的解码器LengthFieldBasedFrameDecoder。
首先,它有什么用呢?有什么应用场景吗?这边我举一个例子,dubbo底层使用了Netty做了服务间通信来完成远程调用,其中应用层协议使用了自定义dubbo协议
偏移量(Bit) | 字段 | 取值 |
---|---|---|
0 ~ 7 | 魔数高位 | 0xda00 |
8 ~ 15 | 魔数低位 | 0xbb |
16 | 数据包类型 | 0 - Response, 1 - Request |
17 | 调用方式 | 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用 |
18 | 事件标识 | 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包 |
19 ~ 23 | 序列化器编号 | 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization |
24 ~ 31 | 状态 | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE … |
32 ~ 95 | 请求编号 | 共8字节,运行时生成 |
96 ~ 127 | 消息体长度 | 运行时计算 |
图片、表格信息均摘自dubbo官方文档
以上表格代表整个消息头,也就是说,整个消息头的偏移量是固定的,例如96~127的偏移量的这个位置就是代表着消息体的长度,而不固定的是消息体,dubbo协议中消息体代表dubbo远程调用的一些信息例如接口版本、被调用的方法名、方法参数等等,这些信息是不一定的,这部分长度需要运行时才能计算出来,并且放入消息头中。那么,在解码的时候,我们就可以先接收固定长度(偏移量127bit长度)的消息头,将消息中固定偏移量(96~127)的内容先取出来,就可以知道接下来需要再接收多少长度的消息体,这样就算一个完整的数据包了。以上流程即可解码一个动态长度的数据包。
那么接下来就开始介绍,基于长度域的动态解码器。
基于长度域的动态解码器
使用介绍
在开始源码分析之前,这里先介绍一下怎么使用。如果有下载Netty源码,就可以看到LengthFieldBasedFrameDecoder中的类头有一大串注释,这串注释其实就很好的介绍了这个类的使用。那么就以这个类头来分析这个类能做什么。
先介绍一下四个参数分别什么意思,可以大致先了解一下,往下看你就更能理解这些参数了
- 需要拿到的动态长度信息的起始点(偏移量),dubbo协议中即为96
- 长度信息的长度,dubbo协议中即为32
- 长度修正量
- 需要剥离的长度
- lengthFieldOffset :0
- lengthFieldLength :2
- lengthAdjustment :0
- initialBytesToStrip :0
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
首先,从0开始,长度为2,"HELLO, WORLD"字符串为12字节,也就是说前面Length占2字节,解码器截取前两字节,16进制0x000C = 12,加上lengthAdjustment,最终表示数据长度有12字节,剥离长度为0,则最终为14字节的完整数据。
- lengthFieldOffset :0
- lengthFieldLength :2
- lengthAdjustment :0
- initialBytesToStrip :2
* BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* +--------+----------------+ +----------------+
* | Length | Actual Content |----->| Actual Content |
* | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* +--------+----------------+ +----------------+
这里和上面不同的就是,剥离长度为2,则剥离前两字节的内容,很简单,Length字段被剥离了,剩下12字节的数据包。
- lengthFieldOffset :0
- lengthFieldLength :2
- lengthAdjustment :-2
- initialBytesToStrip :0
* BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* +--------+----------------+ +--------+----------------+
* | Length | Actual Content |----->| Length | Actual Content |
* | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
* +--------+----------------+ +--------+----------------+
这里和第一个例子不同的就是修正量为-2,Length字段不再是12,而是14,也就是说计算长度的方法变了,拿到Length字段的值14之后,还需要加上修正量-2,结果还是为12字节的数据。
- lengthFieldOffset :2
- lengthFieldLength :3
- lengthAdjustment :0
- initialBytesToStrip :0
* BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* +----------+----------+----------------+ +----------+----------+----------------+
* | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* +----------+----------+----------------+ +----------+----------+----------------+
这里修改了偏移量的位置和Length长度的字节数,很好理解。
- lengthFieldOffset :1
- lengthFieldLength :2
- lengthAdjustment :1
- initialBytesToStrip :3
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
这里首先从1开始,截取2字节为Length字段值 = 12,加上偏移量最终结果为13,然后将前3字节剥离,也就是最终数据包为13字节,字符串占12字节,所以还会带一个HDR2字段。
源码分析
到这里,相信读者已经知道这个解码器应该如何使用了。那么直接开门见山,进入源码的探索过程。
首先,LengthFieldBasedFrameDecoder
继承于ByteToMessageDecoder
,关于此类介绍可以看上一篇的 Netty解码器源码分析 。也就是说,这个解码器还是基于之前的逻辑,我们之间看子类抽象方法的实现decode方法即可
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 丢弃模式
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
// 如果可读的数据都还没到Length长度的起始点,直接不进行解码,下次再说
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
// 实际Length字段的读起始点
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 获取帧长度(还未调整)
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
// 如果Length字段值为负数,是不正常的
if (frameLength < 0) {
// 跳过这些数据,然后抛异常
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// 调整长度
frameLength += lengthAdjustment + lengthFieldEndOffset;
// 若后面要读取的数据包长度都比Length字段的起始还要小,显然是不正常的
if (frameLength < lengthFieldEndOffset) {
// 跳过这些数据,然后抛异常
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
// 要读取的数据包长度大于最大长度,是需要被抛弃的
if (frameLength > maxFrameLength) {
// 进入丢弃
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
// 因为这里maxFrameLength的值是int值,代码进行到这里肯定比maxFrameLength小
// 那么也肯定比int最大值小,强转不会溢出
int frameLengthInt = (int) frameLength;
// 可读的数据并不够一个数据包,不做处理,下次再说
if (in.readableBytes() < frameLengthInt) {
return null;
}
// 如果要剥离的长度都大于了要读取的数据包长度,显然是错误的
if (initialBytesToStrip > frameLengthInt) {
// 跳过一个数据包的长度,抛出异常
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
// 代码走到这里,说明要剥离的长度没有问题,将跳过要剥离的长度段
// 从上面的例子也可以看出来,剥离长度是从头开始剥离,所以这里跳过这段长度
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
// 实际上要读取的数据包的长度
int actualFrameLength = frameLengthInt - initialBytesToStrip;
// 零拷贝,直接将ByteBuf剥离出一个子ByteBuf
// 剥离数据:readerIndex开始的actualFrameLength长度
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
// 剥离完,调整read索引,表示已经读出了此段数据
in.readerIndex(readerIndex + actualFrameLength);
// 返回,此段数据到这里就已经解码完成
return frame;
}
相信以上注释应该是比较清晰的描述了整个流程,若有不懂建议多看几遍,在脑海里跑跑流程。
所以,大致的步骤我总结如下:
-
判断是否丢弃模式,若是,开始丢弃,不进行解码
-
若不是,开始检验数据的合法性(长度合法性)
- 可读数据都不足以读出一个Length字段
- Length字段中的值为负数
- 调整后得出的数据包长度比Length字段偏移量还小
- 数据包长度过长,进入丢弃模式
- 可读长度不够读出一个数据包
- 剥离的长度大于一个数据包长度
-
若没有以上情况,则正常进行读取数据包工作,进入extractFrame方法读取数据包
其使用了slice的API,零拷贝地读出数据,期间没有发生内存复制的过程,只是把ByteBuf底层数据的引用截取了想要的那一段,性能上是很优的
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { // slice 零拷贝剥离ByteBuf return buffer.retainedSlice(index, length); }
-
将ByteBuf的读索引设置到一个数据包的长度后面,表示这边已经读了这么长的数据了,接着返回解码出来的ByteBuf即可
请注意,这里只是将数据分好包而已,避免了半包问题,并且动态的截取可变长度的数据包,但现在数据还是字节状态。一般情况下,我们会在这个解码器后面再加一个解码器,将字节转换为我们方便处理的POJO对象。
丢弃模式
这里单独一节讲述一下丢弃模式。当数据包过大,会进入以下这个方法
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
传入一个ByteBuf和一个数据包的长度作为参数,进入exceededFrameLength方法
private void exceededFrameLength(ByteBuf in, long frameLength) {
// 需要被丢弃的数据包长度 - 现在可读的长度
long discard = frameLength - in.readableBytes();
// 用来记录丢弃的最大数据包长度
// 在抛出异常的时候可以用到,在完全丢弃之后会reset为0
tooLongFrameLength = frameLength;
// 若小于0 证明可读的长度是大于需要被丢弃的长度的,直接跳过即可
if (discard < 0) {
// buffer contains more bytes then the frameLength so we can discard all now
// 直接跳过这个数据包长度,到此数据已全部丢弃完成,下次可进行正常编码
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
// 若到这里,说明可读长度小于需要被丢弃的长度,下次还要继续丢
// 所以这里设置一个标识,进入丢弃模式,表示下次编码直接开始丢弃,并不做编码工作
discardingTooLongFrame = true;
// 记录还需要丢弃多长的数据
bytesToDiscard = discard;
// 直接跳过整个可读的长度
in.skipBytes(in.readableBytes());
}
// 是否快速失败和reset操作
failIfNecessary(true);
}
接着就是快速失败和各种reset操作了
// firstDetectionOfTooLongFrame表示是否是第一次进入的丢弃模式
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
// 如果bytesToDiscard值为0,证明数据已经都被丢弃完了
if (bytesToDiscard == 0) {
// Reset to the initial state and tell the handlers that
// the frame was too large.
// 这里取出之前记录的需要抛弃的数据的长度
long tooLongFrameLength = this.tooLongFrameLength;
// reset为0
this.tooLongFrameLength = 0;
// 取消丢弃模式,因为丢弃完了
discardingTooLongFrame = false;
// 这里意思是,在快速失败的情况下,第一次丢弃数据肯定就已经抛出异常了
// 不重复抛异常,所以这里还需要判断一个是否是第一次丢弃数据
// 如果是第一次丢弃数据就丢完了,肯定要抛异常
// 如果不是第一次丢弃数据,还是快速失败的,就不需要抛异常,因为之前肯定抛过异常了
if (!failFast || firstDetectionOfTooLongFrame) {
// 抛出异常
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
// 如果代码走到这里,证明数据是还没丢弃完的
// 如果是快速失败的话,还需要判断是否是第一次丢数据
// 两者都为true才抛异常,保证只抛出一次异常
if (failFast && firstDetectionOfTooLongFrame) {
// 抛出异常
fail(tooLongFrameLength);
}
}
}
这里只是处理是否抛异常的逻辑和各种reset,没什么好说的。
可以注意到,在我们编码的开头,就会判断此时是否是丢弃模式
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
如果是丢弃模式,说明上次数据是还没丢完的,这里直接进来继续丢弃
private void discardingTooLongFrame(ByteBuf in) {
// 还需要被丢弃的长度
long bytesToDiscard = this.bytesToDiscard;
// 取 需要被丢弃的长度 和 可读长度 中最小值
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
// 直接跳过上面最小值长度的数据
in.skipBytes(localBytesToDiscard);
// 若可读数据大于被丢弃的长度,那么这里bytesToDiscard=0,表示丢弃完了
// 若可读数据小于被丢弃的长度,表示还没丢弃完
bytesToDiscard -= localBytesToDiscard;
// 记录下还需要丢弃的长度
this.bytesToDiscard = bytesToDiscard;
// 快速失败和reset判断,并且设置不是第一次进入丢弃
failIfNecessary(false);
}
值得一提的是,discardingTooLongFrame这个方法并没有return,那么是否可以认为,若数据包比较小的话,设置为快速失败模式可以提升一些性能?因为若数据过大,一次丢不完,如果是快速失败则在第一次解码的时候就已经抛出了异常了,在第二次进入丢弃模式的时候,若丢弃完数据了,丢弃模式取消,那么因为是快速失败所以此时是不需要抛出异常的,可以继续往下进行解码工作,若数据包较小,此时还可以利用这个方法接下去再解出一个数据包,不用等到下一次数据到来才做这件事。
到这里,动态解码器的源码分析就结束了。相信看到这里,读者可以理解这个解码器的大致逻辑了,希望读者在日常使用中可以将其运用的淋漓尽致。