苦涩又难懂的io<3>

我们上节课讲了nio的大致模型,我服务端这块有一个serversocketchannel,他会一开始监听我们的端口上比如说 9000
然后把自己注册在selector上,当我们客户端这边有连接到服务端,呢实际上 我这个channel就会有一个accept事件,selector是一个多路复用的监听器,实际上就可以感知这个事件,呢他就会把我们这个socket和server真正的连接socket建立,并且把这个socketchannel也注册在我们的selector上。
假设有很多客户端都注册在我这个selector上,我selector 就会采用epoil模型取感知所有channel上面的变化,如果有事件的话 他就会调用selector.select()监听到所有的事件.拿到事件之后 他不管是连接事件还是读事件 或者说是写事件 他都会排队去处理
redis性能高的原因1.纯内存操作2. nio的事件 响应模型
selector 我实际上是监听所有注册在selector上的channel的,如果channel没有任何事件发生,呢么此时我selector.select是阻塞的
呢么只要channel上有事件发生,我selector只要感知到有事件发生,就会立即去处理么,这样的话讲道理 selector.selectedKeys() 始终只拿到的一个key啊,不是这样的
只要有一个channel上有事件,我selector感知到的话, 此时会有个延时操作, 并且把这个channel的事件放到队列里面比如说延时100ms,在这段事件内,可能也有其他channel的事件过来,等到100ms之后,实际上selector就拿到这个延时队列里面所有channel的事件了,这样他就可以一次处理多个请求了
我们现在就用netty实现以下我们bio/nio的通用模型,我客户端启动了 并且连到我服务端了,客户端发送消息 hellowService ,服务端接受消息 并且回复helloClient
Client
 public static void main(String[] args) throws Exception {
    
    
        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
    
    
            //创建客户端启动对象
            //注意客户端使用的不是ServerBootstrap而是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //设置相关参数
            bootstrap.group(group) //设置线程组
                    .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
                    .handler(new ChannelInitializer<SocketChannel>() {
    
    
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
    
    
                            //加入处理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("netty client start。。");
            //启动客户端去连接服务器端
            ChannelFuture cf = bootstrap.connect("localhost", 9000).sync();
            //对通道关闭进行监听
            cf.channel().closeFuture().sync();
        } finally {
    
    
            group.shutdownGracefully();
        }
    }
NettyClientHandler ..
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
    

    /**
     * 当客户端连接服务器完成就会触发该方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
    
        cause.printStackTrace();
        ctx.close();
    }
   public static void main(String[] args) throws Exception {
    
    
        //创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
        // bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
    
    
            //创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来配置参数
            bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                    .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
                    // 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
    
    //创建通道初始化对象,设置初始化参数

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
    
    
                            //对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            //绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
            //启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
            ChannelFuture cf = bootstrap.bind(9000).sync();
            //给cf注册监听器,监听我们关心的事件
            /*cf.addListener(new ChannelFutureListener() {
    
    
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
    
    
                    if (cf.isSuccess()) {
    
    
                        System.out.println("监听端口9000成功");
                    } else {
    
    
                        System.out.println("监听端口9000失败");
                    }
                }
            });*/
            //对通道关闭进行监听,closeFuture是异步操作,监听通道关闭
            // 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成
            cf.channel().closeFuture().sync();
        } finally {
    
    
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
/**
 * 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
    

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文对象, 含有通道channel,管道pipeline
     * @param msg 就是客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
        System.out.println("服务器读取线程 " + Thread.currentThread().getName());
        //Channel channel = ctx.channel();
        //ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
        //将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
    
        ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * 处理异常, 一般是需要关闭通道
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
    
        ctx.close();
    }
}
netty代码大致是这样的,我先讲服务端
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
这2个线程组先不要看
2.ServerBootstrap 这个就相当于服务端,然后
2.1 .channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现22 
2.2  .option(ChannelOption.SO_BACKLOG, 1024)// 然后opotion 这个目前先不要看, 可以对服务方进行定制化配置,也就是说回头要对服务方进行配置的话可以用option 这个api进行配置
2.3  .childHandler(new ChannelInitializer<SocketChannel>() {
    
    //创建通道初始化对象,设置初始化参数

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
    
    
                            //对workerGroup的SocketChannel设置处理器
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });  这个是个回调函数
2.4   ChannelFuture cf = bootstrap.bind(9000).sync();  这个就是我们服务方跟端口绑定 绑定一个9000的端口号,
2.5 接下来  cf.channel().closeFuture().sync(); 这个暂时不用看
 然后一个finally 关闭线程组,释放资源

在这里插入图片描述

NettyServerHandler 这个nettyServerHander是自己写的类,这个类中就是我们接受socket请求后要执行的逻辑,比方说客户端发消息后,他收到消息之后真正要执行的逻辑

在这里插入图片描述

自定义Handler需要继承netty规定好的某个HandlerAdapter(规范),我就可以处理我们的channel联通好的事件,比方说读事件,写事件,

在这里插入图片描述

这里有一个channelRead,比如说客户端发送过来的数据 我可能就通过这个channelRead这个方法中获取,
当 服务端读完之后 通过channelReadComplete,此时我们就可以回写数据返回给客户端
exceptionCaught 这个方法就是异常信息的抓铺
这大概就是service端大致代码逻辑,我们接下来看看客户端代码逻辑
 //客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup(); 同样此时这个线程组我们先不用管
2,1 Bootstrap  这个可以理解我们的客户端
2,2 .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现 
2,3 同样此时配置个 NettyClientHandler
2.4   ChannelFuture cf = bootstrap.connect("localhost", 9000).sync();
同样此时我们需要用客户端连接本地的9000端口,
2.5   cf.channel().closeFuture().sync(); 这个先不用管
2.6   group.shutdownGracefully(); 关闭线程组
NettyClientHandler 这里也是需要实现 ChannelInboundHandlerAdapter
channelActive这个方法 是 当客户端连接服务器完成就会触发该方法
channelRead 这个方法上面讲了 是接收socket上面的数据的 也就是说服务方给客户端返回的数据
也就是说这段代码的大致逻辑是 
客户端连接服务端 触发 channelActive 然后发送HelloServer给服务端, 服务端read数据然后 再回了一个HelloClient ,之后client就会接受到这个数据,这样的大体流程
刚刚遗留一个问题就是说 客户端和服务端都有一个线程组
 这个线程组是个什么意思
         EventLoopGroup workerGroup = new NioEventLoopGroup();
         我们需要看之前的io模型
我们再看下io模型的推演
我们的io模型
1.刚开始是单线程的bio,这样有缺点 accept,read 会有阻塞
2.后来是多线程的bio 就是说前端每一个请求,后台开一个线程,然后把线程丢到线程池中处理, 这样虽然说没有了阻塞,但是扛不住并发问题,
3.再后来就是现在的nio,基于事件驱动模型
其实nio实际上是有问题的	 我客户端万一有几万个连接事件,或者说几十万个读写事件,此时把这些事件都交给selector去处理
比方说有selector.select()收集到10个事件,我至少要把这10个事件执行完毕之后,我才可以继续selector.select() 收集事件,在高并发情况下会有延迟的[事件特别多],
4.所以nio的模型还可以继续优化,当客户端跟服务端建立连接的时候,有一个专门的selector,这个selector只做一件事情,就是只处理客户端的连接,也就是说 我只负责新客户端的连接操作,或者说我只接受accept事件,然后我会让另外一个selector专门处理那些已经建立好的通道的读写或者io事件,

猜你喜欢

转载自blog.csdn.net/weixin_43689953/article/details/116572923