1,准备好netty必须的jar
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.23.Final</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
2,服务端-Server
启动服务方法:
public void satrtServer(int port) throws Exception {
//处理和客户端连接的线程组(构造器参数默认是空,线程数就是当前机器CPU的内核个数)
EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
//处理客户端与服务端写入读取的IO等业务的线程组
EventLoopGroup handlerIoGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(acceptGroup, handlerIoGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024) //http缓冲区
.option(ChannelOption.SO_SNDBUF, 3 * 1024) //发送到客户端的最大长度
.option(ChannelOption.SO_RCVBUF, 3 * 1024) //接收客户端信息的最大长度
.option(ChannelOption.TCP_NODELAY, true) //无延迟推送
.childOption(ChannelOption.SO_KEEPALIVE, true)
//服务端是childHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBUffer("$@$".getBytes("UTF-8"));
//处理客户端连续发送流导致粘包问题,客户端发送的信息需要$@$代表发生结束
ch.pipeline().addLast(new DelimiterBasedFrameDecoder( 3 * 1024,byteBuf));
ch.pipeline().addLast(new ReadTimeoutHandler(10)); //超过10秒没有读取到客户端发送的信息就关闭这个通道
ch.pipeline().addLast(new ServerHandler());
}
});
//异步阻塞
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
acceptGroup.shutdownGracefully();
handlerIoGroup .shutdownGracefully();
}
}
3,实现服务端处理业务ServerHandler:
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msgBuf = (ByteBuf) msg;
String gMsg = msgBuf.toString(CharsetUtil.UTF_8);
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("收到信息了".getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}