LineBasedFrameDecoder 行解码器,回车换行符解决 TCP 粘包

LineBasedFrameDecoder

为了解决 TCP 粘包/拆包导致的半包读写问题,Netty 默认提供了许多种编解码器用于处理半包

本文将学习 LineBasedFrameDecoder :基于行的帧解码器,配合 StringDecoder 解码器一同使用

        LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有 "\n” 或者 "\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以回车换行符为结束标记的解码器,支持配置单行的最大长度,如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读取到的异常码流(Netty 4.1 版本亲测:没有抛出异常,但是客户端会被自动断开连接)

      StringDecoder 解码器工作原理将接收到的对象转换成字符串,然后继续调用后面的 Hander。

      LineBasedFrameDecoder + StringDecoder 组合就是按行切换的文本解码器,它被设计用来支持 TCP 的粘包与拆包。

服务端

本文仍然以 《TCP 粘包/拆包说明 及 异常案例》的示例进行改写。

TimeServer 内容如下:

package com.lct.netty.stringDecoderStickyBag;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by Administrator on 2017/5/16.
 */
public class TimeServer {
    public static void main(String[] args) {
        int port = 9898;
        new TimeServer().bind(port);
    }

    public void bind(int port) {
        /**
         * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
         * 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
         * bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写*/
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            /** ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
             * */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            /**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/
            ChannelFuture f = b.bind(port).sync();

            System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");
            /**下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束
             * */
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            /**优雅退出,释放线程池资源*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {
            /**
             * 添加 LineBasedFrameDecoder 与 StringDecoder解码器
             */
            arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
            arg0.pipeline().addLast(new StringDecoder());
            arg0.pipeline().addLast(new TimeServerHandler());
        }
    }
}

重点在第 61-62行,新增了两个解码器。

TimeServerHandler 内容如下:

package com.lct.netty.stringDecoderStickyBag;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/5/16.
 * ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用于对网络事件进行读写操作
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 因为多线程,所以使用原子操作类来进行计数
     */
    private static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 收到客户端消息,自动触发
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /**
         * 这个 msg 已经是解码成功的消息,所以不再需要像以前一样使用 ByteBuf 进行编码
         * 直接转为 string 字符串即可*/
        String body = (String) msg;
        System.out.println((atomicInteger.addAndGet(1)) + "--->" + Thread.currentThread().getName() + ",The server receive  order : " + body);

        /**回复消息
         * copiedBuffer:创建一个新的缓冲区,内容为里面的参数
         * 通过 ChannelHandlerContext 的 write 方法将消息异步发送给客户端
         * 注意解决 TCP 粘包的策略之一就是:在包尾增加回车换行符进行分割
         * System.getProperty("line.separator");屏蔽了 Windows和Linux的区别
         * windows 系统上回车换行符 "\n",Linux 系统上是 "/n"
         * */
        String respMsg = "I am Server,消息接收 success!" + Thread.currentThread().getName() + System.getProperty("line.separator");
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        /**
         * 每次写的时候,同时刷新,防止 TCP 粘包
         */
        ctx.writeAndFlush(respByteBuf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("-----客户端关闭:" + ctx.channel().remoteAddress());
        /**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */
        ctx.close();
    }
}

重点在第 33 行:将解码成功的消息直接转为了字符串;第 43 行:发送的消息以回车换行符结尾。第 48 行:每次写的同时进行刷新。

客户端

TimeClient 内容如下:

package com.lct.netty.stringDecoderStickyBag;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

/**
 * Created by Administrator on 2017/5/16.
 */
public class TimeClient {

    /**
     * 使用 1 个线程模拟 1 个客户端
     *
     * @param args
     */
    public static void main(String[] args) {
        for (int i = 0; i < 1; i++) {
            new Thread(new MyThread()).start();
        }
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            connect("192.168.1.20", 9898);
        }

        public void connect(String host, int port) {
            /**配置客户端 NIO 线程组/池*/
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                /**Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap
                 * 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel
                 * 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法
                 * 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件*/
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                /**
                                 * 添加 LineBasedFrameDecoder、StringDecoder 解码器
                                 */
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new StringDecoder());
                                ch.pipeline().addLast(new TimeClientHandler());
                            }
                        });

                /**connect:发起异步连接操作,调用同步方法 sync 等待连接成功*/
                ChannelFuture channelFuture = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");

                /**等待客户端链路关闭*/
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /**优雅退出,释放NIO线程组*/
                group.shutdownGracefully();
            }
        }
    }
}

重点在第 54-55 行,新加两个解码器,和服务器一样。

TimeClientHandler 内容如下:

