在了解IO,NIO,AIO知识后,学习Netty5便会轻松很多,本章节主要介绍Netty是如何接收,反馈数据和拆包粘包的问题
。
1 Netty基础知识
1.1 Netty作用
Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠性 的网络服务器和客户端程序。Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty简化了网络程序的开发过程,比如TCP和UDP的 Socket的开发。
1.2 TCP和UDP
TCP(Transmission Control Protocol,传输控制协议)是基于连接的协议,也就是说,在正式收发数据前,必须和对方建立可靠的连接。
UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是面向非连接的协议,它不与对方建立连接,而是直接就把数据包发送过去!
UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是面向非连接的协议,它不与对方建立连接,而是直接就把数据包发送过去!
TCP | UDP | |
是否连接 | 面向连接 | 面向非连接 |
传输可靠性 | 可靠 | 不可靠 |
应用场合 | 传输大量数据 | 传输少量数据 |
速度 | 慢 | 快 |
2 HelloWorld代码
2.1 DISCARD服务:
丢弃服务,丢弃了所有接收到的数据,并不做任何响应。
简单理解就是接收数据,不返回数据
2.2 ECHO服务:响应式协议,
这个协议针对任何接收的数据都会返回一个响应
2.3 代码事例:
首选是服务器处理类
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- /**
- * DISCARD协议 丢弃协议,其实就是只接收数据,不做任何处理。
- * ECHO服务(响应式协议),其实就是返回数据。
- * 实现步骤:
- * step1 继承 ChannelHandlerAdapter
- * step2 覆盖chanelRead()事件处理方法
- * step3 释放ByteBuffer,ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
- * step4 异常处理,即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
- */
- public class DiscardServerHandler extends ChannelHandlerAdapter{
- @Override
- public void channelRead(ChannelHandlerContext chc, Object msg) {
- try {
- // 简单的读写操作
- ByteBuf buf = (ByteBuf) msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- String body = new String(req, "utf-8");
- System.out.println("Server :" + body);
- chc.writeAndFlush(Unpooled.copiedBuffer("卒...... ".getBytes())); // 返回数据
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- ReferenceCountUtil.release(msg); // 释放msg
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
- // 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
- cause.printStackTrace();
- chc.close();
- }
- }
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- /**
- * step1 创建两个线程组分别负责接收和处理
- * step2 启动NIO服务辅助类
- * step3 绑定两个线程组,指定一个通道,关联一个处理类,设置一些相关参数
- * step4 监听端口
- * step5 关闭一些资源
- */
- public class DiscardServer {
- private static final int PORT = 8888; // 监听的端口号
- public static void main(String[] args) {
- // NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器
- EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
- EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理进来的连接
- try {
- ServerBootstrap bootstrap = new ServerBootstrap(); // ServerBootstrap 是一个启动NIO服务的辅助启动类
- bootstrap.group(bossGroup, workerGroup) // 绑定俩个线程组
- .channel(NioServerSocketChannel.class) // 指定用 NioServerSocketChannel 通道
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new DiscardServerHandler()); // DiscardServerHandler是我们自定义的服务器处理类,负责处理连接
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
- .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置保持连接
- ChannelFuture future = bootstrap.bind(PORT).sync(); // 绑定端口
- future.channel().closeFuture().sync(); // 等待关闭
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- workerGroup.shutdownGracefully(); // 关闭线程组,先打开的后关闭
- bossGroup.shutdownGracefully();
- }
- }
- }
客户端处理类
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- /**
- *
- * 和ServerHandler类似
- *
- */
- public class ClientHandler extends ChannelHandlerAdapter{
- public void channelRead(ChannelHandlerContext chc, Object msg) {
- try {
- ByteBuf buf = (ByteBuf) msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- String body = new String(req, "utf-8");
- System.out.println("Client :" + body);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- ReferenceCountUtil.release(msg); // 释放msg
- }
- }
- public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
- // 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
- cause.printStackTrace();
- chc.close();
- }
- }
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- public class Client {
- private static final int PORT = 8888;
- private static final String HOST = "127.0.0.1";
- public static void main(String[] args) {
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ClientHandler());
- }
- })
- .option(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立连接
- future.channel().writeAndFlush(Unpooled.copiedBuffer("快醒醒,还有几个bug没有改".getBytes())); // 向服务端发送数据
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- workerGroup.shutdownGracefully();
- }
- }
- }
- Server :快醒醒,还有几个bug没有改
- Client :卒......
3 拆包粘包
3.1 使用特殊的分隔符
3.2 限定长度,不推荐。若发送数据的长度不够指定长度,则一直处于等待中。
代码在原来的基础上做了简单修改,可以打开注释自己调试
首选是服务器处理类
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- /**
- * DISCARD协议 丢弃协议,其实就是只接收数据,不做任何处理。
- * ECHO服务(响应式协议),其实就是返回数据。
- * 实现步骤:
- * step1 继承 ChannelHandlerAdapter
- * step2 覆盖chanelRead()事件处理方法
- * step3 释放ByteBuffer,ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
- * step4 异常处理,即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
- */
- public class DiscardServerHandler extends ChannelHandlerAdapter{
- private static final String DELIMITER = "^_^"; // 拆包分隔符
- @Override
- public void channelRead(ChannelHandlerContext chc, Object msg) {
- try {
- // 简单的读写操作
- /*
- ByteBuf buf = (ByteBuf) msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- String body = new String(req, "utf-8");
- System.out.println("Server :" + body);
- chc.writeAndFlush(Unpooled.copiedBuffer("卒...... ".getBytes())); // 返回数据
- */
- // 加了 StringDecoder 字符串解码器后可以直接读取
- System.out.println("Server :" + msg);
- // 分隔符拆包
- // String response = msg + " , 骗你的" + DELIMITER;
- // chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
- // 限定长度拆包
- chc.channel().writeAndFlush(Unpooled.copiedBuffer(msg.toString().getBytes()));
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- ReferenceCountUtil.release(msg); // 释放msg
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
- // 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
- cause.printStackTrace();
- chc.close();
- }
- }
然后是服务端Netty启动代码
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.DelimiterBasedFrameDecoder;
- import io.netty.handler.codec.FixedLengthFrameDecoder;
- import io.netty.handler.codec.string.StringDecoder;
- /**
- * step1 创建两个线程组分别负责接收和处理
- * step2 启动NIO服务辅助类
- * step3 绑定两个线程组,指定一个通道,关联一个处理类,设置一些相关参数
- * step4 监听端口
- * step5 关闭一些资源
- */
- public class DiscardServer {
- private static final int PORT = 8888; // 监听的端口号
- private static final String DELIMITER = "^_^"; // 拆包分隔符
- public static void main(String[] args) {
- // NioEventLoopGroup 是用来处理I/O操作的多线程事件循环器
- EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
- EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理进来的连接
- try {
- ServerBootstrap bootstrap = new ServerBootstrap(); // ServerBootstrap 是一个启动NIO服务的辅助启动类
- bootstrap.group(bossGroup, workerGroup) // 绑定俩个线程组
- .channel(NioServerSocketChannel.class) // 指定用 NioServerSocketChannel 通道
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 考虑到tcp拆包粘包的问题,升级代码
- // step1 获取特殊分隔符的ByteBuffer
- ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
- // step2 设置特殊分隔符
- // socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));
- // 还有一种就是指定长度 二选一 (用的比较少)
- socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
- // step3 设置字符串形式的解码
- socketChannel.pipeline().addLast(new StringDecoder());
- // step4 设置处理类
- socketChannel.pipeline().addLast(new DiscardServerHandler()); // DiscardServerHandler是我们自定义的服务器处理类,负责处理连接
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128) // 设置tcp缓冲区
- .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置保持连接
- ChannelFuture future = bootstrap.bind(PORT).sync(); // 绑定端口
- future.channel().closeFuture().sync(); // 等待关闭
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- workerGroup.shutdownGracefully(); // 关闭线程组,先打开的后关闭
- bossGroup.shutdownGracefully();
- }
- }
- }
客户端处理类
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerAdapter;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.util.ReferenceCountUtil;
- /**
- *
- * 和ServerHandler类似
- *
- */
- public class ClientHandler extends ChannelHandlerAdapter{
- public void channelRead(ChannelHandlerContext chc, Object msg) {
- try {
- /*
- ByteBuf buf = (ByteBuf) msg;
- byte[] req = new byte[buf.readableBytes()];
- buf.readBytes(req);
- String body = new String(req, "utf-8");
- */
- System.out.println("Client : " + msg);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- ReferenceCountUtil.release(msg); // 释放msg
- }
- }
- public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {
- // 这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
- cause.printStackTrace();
- chc.close();
- }
- }
客户端启动服务类
- import io.netty.bootstrap.Bootstrap;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.DelimiterBasedFrameDecoder;
- import io.netty.handler.codec.FixedLengthFrameDecoder;
- import io.netty.handler.codec.string.StringDecoder;
- public class Client {
- private static final int PORT = 8888;
- private static final String HOST = "127.0.0.1";
- private static final String DELIMITER = "^_^"; // 拆包分隔符
- public static void main(String[] args) {
- NioEventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(workerGroup)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- // 考虑到tcp拆包粘包的问题,升级代码
- ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());
- // socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));
- socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
- socketChannel.pipeline().addLast(new StringDecoder());
- socketChannel.pipeline().addLast(new ClientHandler());
- }
- })
- .option(ChannelOption.SO_KEEPALIVE, true);
- ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立连接
- // future.channel().writeAndFlush(Unpooled.copiedBuffer("快醒醒,还有几个bug没有改".getBytes())); // 向服务端发送数据
- // future.channel().writeAndFlush(Unpooled.copiedBuffer(("又要加班了"+DELIMITER).getBytes()));
- // future.channel().writeAndFlush(Unpooled.copiedBuffer(("好开心啊T。T"+DELIMITER).getBytes()));
- future.channel().writeAndFlush(Unpooled.copiedBuffer("123456789".getBytes())); // 传的个数是9个,只打印了5个,还有4个在等待中
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- workerGroup.shutdownGracefully();
- }
- }
- }