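Netty wraps the network operations that occur on a Channel, such as connection establishment, connection closure, and message reads and writes, into events. Each event makes the ChannelPipeline invoke its chain of ChannelHandlers, and the system-provided or user-implemented ChannelHandlers process the event.
Because there are many kinds of network events, and they differ in how they are triggered and executed, a shaky understanding of them can easily lead to puzzling problems.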
Problem: channelReadComplete is called multiple times
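A team built a server on Netty. After it had run in production for some time, they found that for a single HTTP request the ChannelHandler's channelReadComplete() method was sometimes called several times, although for most messages it was called only once. By design, the server ran its business logic in channelReadComplete() once it had read a complete HTTP request, so whenever channelReadComplete() fired more than once for the same request, the business logic misbehaved.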
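Analysis showed the root cause: TCP is a byte-stream protocol and knows nothing about the meaning of the upper-layer business data. It splits data into packets according to the actual state of its buffers, so what the business considers one complete HTTP message may be split by TCP into several packets, and several small messages may be combined into one larger packet. Packets are split and recombined for the following reasons:
- The application writes more bytes than the socket send buffer can hold
- TCP segments the data into MSS-sized segments
- IP fragmentation occurs when the payload exceeds the Ethernet MTU
- Nagle's algorithm is enabled, which coalesces small writes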
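To make the MSS bullet concrete: one large application write is carried in several TCP segments, and the receiver may see them in separate reads. A minimal arithmetic sketch, assuming an MSS of 1460 bytes (a 1500-byte Ethernet MTU minus a 20-byte IPv4 header and a 20-byte TCP header without options) and a hypothetical 4000-byte request; it deliberately ignores the send window, Nagle's algorithm and how the receiver batches its reads.

```java
import java.util.ArrayList;
import java.util.List;

public class MssSegmentation {
    // Splits a payload of `payloadBytes` into segments of at most `mss` bytes.
    // Simplified model: no window, no Nagle, no retransmission.
    public static List<Integer> segmentSizes(int payloadBytes, int mss) {
        if (mss <= 0) {
            throw new IllegalArgumentException("mss must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int remaining = payloadBytes;
        while (remaining > 0) {
            int size = Math.min(remaining, mss);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        int mss = 1460; // assumed: 1500 (MTU) - 20 (IPv4) - 20 (TCP)
        // A hypothetical 4000-byte HTTP request written in a single call
        System.out.println(segmentSizes(4000, mss)); // [1460, 1460, 1080]
    }
}
```

Each of those segments can surface as a separate read on the server, which is exactly what turns one logical message into several read events.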
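Because TCP cannot understand the upper-layer business data, it cannot guarantee that packets will not be split or recombined. The problem can only be solved by the upper-layer application protocol, for example:
- Fixed-length messages: for example, every message is exactly 200 bytes, padded with spaces when shorter.
- A newline (or another specific delimiter string) appended to the end of each message.
- A header plus a body, where the header contains a field giving the total message length; typically the first header field is an int32 holding that length.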
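Netty ships decoders for all three approaches (FixedLengthFrameDecoder, LineBasedFrameDecoder / DelimiterBasedFrameDecoder, LengthFieldBasedFrameDecoder). To show what the third one has to do, here is a plain-Java sketch that is not Netty's implementation. It assumes a 4-byte big-endian int32 that holds the body length only; protocols that count the header in the "total length" would subtract 4 before reading the body. The decoder buffers arbitrary chunks and emits a frame only once all of its bytes have arrived.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of length-prefixed framing (not Netty code).
// Assumed wire format: 4-byte big-endian int32 body length, then the body bytes.
public class LengthFieldFraming {
    private static final int HEADER_BYTES = 4;
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Encoder: prefix the UTF-8 body with its length.
    public static byte[] encode(String body) {
        byte[] payload = body.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Decoder: accepts chunks of any size, in the order TCP delivers them,
    // and returns every frame completed by this chunk (possibly none).
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] buf = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int pos = 0;
        while (buf.length - pos >= HEADER_BYTES) {
            int bodyLen = ByteBuffer.wrap(buf, pos, HEADER_BYTES).getInt();
            if (bodyLen < 0) {
                throw new IllegalStateException("negative length field: " + bodyLen);
            }
            if (buf.length - pos - HEADER_BYTES < bodyLen) {
                break; // header seen, body still incomplete: wait for more bytes
            }
            frames.add(new String(buf, pos + HEADER_BYTES, bodyLen, StandardCharsets.UTF_8));
            pos += HEADER_BYTES + bodyLen;
        }
        // Keep only the unconsumed tail for the next call.
        pending.reset();
        pending.write(buf, pos, buf.length - pos);
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("first");   // 4 + 5 bytes
        byte[] second = encode("second"); // 4 + 6 bytes
        byte[] stream = ByteBuffer.allocate(first.length + second.length).put(first).put(second).array();

        // Cut the stream inside the first body to mimic splitting; the second
        // chunk carries the rest of frame 1 plus all of frame 2 (coalescing).
        LengthFieldFraming decoder = new LengthFieldFraming();
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 0, 6)));             // []
        System.out.println(decoder.feed(Arrays.copyOfRange(stream, 6, stream.length))); // [first, second]
    }
}
```

The same decoder copes with both failure modes from the list above: a split message yields no frame until the rest arrives, and coalesced messages yield several frames from one chunk.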
Reproducing the scenario
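// Client code
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class EventTriggerClientHandler extends ChannelInboundHandlerAdapter {
    private static final AtomicInteger SEQ = new AtomicInteger(0);
    static final String ECHO_REQ = "Hi, welcome to Netty";
    static final String DELIMITER = " $_";
    static final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send one fragment per second; only every 10th send carries the delimiter,
        // so ten fragments together form one complete business message.
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            int counter = SEQ.incrementAndGet();
            if (counter % 10 == 0) {
                ctx.writeAndFlush(Unpooled.copiedBuffer((ECHO_REQ + DELIMITER).getBytes(StandardCharsets.UTF_8)));
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes(StandardCharsets.UTF_8)));
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);
    }
}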
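This is the client code that simulates the fault. The delimiter "$_" marks the end of a complete business request. The client sends an incomplete fragment to the server every second, and every tenth send appends the delimiter (DELIMITER is " $_", with a leading space) to signal that the complete message has been sent.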
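// Server code
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.nio.charset.StandardCharsets;

public class EventTriggerServerHandler extends ChannelInboundHandlerAdapter {
    int counter;
    int readCompleteTimes;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // msg was framed by DelimiterBasedFrameDecoder and turned into a String by StringDecoder
        String body = (String) msg;
        System.out.println("This is " + ++counter + "times receive client : [" + body + "]");
        // Echo the message back, re-appending the delimiter
        body += "$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(echo);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Fired once per read on the socket, whether or not a complete message was decoded
        ctx.fireChannelReadComplete();
        readCompleteTimes++;
        System.out.println("This is " + readCompleteTimes + " times receive ReadComplete event.");
    }
}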
When initializing the ChannelPipeline, add a DelimiterBasedFrameDecoder, which automatically decodes messages whose end is marked by a delimiter. Example code:
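.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes(StandardCharsets.UTF_8));
        ChannelPipeline p = socketChannel.pipeline();
        // Split the byte stream on "$_" (max frame length 2048 bytes; the delimiter is stripped)
        p.addLast(new DelimiterBasedFrameDecoder(2048, delimiter));
        // Convert each frame (ByteBuf) into a String
        p.addLast(new StringDecoder());
        // The business handler receives one String per complete message
        p.addLast(new EventTriggerServerHandler());
    }
});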
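To see why channelRead only receives complete messages, here is a plain-Java model of delimiter framing. DelimiterFramer is a hypothetical class, not Netty's implementation, and its error handling is simplified (it does not mirror Netty's TooLongFrameException behaviour). It buffers chunks until "$_" appears, strips the delimiter as DelimiterBasedFrameDecoder does by default, and replays the client's ten sends under the assumption that each send arrives as one read.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of delimiter framing (not Netty's implementation).
public class DelimiterFramer {
    private final byte[] delimiter;
    private final int maxFrameLength;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    public DelimiterFramer(int maxFrameLength, String delimiter) {
        this.maxFrameLength = maxFrameLength;
        this.delimiter = delimiter.getBytes(StandardCharsets.UTF_8);
    }

    // Feed one chunk as read from the socket; returns complete frames, delimiter stripped.
    public List<String> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] buf = pending.toByteArray();
        List<String> frames = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = indexOf(buf, start)) >= 0) {
            int len = idx - start;
            if (len > maxFrameLength) {
                throw new IllegalStateException("frame length " + len + " exceeds " + maxFrameLength);
            }
            frames.add(new String(buf, start, len, StandardCharsets.UTF_8));
            start = idx + delimiter.length;
        }
        if (buf.length - start > maxFrameLength) {
            throw new IllegalStateException("no delimiter within " + maxFrameLength + " bytes");
        }
        // Keep the bytes after the last delimiter for the next read.
        pending.reset();
        pending.write(buf, start, buf.length - start);
        return frames;
    }

    // Naive search for the delimiter starting at `from`; returns -1 if absent.
    private int indexOf(byte[] buf, int from) {
        outer:
        for (int i = from; i <= buf.length - delimiter.length; i++) {
            for (int j = 0; j < delimiter.length; j++) {
                if (buf[i + j] != delimiter[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        DelimiterFramer framer = new DelimiterFramer(2048, "$_");
        int reads = 0;
        int frames = 0;
        for (int counter = 1; counter <= 10; counter++) {
            // Same payloads as EventTriggerClientHandler: every 10th send ends with " $_"
            String msg = counter % 10 == 0 ? "Hi, welcome to Netty $_" : "Hi, welcome to Netty";
            reads++;
            frames += framer.feed(msg.getBytes(StandardCharsets.UTF_8)).size();
        }
        System.out.println("reads=" + reads + ", frames=" + frames); // reads=10, frames=1
    }
}
```

Ten reads produce a single frame containing ten copies of "Hi, welcome to Netty" plus the trailing space, which matches the one channelRead line in the server log.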
Running the test produces the following output:
This is 1 times receive ReadComplete event.
This is 2 times receive ReadComplete event.
This is 3 times receive ReadComplete event.
This is 4 times receive ReadComplete event.
This is 5 times receive ReadComplete event.
This is 6 times receive ReadComplete event.
This is 7 times receive ReadComplete event.
This is 8 times receive ReadComplete event.
This is 9 times receive ReadComplete event.
This is 1times receive client : [Hi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to NettyHi, welcome to Netty ]
This is 10 times receive ReadComplete event.
This is 11 times receive ReadComplete event.
This is 12 times receive ReadComplete event.
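When decoders are added to the pipeline, channelRead is invoked only after a message has been fully decoded. channelReadComplete works differently: it is fired each time the underlying SocketChannel finishes a read operation, whether or not the decoders produced a complete message, so a single business message can trigger it several times. Conversely, when several messages arrive in one read, channelRead fires once per message but channelReadComplete fires only once. Having found the cause, the business fix is to move the logic from channelReadComplete into channelRead, which runs exactly once per decoded message; channelReadComplete is better reserved for per-read housekeeping, such as flushing writes queued in channelRead.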
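The difference in event counts can be reproduced without a network. This toy model (hypothetical ReadEventModel, not Netty code) assumes each client send arrives as exactly one socket read. For each read it fires channelRead once per decoded frame and then channelReadComplete exactly once, as described above, and the business logic lives in channelRead, as the fix recommends.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Netty) of event dispatch behind a delimiter decoder:
// each socket read -> zero or more channelRead calls -> exactly one channelReadComplete.
public class ReadEventModel {
    interface Handler {
        void channelRead(String frame);
        void channelReadComplete();
    }

    static class CountingHandler implements Handler {
        int reads;
        int readCompletes;
        final List<String> processed = new ArrayList<>();

        @Override
        public void channelRead(String frame) {
            reads++;
            processed.add(frame); // business logic: runs once per complete message
        }

        @Override
        public void channelReadComplete() {
            readCompletes++; // per-read housekeeping only (e.g. flushing)
        }
    }

    private final StringBuilder pending = new StringBuilder();
    private final String delimiter;
    private final Handler handler;

    ReadEventModel(String delimiter, Handler handler) {
        this.delimiter = delimiter;
        this.handler = handler;
    }

    // Simulates one read operation on the socket that delivered `chunk`.
    void onSocketRead(String chunk) {
        pending.append(chunk);
        int idx;
        while ((idx = pending.indexOf(delimiter)) >= 0) {
            handler.channelRead(pending.substring(0, idx));
            pending.delete(0, idx + delimiter.length());
        }
        handler.channelReadComplete(); // fired even when no frame was decoded
    }

    public static void main(String[] args) {
        CountingHandler h = new CountingHandler();
        ReadEventModel model = new ReadEventModel("$_", h);
        for (int i = 1; i <= 10; i++) {
            model.onSocketRead(i % 10 == 0 ? "Hi, welcome to Netty $_" : "Hi, welcome to Netty");
        }
        // Two complete messages coalesced into a single read
        model.onSocketRead("A$_B$_");
        System.out.println("channelRead=" + h.reads + ", channelReadComplete=" + h.readCompletes);
        // prints: channelRead=3, channelReadComplete=11
    }
}
```

The first ten reads give one channelRead and ten channelReadComplete calls; the last read gives two channelRead calls but a single channelReadComplete. channelReadComplete therefore tracks reads rather than messages, in both directions.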