netty组件详解-中

接着之前的博客netty组件详解-上,我们继续深入到源码层面,来探究netty的各个组件和其设计思想:

  1. netty内置的通讯模式
    我们在编写netty代码时,经常使用NioServerSocketChannel 作为通讯模式。
    例如下面的简单netty客户端示例:
 private void start() throws InterruptedException {
    
    
 		// 客户端采用java NIO 的通讯模型
        EventLoopGroup group = new NioEventLoopGroup();
        try{
    
    
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .channel(NioSocketChannel.class) // 客户端采用java NIO 的通讯模型
                    .remoteAddress(new InetSocketAddress(host,port)) 
                    .handler(new ChannelInitializer<SocketChannel>() {
    
    
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
    
    
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                            }
                    });
            ChannelFuture sync = client.connect().sync();
            sync.channel().closeFuture().sync();
        }finally {
    
    
            group.shutdownGracefully().sync();
        }
    }

但是除此之外,netty也内置了其他方式的通讯模型:
(1) Epoll 模型,此方法底层原理是由JNI调用linux的epoll()实现的,因此此方法只能在Linux上调试,使用方法就是替换下面两个类:

private void start() throws InterruptedException {
    
    
        EventLoopGroup group = new EpollEventLoopGroup(); // 客户端采用Epoll模型
        try{
    
    
            // 客户端启动类必备
            Bootstrap client = new Bootstrap();
            client.group(group)
                    .channel(EpollSocketChannel.class) // 客户端采用Epoll模型
                    .remoteAddress(new InetSocketAddress(host,port)) 
                    .handler(new ChannelInitializer<SocketChannel>() {
    
    
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
    
    
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                            }
                    });
            ChannelFuture sync = client.connect().sync();
            sync.channel().closeFuture().sync();
        }finally {
    
    
            group.shutdownGracefully().sync();
        }
    }

但是改为Epoll模式后,我们在windows上是无法调试的,会报错:
在这里插入图片描述
注:Epoll模式和NIO模式,都是基于Reactor模型来实现的,不同点在与,Epoll整合很多linux系统独有的特性,如零拷贝,SO_REUSEPORT这个些特性,而NIO是JAVA在JVM层面上做的优化,Epoll性能比NIO更好,关于两者的对比,我单独出一篇博客详细分析两者的区别。
(2)OIO io.netty.channel.socket.oio 使用 java.net 包作为基础——使用阻塞流即BIO模式,但这个组件目前基本不会使用了,我们看netty关于OIO的方法都已经标注为过时:
在这里插入图片描述
(3)Local io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输,这个通讯模型也很少使用了,因为已经在一个JVM内部的话,可以用直接内存的方式来实现通信,完全没必要进行socket调用。
(4)Embedded io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而又不需要一个真正的基于网络的传输。多用于测试 ChannelHandler 。
下面是一个测试用例:

  • 我们定义一个编码的handler,EmbeddTestHandler

public class EmbeddTestHandler extends MessageToMessageEncoder<ByteBuf> {
    
    
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
    
    
        // 取字节数组中首位进行编码
        byte[] array = byteBuf.array();
        String s = Arrays.toString(array);
        String s1 = new String(array, StandardCharsets.UTF_8);
        System.out.println(s);
        System.out.println("=================");
        System.out.println(s1);
        list.add(array[0]);
    }
}
  • 再定义一个基于Embedded的测试类
public class EmbeddTestHandlerTest {
    
    
    @Test
    public void testEmbedded(){
    
    
        ByteBuf byteBuf = Unpooled.buffer();
        String msg = "北京欢迎您";
        byteBuf.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
        //(2) 创建一个EmbeddedChannel,并安装一个测试的EmbeddTestHandler
        EmbeddedChannel channel = new EmbeddedChannel(new EmbeddTestHandler());
        //(3) 写入 ByteBuf,并断言调用 readOutbound()方法将会产生数据
        assertTrue(channel.writeOutbound(byteBuf));
        //(4) 将该 Channel 标记为已完成状态
        assertTrue(channel.finish());
        // read bytes
        //(5) 读取所产生的消息,并断言它包含了编码的值
        Byte code = channel.readOutbound();
        Byte checkCode = msg.getBytes(StandardCharsets.UTF_8)[0];
        assertEquals(code,checkCode);
        assertNull(channel.readOutbound());
    }
}

无需进行真实网络传输的一系列定义,即可进行handler的测试,下面是测试结果:
在这里插入图片描述
2. BootStrap引导类
netty中客户端和服务端各有一个BootStrap,其中客户端为Bootstrap,服务端为ServerBootstrap。
其中服务端的ServerBootstrap在监听端口和处理socketChannel可以使用两组线程模型:
这里定义了boss,work两组线程模型,其底层原理就是Reactor模型的主从模式,关于Reactor模型我将在零拷贝及NIO机制博文中深入探究

