Netty Decoder对TCP粘包的处理

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/about4years/article/details/81118603

粘包概念不多阐述,看代码:

对于clientHandler:

class SendClientHandler extends SimpleChannelInboundHandler {
    private byte[] req;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        req = ("abc").getBytes();
        for (int i = 0; i < 10; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
}

连续发送10条"abc"。

对于server Handler:

class SimpleServerHandler extends SimpleChannelInboundHandler {
        int count = 0;

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            //这一步必须要求添加StringDecoder 
            String s = (String) msg;
            System.out.println("channelRead0 invoked,count is " + ++count);
            System.out.println(s);
        }
    }

收到一条就打印一条,按照常理来说,channelRead0会被触发10次,然而结果是:

channelRead0 invoked,count is 1
abcabcabcabcabcabcabcabcabcabc

可以看到一次性收到了所有abc,这就是所谓的粘包。我们需要做的就是拆开来一个个分离的报文。另外为什么channelRead0中的msg可以直接强转成String,后文会提到,需要知道的是我们必须在我们的自定义handler之前添加netty提供的StringDecoder,不添加的话是会报错强转异常的。

回到粘包的问题,怎么解决呢?我们可以在自定义Handler中自己去切分,做内部缓冲,然而更加优雅的处理方案是增加多个ChannelHandlerChannelPipeline 把一整个ChannelHandler拆分成多个模块以减少应用的复杂程度。

自定义一个解码器,继承netty提供的ByteToMessageDecoder,简单重写decode方法:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
             if (in.readableBytes() < 3) {
                 return;
             }
             out.add(in.readBytes(3));
         }

在pipeline中加入这个handler。这里有个问题:应该添加在StringDecoder之前还是之后?后文分析ByteToMessageDecoder源码会提到。这里直接看输出结果:

channelRead0 invoked,count is 1
abc
channelRead0 invoked,count is 2
abc
channelRead0 invoked,count is 3
abc
channelRead0 invoked,count is 4
abc
channelRead0 invoked,count is 5
abc
channelRead0 invoked,count is 6
abc
channelRead0 invoked,count is 7
abc
channelRead0 invoked,count is 8
abc
channelRead0 invoked,count is 9
abc
channelRead0 invoked,count is 10
abc

可以看到消息被正确的切分了。如果我们想一次打印2个abc呢?只需要将handler里decode中的2个3改成6即可。其实decode方法很好理解,return的意思就是现在的数据还不是我想要(能够进行切分的),我需要等待更多的数据进来。out.add则是完成了一次切分。

以上的切分属于字符串的切分,比较简单,接下来设想一种自定义报文,分为header+data,header简单的由2部分构成,固定的标志位+len(2byte),len代表data的长度,一个完整的报文长度就是1byte+1byte+len(data),这里暂定len不超过byte能表达的最大值。那decode方法应该怎么写?

从最简单的情况开始考虑,我们设定标志位是a,客户端以如下逻辑发送报文:

for (int i = 1; i < 21; i++) {
            if (i % 5 != 0) {
                req = "a4bcde".getBytes();
            } else {
                req = "a8bcdefghi".getBytes();
            }
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
            //System.out.println("send" + Arrays.toString(req));
        }

每当i是5的倍数时,发送data长度是8的报文。

其实假定客户端发送的报文都是正确的,永远是以a开头,那么我们处理粘包的逻辑将会非常简单,只需要简单的判断长度够不够,够的话解析出len的长度,读取对应长度的字节然后切分就好了,因为不论你有没有成功的完成一次切分,每次进行decode你读取的第一个字节一定是a,不可能会是其他的字节。假定你连续发送a4bcde a4bcde2个报文,当你粘在一起发过了,那么我会进行2次切分,假定你第一次发了a4b,那么我会判断出长度不够等待接下来的字节,直到剩下的字节填充完毕进行切分。所以服务端的decoder可以这样写:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
           if (in.readableBytes() < 2) { //header至少有2个字节,假定可以发送空data
               return;
           }
           in.markReaderIndex();
           //标志位
           in.readByte();
           byte len = in.readByte();
           //len = 4,字符4转成byte对应52
           if (len == 52) {
               if (in.readableBytes() < 4) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(4));
           } else {
               if (in.readableBytes() < 8) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(8));
           }
        }

注意长度不够的话需要resetReaderIndex以便下次decode重新进行读取。在客户端符合要求,并且不出错的情况下,我们这样的decoder是完全可行的,可以正确的拆分出一个个报文数据。然而,可能会出现一些情况,客户端出现问题,发送了不符合要求的报文,例如中间突然发了一次rtyu的报文,那么我们的decoder就会出现问题,或者说可能会有报文攻击,发送大量非法发文,那么我们的decoder也是会出错的,考虑这两种情况的话,我们要做的就是假如出现非法报文,丢弃它。看下面的decoder:

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
           if (in.readableBytes() < 2) { //header至少有2个字节,假定可以发送空data
               return;
           }
           while (true) {
               if (!in.isReadable()) {
                   //当一直读完还读不到标志位,那么return,不reset,此时之前的报文被抛弃
                   return;
               }
               //此处进行mark操作,确保当发送break时能够定位到标志报文处
               in.markReaderIndex();
               byte b = in.readByte();
               System.out.println("b is"+ b);
               if (b == 97) {
                   break;
               }
           }
           in.resetReaderIndex();
           in.readByte();
           byte len = in.readByte();
           //len = 4,字符4转成byte对应52
           if (len == 52) {
               if (in.readableBytes() < 4) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(4));
           } else {
               if (in.readableBytes() < 8) {
                   in.resetReaderIndex();
                   return;
               }
               out.add(in.readBytes(8));
           }
        }

我们用一个while循环来解决这个问题,此时就算你发送大量垃圾报文,只要我读不到标志位,那么垃圾报文会被舍弃,一旦读到标志位,就会定位到标志位然后进行正常的读取数据逻辑。client代码改成如下:

for (int i = 1; i < 30; i++) {
            req = "plmplm".getBytes();
            if (i == 25) {
                req = "a4bcde".getBytes();
            } else if(i==28) {
                req = "a8bcdefghi".getBytes();
            }
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
            System.out.println("send" + Arrays.toString(req));
        }

只有当i是25/28时发送正确格式的报文,看服务端的输出:

4 ---msg len
server receive order : bcde;the counter is: 1
8 ---msg len
server receive order : bcdefghi;the counter is: 2

服务端接收的代码很简单:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String)msg;
        System.out.println(body.length() + " ---msg len");
        System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
    }

猜你喜欢

转载自blog.csdn.net/about4years/article/details/81118603