package com.lct.netty.stringDecoderStickyBag;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/5/17.
 * 用于对网络事件进行读写操作
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 因为 Netty 采用线程池,所以这里使用原子操作类来进行计数
     */
    private static AtomicInteger atomicInteger = new AtomicInteger();

    /**
     * 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        /**
         * 连续发送 50 条数据
         */
        for (int i = 0; i < 50; i++) {
            /**
             * 解决 TCP 粘包的策略之一就是:在包尾增加回车换行符进行分割
             * System.getProperty("line.separator");屏蔽了 Windows和Linux的区别
             * windows 系统上回车换行符 "\n",Linux 上是 "/n"
             */
            String reqMsg = (i + 1) + ",我是客户端 " + Thread.currentThread().getName() + System.getProperty("line.separator");
            byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
            ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
            /**
             * writeBytes:将指定的源数组的数据传输到缓冲区
             * 调用 ChannelHandlerContext 的 writeAndFlush 方法将消息发送给服务器
             */
            reqByteBuf.writeBytes(reqMsgByte);
            /**
             * 每次发送的同时进行刷新
             */
            ctx.writeAndFlush(reqByteBuf);
        }
    }

    /**
     * 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /**
         * 这个 msg 已经是解码成功的消息,所以不再需要像以前一样使用 ByteBuf 进行编码
         * 直接转为 string 字符串即可*/
        String body = (String) msg;
        System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + body);
    }

    /**
     * 当发生异常时,打印异常 日志,释放客户端资源
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**释放资源*/
        ctx.close();
    }
}

重点在第 35 行:发送包的结尾以回车换行符结尾;第 58 行:将解码成功的消息直接转为字符串。

运行测试

同样先开服务器,再开客户端,客户端输出如下:

客户端输出如下:

Thread-0,客户端发起异步连接..........
1---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
2---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
3---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
4---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
5---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
6---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
7---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
8---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
9---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
10---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1

...........省略 30 行............
41---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
42---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
43---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
44---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
45---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
46---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
47---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
48---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
49---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1
50---nioEventLoopGroup-2-1,Server return Message:I am Server,消息接收 success!nioEventLoopGroup-3-1

服务器输出如下:

main,服务器开始监听端口,等待客户端连接.........
1--->nioEventLoopGroup-3-1,The server receive  order : 1,我是客户端 nioEventLoopGroup-2-1
2--->nioEventLoopGroup-3-1,The server receive  order : 2,我是客户端 nioEventLoopGroup-2-1
3--->nioEventLoopGroup-3-1,The server receive  order : 3,我是客户端 nioEventLoopGroup-2-1
4--->nioEventLoopGroup-3-1,The server receive  order : 4,我是客户端 nioEventLoopGroup-2-1
5--->nioEventLoopGroup-3-1,The server receive  order : 5,我是客户端 nioEventLoopGroup-2-1
6--->nioEventLoopGroup-3-1,The server receive  order : 6,我是客户端 nioEventLoopGroup-2-1
7--->nioEventLoopGroup-3-1,The server receive  order : 7,我是客户端 nioEventLoopGroup-2-1
8--->nioEventLoopGroup-3-1,The server receive  order : 8,我是客户端 nioEventLoopGroup-2-1
9--->nioEventLoopGroup-3-1,The server receive  order : 9,我是客户端 nioEventLoopGroup-2-1
10--->nioEventLoopGroup-3-1,The server receive  order : 10,我是客户端 nioEventLoopGroup-2-1

...........省略 30 行............
41--->nioEventLoopGroup-3-1,The server receive  order : 41,我是客户端 nioEventLoopGroup-2-1
42--->nioEventLoopGroup-3-1,The server receive  order : 42,我是客户端 nioEventLoopGroup-2-1
43--->nioEventLoopGroup-3-1,The server receive  order : 43,我是客户端 nioEventLoopGroup-2-1
44--->nioEventLoopGroup-3-1,The server receive  order : 44,我是客户端 nioEventLoopGroup-2-1
45--->nioEventLoopGroup-3-1,The server receive  order : 45,我是客户端 nioEventLoopGroup-2-1
46--->nioEventLoopGroup-3-1,The server receive  order : 46,我是客户端 nioEventLoopGroup-2-1
47--->nioEventLoopGroup-3-1,The server receive  order : 47,我是客户端 nioEventLoopGroup-2-1
48--->nioEventLoopGroup-3-1,The server receive  order : 48,我是客户端 nioEventLoopGroup-2-1
49--->nioEventLoopGroup-3-1,The server receive  order : 49,我是客户端 nioEventLoopGroup-2-1
50--->nioEventLoopGroup-3-1,The server receive  order : 50,我是客户端 nioEventLoopGroup-2-1

由此可见 TCP 粘包问题完美解决。

本文介绍的是对于以回车换行符结束的消息处理,如果对方发送的消息不是以换行符结束,那么 Netty 也提供了其它的解码器,后续会继续讲解。

下一篇《DelimiterBasedFrameDecoder 自定义分隔符解码器,解决 TCP 粘包

 

 

猜你喜欢

转载自blog.csdn.net/wangmx1993328/article/details/83929476