public void start() throws InterruptedException {
    
    
        final MessageCountHandler messageCountHandler = new MessageCountHandler();
        /*使用两个线程组*/
        EventLoopGroup boss  = new NioEventLoopGroup();
        EventLoopGroup work  = new NioEventLoopGroup();
        try {
    
    
            /*服务端启动必备*/
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss,work) // 采用Reactor主从线程模型
            .channel(NioServerSocketChannel.class)/*指定使用NIO的通信模式*/
                    //.option(ChannelOption.SO_BACKLOG)
            .localAddress(new InetSocketAddress(port))/*指定监听端口*/
                   // .childOption(ChannelOption.SO_RCVBUF)
                    //.childOption()
            //.handler();
            .childHandler(new ChannelInitializer<SocketChannel>() {
    
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
    
    
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中
                    ch.pipeline().addLast(new EchoServerMCHandler());
                }
            });
            ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞到完成*/
            LOG.info("服务器启动完成");
            f.channel().closeFuture().sync();/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
        } finally {
    
    
            boss.shutdownGracefully().sync();
            work.shutdownGracefully().sync();

        }
  1. ChannelInitializer
    在上面的BootStrap引导类示例中,我们来看这个逻辑:
  .childHandler(new ChannelInitializer<SocketChannel>() {
    
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
    
    
                    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                    ch.pipeline().addLast(messageCountHandler); // 添加一个共享的hander到pipeline中
                    ch.pipeline().addLast(new EchoServerMCHandler());
                }
            });

ChannelInitializer类主要用于向socketChannel的pipeline中添加Handler,我们来看下源码:
在这里插入图片描述
可以发现,ChannelInitializer本身也是一个handler,并且提供了一个对外的方法,来初始化pipeline:
在这里插入图片描述
这里涉及到了netty设计的一个细节,既然是handler那么一定自己的生命周期,我们来看下源码:
在这里插入图片描述
这个Handler在将其他handler添加到pipeLine中之后,会将自己从pipeline中移除,这个是netty编程的常见的一个细节
我们可以参照这个编程细节,用在自己的业务上,比如:
在我们自己的应用程序中,如果存在着某个 handler 只使用一次的情况,也可以仿造 ChannelInitializer,用完以后将自己从ChannelPipeline 中移除自己,比如授权 handler,某客户端第一次连接登录以后,进行授权检查,检查通过后就可以把这个授权 handler 移除了。如果客户端关闭连接下线,下次再连接的时候,就是一个新的连接,授权 handler 依然会被安装到 ChannelPipeline ,依然会进行授权检查。
4. ChannelOption
ChannelOption属性主要对应套接字中的参数:
首先看用法示例:

 private void doStart() throws InterruptedException {
    
    
        System.out.println("netty服务已启动");
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
    
    
            // 创建服务器端引导类
            ServerBootstrap server = new ServerBootstrap();
            // 初始化服务器配置
            server.group(group) // 配置处理客户端的连接线程组
                    .channel(NioServerSocketChannel.class) // 指定channel为 NioServerSocketChannel
                    // 为socketChannel配置TCP参数
                    .option(ChannelOption.SO_LINGER,100)
                    .option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                    .option(ChannelOption.SO_BACKLOG,100)
                    .option(ChannelOption.SO_REUSEADDR,true)
                    .option(ChannelOption.SO_KEEPALIVE,true)
                    .localAddress(port) // 配置服务端口号
                    // 为每个handler配置TCP参数
                    .childOption(ChannelOption.SO_SNDBUF,1024)
                    .childOption(ChannelOption.SO_RCVBUF,1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
    
     // 指定客户端通信的处理类,添加到pipline中,进行初始化
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
    
    
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            // 绑定端口,sync()会阻塞到完成
            ChannelFuture sync = server.bind().sync();
            // 阻塞当前线程,直到服务器的ServerChannel被关闭
            sync.channel().closeFuture().sync();
        }finally {
    
    
            // 关闭资源
            group.shutdownGracefully().sync();
        }
    }

