Netty源码分析----NioEventLoop之初始化过程及线程模型

(*文章基于Netty4.1.22版本)
Netty的线程模式网上很多文章都有介绍了,很多文章写得也好,加上我的表达能力不太好,这块线程模型的就不分析了,这篇文章主要讲一下Netty线程模型底层实现的细节。

线程线程,肯定就是有线程去处理的,但是Netty的线程不是简简单单用一个Thread或者ThreadPool是去实现那样的一个线程模型,其核心是一个叫做EventLoop的东西,这个可以看成是一个Thread的封装、抽象,以NIO为例,用到的就是NioEventLoop了。

上面说了EventLoop是一个Thread,在一般的应用程序中,使用Thread不会说就直接使用的,而是通过ThreadPool去使用ThreadPool,那么EventLoop也有一个类似的东西,叫做EventLoopGroup,看名字就可以知道其是多个EventLoop的集合,也就是多个Thread的集合,也就是类似ThreadPool的概念。

源码分析

在Netty的demo中,开头需要初始化两个EventLoopGroup

EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

这里分成了两个线程组,一个叫boss线程组,一个叫worker线程组(构造方法参数为空,代表创建cpu核心数*2个线程)。
有三种线程模型,单线程模型,多线程模型,主从多线程模型,Netty中代表如下:
1. 单线程模型:bossGroup和workerGroup共用,设置一个线程
2. 多线程模型:bossGroup设置一个线程,workerGroup设置多个线程
3. 主从多线程模型:bossGroup和workerGroup均使用多个线程

注意:在Netty中,服务启动的时候,调用bind方法,会将Channel注册到一个EventLoop上,所以一般我们调用bind一次,只会创建一个线程去接收请求,即实际是使用第二种模型。

NioEventLoopGroup

看下构造方法

    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

这里讲线程数设置为0,其他的都是默认值,然后调用父类的构造方法,其父类为MultithreadEventLoopGroup

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

当没有设置的话,就设置为CPU核心是*2,然后继续调用父类的构造方法

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // ....
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {// 初始化nThreads个线程
            boolean success = false;
            try {
                children[i] = newChild(executor, args);// 初始化一个EventLoop
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    //....
                }
            }
        }
        // 创建一个选择器,用于从数组中选择一个EventLoop进行使用
        chooser = chooserFactory.newChooser(children);

        //....

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

newChild方法在NioEventLoopGroup中实现

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

EventLoopGroup在初始化的时候,会初始化多个EeventLoop,作为一个数组存在,数量是指定的或者默认cpu核心数*2

代码中chooser这个东西是干嘛的,回顾一下服务启动中有句代码,调用EventLoopGroup去注册一个Channel的时候

    final ChannelFuture initAndRegister() {
        // ....
        ChannelFuture regFuture = config().group().register(channel);
        //....
        return regFuture;
    }
    //MultithreadEventLoopGroup.java
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

首先会调用next方法,该方法返回一个EventLoop,实现如下:

    public EventExecutor next() {
        return chooser.next();
    }

即如何从数组中获取EeventLooph是有策略的,这个策略就是选择器chooser,再看下chooserFactory.newChooser和如何创建一个新的选择器的,由于EventExecutorChooserFactory只有一个实现DefaultEventExecutorChooserFactory,直接看下这个类的方法

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {// 数量是2的n次方
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

根据数量是否是2的n次方有两种策略,那么看下两种策略的next方法有什么不同

        // ....
        private final AtomicInteger idx = new AtomicInteger();
        // ....
        // PowerOfTwoEventExecutorChooser
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
        //GenericEventExecutorChooser
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }

两种算法的结果是一样的,从0开始轮询选择,但是如果是2的n次方,会选择位运算,因为这种方式性能更好,在Netty的内存分配也运用了大量的位运算,而这的前提就是在一开始已经限制了内容分配的大小为2的n次方

NioEventLoop

分析完NioEventLoopGroup后,知道NioEventLoopGroup有多个NioEventLoop,NioEventLoopGroup只是负责分配和保存NioEventLoop,实际处理事情的是NioEventLoop,看下其构造方法

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        //....
        provider = selectorProvider;
        // 打开一个Selector并封装成SelectorTuple
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

NioEventLoop构造方法主要是初始化selector,NIO中将Channel注册到selector上,对于Netty来说就是将Channel注册到NioEventLoop上

看下其父类SingleThreadEventLoop的构造方法

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        // 这个队列后面会和另外几个队列一起分析
        tailTasks = newTaskQueue(maxPendingTasks);
    }

父类SingleThreadEventLoop的构造方法

    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
    }
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);// 保存EventLoopGroup
        // 如果为false,那么在有新任务的时候会唤醒Selector
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);//队列数量
        this.executor = ObjectUtil.checkNotNull(executor, "executor");//executor负责创建线程执行任务
        taskQueue = newTaskQueue(this.maxPendingTasks);//任务队列
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

总结一下,NioEventLoop初始化的时候做了几件事:
1. 初始化Selector
2. 初始化任务队列
3. 保存线程相关的一些属性

一开始在介绍的时候EventLoop是类似Thread的概念,那么EventLoop是如何与线程关联的呢?

在NioEventLoop的构造方法中,有个叫Executor的东西,这个在NioEventLoopGroup的父类MultithreadEventExecutorGroup构造方法中初始化

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    // ....
      protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }

看下ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

这个东西就是类似ThreadPoolExecutor,调用execute方法的时候,会创建一个线程去执行任务。
- 想一下,线程池中,会限制线程数量,Netty如何去限制线程数量呢

我们已经知道,NioEventLoopGroup有限制了NioEventLoop的数量,那么这样的话,只需要NioEventLoop对应一定数量的Thread,Netty就相当于实现了线程池的作用(当然机制不一样)
NioEventLoop对应几个Thread,要看一个NioEventLoop中的ThreadPerTaskExecutor的execute方法执行了几遍,找到其使用到的地方

//io.netty.util.concurrent.SingleThreadEventExecutor.doStartThread()
private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
              thread = Thread.currentThread();
              //....
            }
        });
}

doStartThread调用了一次execute方法,找到doStartThread调用的地方(注意execute执行后会将初始化Thread属性,设置其为当前线程,从这里也可以看出一个EventLoop对应一个Thread)

//io.netty.util.concurrent.SingleThreadEventExecutor.execute(Runnable)
    public void execute(Runnable task) {
        //....
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            //....
        } else {
            startThread();// 这里调用了doStartThread
            //....
        }
        //....
    }

当inEventLoop为false调用startThread一次,Netty有两个地方限制了executor.execute只会调用一次,一个是doStartThread里第一句代码,当thread为空才能继续执行,另外一个是startThread方法

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                   //....
                }
            }
        }
    }

可以看到,这里在state为ST_NOT_STARTED的时候才会调用doStartThread方法,且这之前还会使用cas将状态变化

总结一下:
1. NioEventLoop对应一个线程,其父类中也有一个Thread类型的变量
2. NioEventLoopGroup包含多个NioEventLoop,起到线程池的作用
3. 调用NioEventLoop的execute方法执行一个任务的时候,会委托给Executor,即ThreadPerTaskExecutor,而ThreadPerTaskExecutor执行的时候会通过DefaultThreadFactory创建一个Thread去执行这个任务
4. Netty通过thread属性是否为空和一个状态位控制ThreadPerTaskExecutor的execute只会创建一次,即只会创建一个线程
5. 第一次调用的时候会创建一个线程并保存到thread属性,后面调用的时候将任务放到队列中等待第一次创建的线程去执行

猜你喜欢

转载自blog.csdn.net/u013160932/article/details/80646502