网络编程18

AbstractUnsafe

  • register方法:把channel和selector挂钩,如果pipeline的通道已经是active状态的,会调用beginRead方法,beginRead方法又会调用doBeginRead方法,把真正关心的事件进行注册

服务器启动流程

  • 1.首先创造了一个eventloopgroup,支持指定线程数,如果不传,默认会从配置中读取,如果配置没有,就是CPU逻辑核心数乘2,io操作的线程数一般都是这么设置的,会进入到MultithreadEventExecutorGroup方法中

    • MultithreadEventExecutorGroup

      1.executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

      是这样一个执行器,初始化时创建了一个线程工厂,并且在execute方法里面就会创建一个新的线程,其中对应于下面bind—doBind—initAndRegister—init中加入接受连接的handler的异步任务,每一个eventloopgroup都会有一个executor。

      2.children就是每一个eventloop。

      3.按照传进来的线程数,依次进行相关的数组循环,每一个数组元素进行newChild操作,就是一个eventloop。

      eventloop的构造方法里面,把selector相关的参数初始化出来,selector就是通过openSelector获得的。

    #所以第一步,new出来一个eventloopgroup的时候,就是按照cpu核心数创建相对应数量的eventloop,在每一个eventloop里面初始化出一个selector,方便后面channel将关注的事件注册到上面,而NioEventLoop里面有一个专门的run方法,在一个for循环里面,不断调用selector相关方法,处理各种事件集,netty这里有一个小小的技巧,因为NioEventLoop除了处理io事件之外,有一些系统的任务也是由它处理,需要进行区分,引入了一个ioRatio参数,即处理io事件的比例,根据比例来分配资源处理io事件,所以eventloop会判断执行当前任务的线程是不是eventloop本身的线程,缺省情况ioRatio是50%,即一半一半,eventloop里面包装了一个线程,但是这个线程不是在NioEventLoop中声明的,而是在它的父类SingleThreadEventExecutors,里面声明了thread变量和taskQueue队列,而eventloopgroup里面execute会创建一个线程并执行,但是还没有和eventloop里面的线程挂钩,在哪里挂钩?

  • 2.拿到ServerBootstrap 这个服务器启动必备的实例

  • 3.把channel、通讯地址、handler这些参数set到ServerBootstrap 中,唯一有点的区别的是channel,它设置的class对象,是反射的方法拿到每一个channel的相关实例

    b.group(group)
     .channel(NioServerSocketChannel.class)
     .localAddress(new InetSocketAddress(port)
     .childHandler(new ChannelInitializer<SocketChannel>() {
          
          
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
          
          
             // 把新创建的handler加到SocketChannel里面
             ch.pipeline().addLast(serverHandler);
         }
     });
    
  • 4.ChannelFuture f = b.bind().sync();

    其中bind方法是非常关键的,是关注的重点,它是有它的父类AbstractBootstrap实现的,而bind方法又跳进doBind方法,

    • doBind方法

      1.执行initAndRegister()方法

      1.1.通过反射工厂new出channel来

      $NioServerSocketChannel在初始化的时候做了什么?

      拿到原生jdk中的网络编程的socketChannel,设定当前NioServerSocketChannel关注的事件是接受连接事件,但是还没有往selector上面注册

      $在AbstractChannel的构造方法里面做了什么?

      会new出一个pipeline,用的是DefaultChannelPipeline,会在构造方法中创建两个handler,head和tail

      #newChannel会创建一个defaultPipeline,而pipeline已经有两个handler了

      ***总结1.1:把serverSocketChannel new出来,配置接受连接事件,并创建好pipeline

      1.2.init(channel)------初始化channel,首先对serverSocketChannel各种相关参数进行配置,把子channel的相关参数也设置好,往pipeline里面加入handler,同时除了我们set进来的handler,给我们的主channel加一个netty自带的ServerBootstrapAcceptor的handler,主channel里面就有3个handler了,这个handler是用来接收连接的,所以从理论上来说是入站handler,所以要来处理channelRead事件,而channelRead中接收的Object-msg本质上是一个一个的相关channel,对这些channel做一些处理,就是用之前自己set进去的handler进行过滤,我们定义的serverBootstrap.childHandler,包括编码解码、解决粘包半包等各种handler,所以给msg里面创造出来的每一个子channel都加入我们自己定义的子handler,并进行相关的注册,

      @Override
      public void execute(Runnable task) {
              
              
          if (task == null) {
              
              
              throw new NullPointerException("task");
          }
      
          boolean inEventLoop = inEventLoop();
          addTask(task);
          if (!inEventLoop) {
              
              
              startThread();
              if (isShutdown() && removeTask(task)) {
              
              
                  reject();
              }
          }
      
          if (!addTaskWakesUp && wakesUpForTask(task)) {
              
              
              wakeup(inEventLoop);
          }
      }
      // 注意这里的加入是异步加入的,所以会执行startThread方法
      
      private void startThread() {
              
              
          if (state == ST_NOT_STARTED) {
              
              
              if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
              
              
                  try {
              
              
                      doStartThread();
                  } catch (Throwable cause) {
              
              
                      STATE_UPDATER.set(this, ST_NOT_STARTED);
                      PlatformDependent.throwException(cause);
                  }
              }
          }
      }
      // 然后继续执行doStartThread方法
      
      private void doStartThread() {
              
              
          assert thread == null;
          executor.execute(new Runnable() {
              
              
              @Override
              public void run() {
              
              
                  // Thread.currentThread()不是主线程了
                  // 是打包成Runnable,放到executor.execute中执行
                  // 而这个runnable就被打包到eventloopgroup中new出来的executor中执行execute方法时的Runnable command
                  // 而指向runnable方法的线程就是NioEventLoop里面它应该持有的线程来执行的
                  // thread是eventloop自己的线程
                  // 把eventloop和执行的单线程挂钩
                  thread = Thread.currentThread();
                  if (interrupted) {
              
              
                      thread.interrupt();
                  }
      
                  boolean success = false;
                  updateLastExecutionTime();
                  try {
              
              
                      // 进行事件轮询,做各种io事件的处理
                      SingleThreadEventExecutor.this.run();
                      success = true;
                  } catch (Throwable t) {
              
              
                      logger.warn("Unexpected exception from an event executor: ", t);
                  } finally {
              
              
                      for (;;) {
              
              
                          int oldState = state;
                          if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                              SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
              
              
                              break;
                          }
                      }
      
                      // Check if confirmShutdown() was called at the end of the loop.
                      if (success && gracefulShutdownStartTime == 0) {
              
              
                          if (logger.isErrorEnabled()) {
              
              
                              logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                           SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                           "be called before run() implementation terminates.");
                          }
                      }
      
                      try {
              
              
                          // Run all remaining tasks and shutdown hooks.
                          for (;;) {
              
              
                              if (confirmShutdown()) {
              
              
                                  break;
                              }
                          }
                      } finally {
              
              
                          try {
              
              
                              cleanup();
                          } finally {
              
              
                              STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                              threadLock.release();
                              if (!taskQueue.isEmpty()) {
              
              
                                  if (logger.isWarnEnabled()) {
              
              
                                      logger.warn("An event executor terminated with " +
                                                  "non-empty task queue (" + taskQueue.size() + ')');
                                  }
                              }
      
                              terminationFuture.setSuccess(null);
                          }
                      }
                  }
              }
          });
      }
      

      $为什么channelRead中接受的msg是channel?

      (1).等到看到处理服务器接受连接事件的时候就知道了

      (2).AbstractNioMessageChannel里面doReadMessage接受了一个list,netty是按照channel处理的,对于服务端的serverSocketChannel,它处理的channelRead,自然就是channel,传入类型是object,而不是一个byte数组,至于怎么传进来的,在后面服务器接受连接事件时就明白了

      $我们写的各种子channel里面有pipeline,那父channel里面也有pipeline吗?

      一样是有的,pipeline是在AbstractChannel里面创建的,所以它的子类里面肯定继承了它所有的成员变量,不管是NioServerSocketChannel还是NioSocketChannel里面,都有pipeline,只是说对于NioServerSocketChannel而言,我们只是不加各种handler的,一样是可以加的,只是在网络通讯中,我们关注的网络数据的处理,所以去给每一个socketChannel去加handler,

      ***总结1.2:把eventloop当前的线程创造出来,并且和eventloop进行挂钩,并且加入一个ServerBootstrapAcceptor,入站专门处理客户端连接的handler,以及启动selector相关的方法,不断轮询,准备处理各种事件

      1.3config().group().register(channel),并进行注册相关工作

      &register是一个抽象方法,通过MultithreadEventLoopGroup实现的register方法,MultithreadEventLoopGroup继续调用SingleThreadEventLoop的register方法进行注册,这个方法又调用了promise.channel().unsafe().register(this, promise)来进行注册,也就是unsafe的register进行注册,这个register主要是把当前channel和selector进行挂钩,但是还没有真正写入关注的事件,注册的是一个空事件

      2.根据regFuture,通过promise判断初始化和注册做完没有,如果做完了,直接调用doBind0方法,如果没有做完,加入一个侦听器,等到初始化和注册做完了之后,调用侦听器里面的方法,最终起始还是调用了doBind0方法

      2.1doBind0方法

      通过一个异步的方式来执行channel.bind方法,channel.bind其实会调用channelPipeline.bind的绑定方法,意味着这个事件开始在serverSocketChannel上面所属的pipeline上开始传播,pipeline上有3个handler-----head、serverAccept、tail,只有head是出站和入站兼有,其余两个都是入站,因为bind是出站操作(因为是服务器端接收数据后,做出的响应),所以最终由head来执行绑定方法

      $HeadContext.bind方法

      该方法调用unsafe.bind(),又跳到AbstractUnsafe方法里面的bind方法,这里要调用的是AbstractChannel的doBind(localAddress)方法,是一个抽象方法,又跳到NioServerSocketChannel里的doBind方法,里面jdk原生的bind操作,当绑定操作执行完后,又回到AbstractChannel,触发主channel的fireChannelActive()事件,channelActive是一个入站事件(所有以fire开头的都是入站事件),同样要在3个handler上面进行流转,

      $$channelActive流转

      (1)head,ctx.fireChannelActive()继续往后面传播,readIfIsAutoRead()中调用channel.read方法,又调用了pipeline.read事件,read是一个出站操作,所以只有head处理,调用read方法,又调用unsafe.beginRead()方法,又调用AbstractUnsafe.doBeginRead方法,又调用AbstractNioChannel.doBeginRead方法,才真正把当前channel关注的事件和selector进行挂钩

      到此时才算完成了bind操作,我们自己实现服务器时很简单的两行代码,但是netty底层帮我们走完了这些复杂的逻辑

selector什么时候open的

  • eventloop的构造方法中

猜你喜欢

转载自blog.csdn.net/Markland_l/article/details/114611912