TCP 粘包、半包 Netty 全搞定
什么是粘包和半包?
什么是粘包?
故名思意就是客户端和服务端之间发送的数据包粘在了一起,原本应该分多条发送的数据包粘在了一起发送。
什么是半包?
指的是一条数据包被分割成了多条发送。
为什么 TCP 应用中会出现粘包和半包现象?
粘包的主要原因
- 发送方每次写入数据 < 套接字缓冲区大小;
- 接收方读取套接字缓冲区数据不够及时。
TCP
使用的 Nagle
算法(默认开启,可以手动禁用):
Nagle
算法用于处理小报文段(微小分组)的发送问题,专门用于减少网络中小数据分组的数量情况。- 当一个数据包小于
MSS
(MSS
是一个数据包(data
里面的数据)的最长数据传输限制值,如果大于该值就要进行分段传输。)的时候,该算法会尽可能将所有类似的数据包归为同一个分组进行数据的发送。
避免大量的小数据包发送,因为发送端通常都是收到前一个报文确认之后才会进行下个数据包的发送。
半包的主要原因
- 发送方写入数据 > 套接字缓冲区大小(此时数据必须要拆分,否则无法写入缓冲区)。
- 发送的数据大于协议的
MTU(Maximum Transmission Unit,最大传输单元)
,必须拆包。
核心原因
TCP
是流式协议,消息无边界。UDP
像邮寄的包裹,虽然一次运输多个,但每个包裹都有“界限”,一个一个签收,所以无粘包、半包问题。
解决粘包和半包问题的几种常用方法
几种常用方法:
解决问题的根本手段:找出消息的边界。
Netty 对三种常用封帧方式的支持
Netty 的拆包粘包问题解决方案
定长数据流
客户端和服务端进行消息通讯的时候,提前确定好消息的长度。例如如果客户端发送的数据长度不足 10 个字节的时候,则使用空白字符来补足 10 个字节。
特殊结束字符
根据特殊字符将数据进行分割。
特殊协议
将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段。
解读 Netty 处理粘包、半包的源码
解码核心工作流程
三种类型的解码器都是继承自 ByteToMessageDecoder
,所以我们可以从这个类入手。
(1)ByteToMessageDecoder
的类结构:
很明显,它继承了 ChannelInboundHandlerAdapter
类,这个类有个核心方法就是 channelRead()
,负责处理数据;
(2)ByteToMessageDecoder
的 channelRead()
方法源码,解释包含在源码中了
// 参数 msg 就是我们的传输数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// 转换为 Netty 的 ByteBuf
ByteBuf data = (ByteBuf) msg;
/**
* cumulation 就是数据积累器
* 判断 cumulation 是否为 null,如果为 null,说明是第一笔数据,直接复制给 cumulation 即可;
* 如果不为 null,说明之前已经有数据被接收了,就追加到 cumulation 中。
* 其实核心工作就是一个数据积累的过程。
*/
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// 追加到 cumulation
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 此时调用了 callDecode,请看下文
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
(3)callDecode()
方法源码(在 channelRead()
方法中调用):
// 参数 in 代表数据积累器
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
// 此时肯定为 0,因为没有数据
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
// decode 中时,不能执行完 handler remove 清理操作。
//那 decode 完之后需要清理数据。
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
(4)decodeRemovalReentryProtection()
方法源码(在 callDecode()
方法中被调用):
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 调用了 decode 方法
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
handlerRemoved(ctx);
}
}
}
上面有个核心逻辑,就是调用了 decode()
方法.
(5)decode()
方法如下:
我们发现这是一个抽象方法,它的实现类中有那三种解码器,所以不难知道,这就是解码的核心逻辑;
(5)我们选择其中一个解码器的 decode()
方法来分析,FixedLengthFrameDecoder
的 decode()
方法源码:
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
// 被上面的 decode 方法调用
// 参数 in 表示被数据积累器收集到的数据
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 如果小于我们定义的固定长度,就不解码了,没有意义
if (in.readableBytes() < frameLength) {
return null;
} else {
// 如果大于等于我们定义的数据长度,说明可以解码
// 方法就是根据我们定义的数据长度来解码就醒了
return in.readRetainedSlice(frameLength);
}
}
(6)这样就完成了数据解码的过程。
解码中两种数据积累器(Cumulator)的区别?
(1)在这里调用了数据积累器的方法
(2)我们跟进去 cumulate()
方法,发现它有两种实现:
(3)我们点进第二种的实现(默认实现,内存复制):
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
try {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // 按需扩容
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
// 如果空间不够则扩容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 将数据追加到数据积累器中,使用内存复制的方式
buffer.writeBytes(in);
return buffer;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release();
}
}
};
而另一种实现(组合的方式):
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
try {
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the
// user use slice().retain() or duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
// 空间扩容
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
buffer.writeBytes(in);
} else {
CompositeByteBuf composite;
// 创建 composite bytebuf,如果已经创建过,就不用了
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
// 避免内存复制
composite.addComponent(true, in);
in = null;
buffer = composite;
}
return buffer;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak if
// writeBytes(...) throw for whatever release (for example because of OutOfMemoryError).
in.release();
}
}
}
};
(3)总结就是,两种数据积累器,一种使用的方式是内存复制(默认),而另一种使用的是组合的方式,提供一个逻辑的视图;
为什么会选择内存复制的方式,而不选择组合方式,明明组合的方式效率更高。
源码中注释已经解释了:
三种解码器的常用额外控制参数有哪些?
DelimiterBasedFrameDecoder
这是一个数组,它不仅仅可以支持一个分隔符,还可以支持多个分隔符。
FixedLengthFrameDecoder
自定义长度;
LengthFieldBasedFrameDecoder