netty-netty中的分隔符和定长解码器
摘自《Netty权威指南》
解决TCP粘包和拆包导致的半包问题
分隔符解码器
服务器端:
# EchoServer
/**
 * Echo server for the delimiter-decoder demonstration.
 *
 * <p>Binds to the configured port and installs a {@code StringDecoder} followed by an
 * {@code EchoServerHandler} on every child channel. The {@code DelimiterBasedFrameDecoder}
 * is left commented out on purpose so the effect of enabling it can be compared.
 */
public class EchoServer {
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /** Starts the server on port 8080. */
    public static void main(String[] args) throws InterruptedException {
        EchoServer server = new EchoServer(8080);
        server.start();
    }

    /**
     * Binds the server socket and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     */
    private void start() throws InterruptedException {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 100);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Frame decoder intentionally disabled; uncomment to split
                    // the inbound stream on the "$_" delimiter.
                    // ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
                    //         Unpooled.copiedBuffer("$_".getBytes())));
                    ch.pipeline()
                            .addLast(new StringDecoder())
                            .addLast(new EchoServerHandler());
                }
            });

            // Wait for the bind to finish, then wait for the server channel to close.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            bindFuture.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
# EchoServerHandler
/**
 * Inbound handler that logs each received message and echoes it back
 * with the "$_" delimiter appended.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /** Number of messages this handler instance has received so far. */
    private int counter;

    /**
     * Prints the received message together with its sequence number, then writes
     * the message plus the "$_" delimiter back to the peer.
     *
     * @param ctx handler context used to write the reply
     * @param msg inbound message; cast to {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String received = (String) msg;
        counter++;
        System.out.println("this is " + counter
                + " times recv client[x]: [" + received + "]");

        String reply = received + "$_";
        ctx.writeAndFlush(Unpooled.copiedBuffer(reply.getBytes()));
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
客户端实现:
/**
 * Echo client for the delimiter-decoder demonstration.
 *
 * <p>Connects to the given host and port and installs a
 * {@code DelimiterBasedFrameDecoder} (delimiter "$_", max frame length 1024),
 * a {@code StringDecoder} and an {@code EchoClientHandler} on the channel.
 */
public class EchoClient {
    private final String host;
    private final int port;

    /**
     * @param host remote host to connect to
     * @param port remote TCP port
     */
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Connects to 127.0.0.1:8080. */
    public static void main(String[] args) throws Exception {
        EchoClient client = new EchoClient("127.0.0.1", 8080);
        client.connect();
    }

    /**
     * Connects to the server and blocks until the channel is closed.
     * The event loop group is shut down when this method exits.
     */
    private void connect() throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Split the inbound stream on "$_" before decoding to String.
                    ch.pipeline()
                            .addLast(new DelimiterBasedFrameDecoder(1024,
                                    Unpooled.copiedBuffer("$_".getBytes())))
                            .addLast(new StringDecoder())
                            .addLast(new EchoClientHandler());
                }
            });

            // Wait for the connect to finish, then wait for the channel to close.
            ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
            connectFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Client-side inbound handler: sends a fixed greeting ten times once the channel
 * is active and logs every message received from the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    /** Greeting sent to the server; it already ends with the "$_" delimiter. */
    private static final String WORD = "hello netty world $_";

    /** Number of messages this handler instance has received so far. */
    private int counter;

    /**
     * Writes and flushes the greeting ten times, each in its own buffer.
     *
     * @param ctx handler context used to write the messages
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(WORD.getBytes()));
        }
    }

    /**
     * Prints the received message together with its sequence number.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message; cast to {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        // Leading space before "times" keeps the counter and the word apart,
        // matching the format of the server-side log line.
        System.out.println("this is " + ++counter
                + " times recv server: [" + body + "]");
    }

    /** Flushes any pending writes after a read batch completes. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
如果将服务器端EchoServer
中的DelimiterBasedFrameDecoder
注释掉,服务器接收的结果为:
this is 1 times recv client[x]: [hello netty world $_hello netty world
$_hello netty world $_hello netty world $_hello netty world $_
hello netty world $_hello netty world $_hello netty world
$_hello netty world $_hello netty world $_]
如果取消服务器端EchoServer
中DelimiterBasedFrameDecoder
的注释,服务器接收的结果为:
this is 1 times recv client[x]: [hello netty world ]
this is 2 times recv client[x]: [hello netty world ]
this is 3 times recv client[x]: [hello netty world ]
this is 4 times recv client[x]: [hello netty world ]
this is 5 times recv client[x]: [hello netty world ]
this is 6 times recv client[x]: [hello netty world ]
this is 7 times recv client[x]: [hello netty world ]
this is 8 times recv client[x]: [hello netty world ]
this is 9 times recv client[x]: [hello netty world ]
this is 10 times recv client[x]: [hello netty world ]
可以发现分隔符解码器确实有效了
定长解码器
固定长度解码器,无论一次性接收多少数据,它能够按照指定的长度对消息进行自动解码,如果是半包,
FixedLengthFrameDecoder
会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包
/**
 * Echo server for the fixed-length-decoder demonstration.
 *
 * <p>Installs a {@code FixedLengthFrameDecoder} with a frame length of 20, a
 * {@code StringDecoder} and an {@code EchoServerHandler} on every child channel.
 */
public class EchoServer {
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /** Starts the server on port 8080. */
    public static void main(String[] args) throws InterruptedException {
        EchoServer server = new EchoServer(8080);
        server.start();
    }

    /**
     * Starts the bind, reports its outcome through a listener, and blocks until
     * the server channel is closed. Both event loop groups are shut down when
     * this method exits.
     */
    private void start() throws InterruptedException {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 100);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // Cut the inbound stream into 20-byte frames before decoding.
                    ch.pipeline()
                            .addLast(new FixedLengthFrameDecoder(20))
                            .addLast(new StringDecoder())
                            .addLast(new EchoServerHandler());
                }
            });

            // The bind is not awaited here; the listener reports the result.
            ChannelFuture bindFuture = bootstrap.bind(port);
            bindFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    String status = future.isSuccess()
                            ? "server start success..."
                            : "server start failed...";
                    System.out.println(status);
                }
            });
            bindFuture.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Inbound handler for the fixed-length demonstration: prints every received
 * message and sends nothing back.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Prints the received message.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message; cast to {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String received = (String) msg;
        System.out.println("server recv[x]: " + received);
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}