其中介绍比较重要的几个参数:
(1)ChannelOption.SO_REUSEADDR:
ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
比如,多网卡(IP)绑定相同端口,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置 SO_REUSEADDR 就无法正常使用该端口。
但是注意,这个参数无法做到让应用绑定完全相同 IP + Port 来重复启动。
(2)ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE 参数对应于套接字选项中的 SO_KEEPALIVE,该参数用于设置 TCP 连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。
(3)ChannelOption.SO_SNDBUF 和 ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF 参数对应于套接字选项中的 SO_SNDBUF,ChannelOption.SO_RCVBUF 参数对应于套接字选项中的 SO_RCVBUF 这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
(4)ChannelOption.SO_LINGER
ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送
(5)ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY 参数对应于套接字选项中的 TCP_NODELAY,该参数的使用与 Nagle 算法有关,Nagle 算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用 Nagle 算法,使用于小数据即时传输,于TCP_NODELAY 相对应的是 TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

  1. TCP的粘包和半包:
    网路传输过程中,客户端给服务端发送了报文a,b,c,但服务端收到报文时,是a+b的上半部分,b的下半部分+c两个包,并没有按照完整报文a,b,c来接收,这个问题称为半包,报文不完整。再比如a,b的报文比较小,则服务器收到的包为a+b,c两个包,其中a,b合在了一个包里,这个称之为粘包。
    发生的原因,在于TCP对于网络数据传输的处理优化,如果发送的网络数据包太小,那么他本身会启用 Nagle 算法,对较小的数据包进行合并然后再发送,这样就发生了粘包/半包的问题。即报文在传输过程中发生了拆包或合并。
    怎么防止这个现象出现呢,解决方案就是在报文中添加标识符或分隔符,让服务器端知道,一个完整的包的状态。
    (1)文本报文通用换行分隔符(适用于一行形式的报文)
  • 客户端处理:每个报文加回车换行符
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    
        ByteBuf msg = null;
        String request = "apple,pear,orange"
                + System.getProperty("line.separator");// 为每个报文末尾添加回车换行符
        for(int i=0;i<10;i++){
    
    
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }

    }

服务端处理,加回车换行符处理
LineBasedFrameDecoder是netty已经为我们实现好的,处理回车换行符的handler

 private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    
    

        @Override
        protected void initChannel(Channel ch) throws Exception {
    
    
        	// 添加回车换行符处理的handler,校验报文的完整性 
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }

(2)文本报文自定义分隔符(适用于一段形式的报文)

  • 服务端处理,在服务端约定一个自定义的分隔符:
    DelimiterBasedFrameDecoder 是netty已经为我们实现好的,处理自定义分隔符的handler
 // 在服务端约定一个自定义的分隔符
 public static final String My_SYMBOL = "#";
 private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
    
    
            ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL .getBytes());
            // 服务端添加一个自定义的分隔符处理handler
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }
  • 客户端处理,使用和服务端约定的分隔符:
public static final String My_SYMBOL = "#";
 private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
    
    
            ByteBuf delimiter = Unpooled.copiedBuffer(My_SYMBOL.getBytes());
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

(3)二进制定长标识(适用于二进制报文)约定好每条报文的长度:

  • 客户端处理,每次发送给服务端报文前,将报文固定长度进行编码,一并发送过去:
public final static String REQUEST = "apple.orange,pear";
 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    
        ByteBuf msg = null;
        for(int i=0;i<10;i++){
    
    
        	// 申请固定长度的buffer
            msg = Unpooled.buffer(REQUEST.length());
            msg.writeBytes(REQUEST.getBytes());
            ctx.writeAndFlush(msg);
        }
    }
  • 服务端处理,添加二进制长度域解码器来识别完整报文:
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
    
    
        @Override
        protected void initChannel(Channel ch) throws Exception {
    
    
        	// 添加二进制长度域解码器来识别完整报文
            ch.pipeline().addLast(new FixedLengthFrameDecoder(FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }

(4)二进制基于长度域解码
channelRead 和 channelReadComplate的区别
channelRead 方法用于处理每次从通道中读取到的数据。
channelReadComplete 方法用于通知数据读取操作完成后进行后续处理。
在 Netty 中,channelRead 和 channelReadComplete 是 ChannelInboundHandler 接口中的两个重要方法,用于处理入站数据(从远程对等方传入的数据)。
channelRead 方法是在每次从通道中读取到数据时被调用的。当有数据从远程对等方传入时,Netty 会自动将数据包装成一个 ByteBuf 对象,并传递给相应的 ChannelInboundHandler 的 channelRead 方法进行处理。在这个方法中,你可以对接收到的数据进行解码、处理、转换等操作。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    
    
    // 在这里处理接收到的数据(msg),通常是 ByteBuf 对象
    // 例如,解码、处理数据等
}

channelReadComplete 方法是在一个通道的数据读取操作完成时被调用的。在 channelRead 方法中处理完数据后,Netty 会自动调用 channelReadComplete 方法,以通知处理器数据已经读取完成,可以进行后续的操作,例如回送响应或释放资源等。

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    
    
    // 本次数据读取完成后的后续处理
    // 例如,回送响应或释放资源等
}

猜你喜欢

转载自blog.csdn.net/weixin_43830765/article/details/131796239
今日推荐