项目结构
服务端
package com.nio.echo.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Netty echo server. Inbound bytes are split into frames on the "$_" delimiter,
 * decoded to strings and handed to {@link EchoServerHandler}.
 */
public class EchoServer {

    /** Port used when no valid port is supplied on the command line. */
    private static final int DEFAULT_PORT = 8836;

    /** Maximum length of a single frame before the frame decoder gives up. */
    private static final int MAX_FRAME_LENGTH = 1024;

    /**
     * Binds the server to {@code port} and blocks until the server channel is closed.
     *
     * <p>Failures (for example the port being in use, or the waiting thread being
     * interrupted) are propagated to the caller instead of being printed and
     * swallowed, matching what {@code EchoClient.connect} does. Both event loop
     * groups are always shut down on the way out.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the wait is interrupted
     */
    public void bind(int port) throws Exception {
        // Two NIO event loop groups: the first is passed as the parent group,
        // the second as the child group of the ServerBootstrap.
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Delimiter buffer: "$_" marks the end of each message.
                            // The charset is pinned so the bytes on the wire do not
                            // depend on the platform default encoding.
                            ByteBuf delimiter = Unpooled.copiedBuffer(
                                    "$_".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                            // First argument: maximum length of a single message. If the
                            // delimiter is still not found at that length an exception is
                            // raised, which guards against unbounded buffering when a
                            // broken stream never contains the delimiter.
                            // Second argument: the delimiter buffer.
                            socketChannel.pipeline().addLast(
                                    new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // Bind the port and wait synchronously for the bind to succeed.
            ChannelFuture future = bootstrap.bind(port).sync();
            // Block until the server's listening channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop resources on every exit path.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server on the port given as the first argument, or on the
     * default port 8836 when the argument is missing or not a number.
     *
     * @param args optional single element: the port to listen on
     * @throws Exception if the server cannot be started
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Best effort: keep the default port but say why the argument was ignored.
                System.err.println("Invalid port '" + args[0] + "', falling back to " + DEFAULT_PORT);
            }
        }
        new EchoServer().bind(port);
    }
}
package com.nio.echo.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Inbound handler that prints every message it receives and writes the same
 * text back to the peer with the "$_" delimiter appended.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /** Number of messages read by this handler instance so far. */
    int counter = 0;

    /**
     * Prints the received message together with the running count, then echoes
     * it back followed by "$_".
     *
     * @param ctx context used to write the reply
     * @param msg the inbound message; it is cast to {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String received = (String) msg;
        counter++;
        // Print the message that was received.
        System.out.println("服务端的计数器:" + counter + " ,客户端发来的消息是:【 " + received + "】");
        // Append the delimiter to the text before sending it back.
        final byte[] reply = (received + "$_").getBytes();
        ctx.writeAndFlush(Unpooled.copiedBuffer(reply));
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
客户端
package com.nio.echo.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
/**
 * Netty echo client. Inbound bytes are split into frames on the "$_" delimiter,
 * decoded to strings and handed to {@link EchoClientHandler}.
 */
public class EchoClient {

    /** Port used when no valid port is supplied on the command line. */
    private static final int DEFAULT_PORT = 8836;

    /** Maximum length of a single frame before the frame decoder gives up. */
    private static final int MAX_FRAME_LENGTH = 1024;

    /**
     * Connects to {@code host:port} and blocks until the connection is closed.
     * The event loop group is always shut down on the way out.
     *
     * @param port TCP port of the server
     * @param host host name or address of the server
     * @throws Exception if the connection fails or the wait is interrupted
     */
    public void connect(int port, String host) throws Exception {
        // NIO event loop group for the client.
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // "$_" marks the end of each message. The charset is pinned so
                            // the delimiter bytes do not depend on the platform default.
                            ByteBuf delimiter = Unpooled.copiedBuffer(
                                    "$_".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                            socketChannel.pipeline().addLast(
                                    new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // Start the connect operation and wait for it to complete.
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Block until the client connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Graceful shutdown of the event loop group.
            group.shutdownGracefully();
        }
    }

    /**
     * Connects to localhost on the port given as the first argument, or on the
     * default port 8836 when the argument is missing or not a number.
     *
     * @param args optional single element: the server port
     * @throws Exception if the client cannot connect
     */
    public static void main(String[] args) throws Exception {
        int port = DEFAULT_PORT;
        if (args != null && args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Best effort: keep the default port but say why the argument was ignored.
                System.err.println("Invalid port '" + args[0] + "', falling back to " + DEFAULT_PORT);
            }
        }
        new EchoClient().connect(port, "localhost");
    }
}
package com.nio.echo.client;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler: sends a batch of delimiter-terminated messages
 * once the channel becomes active and prints every message that comes back.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    /** How many copies of {@link #ECHO_STR} are sent when the channel becomes active. */
    private static final int SEND_COUNT = 10;

    /** Number of messages read by this handler instance so far. */
    private int counter;

    /** Text sent to the server; it contains two "$_" delimiters. */
    public static final String ECHO_STR = "hellp world nio.$_. welcome come to netty.$_";

    public EchoClientHandler() {
    }

    /**
     * Writes and flushes {@link #ECHO_STR} ten times, one write per copy.
     *
     * @param ctx context used to write the messages
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Encode once with an explicit charset rather than re-encoding with the
        // platform default on every iteration.
        final byte[] payload = ECHO_STR.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        for (int i = 0; i < SEND_COUNT; i++) {
            // copiedBuffer gives each write its own buffer backed by a fresh copy.
            ctx.writeAndFlush(Unpooled.copiedBuffer(payload));
        }
    }

    /**
     * Prints the received message together with the running count.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound message, printed via string concatenation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客户端第: " + ++counter + " 次收到服务端的返回数据" + " 【" + msg + " 】");
    }

    /**
     * Flushes the channel after a read batch completes.
     *
     * @param ctx context of the channel
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
运行结果:
netty固定长度解码器的使用URL: