While studying how Netty registers a channel, we come across this line:

    ChannelFuture regFuture = config().group().register(channel);
Here config() returns a ServerBootstrapConfig object, which holds a reference to the ServerBootstrap:

    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
Its group() method returns the NioEventLoopGroup object that was assigned to the group field of ServerBootstrap:

    /**
     * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
     */
    @SuppressWarnings("deprecation")
    public final EventLoopGroup group() {
        return bootstrap.group();
    }
register() is where the real registration begins. The call goes to the register method of the NioEventLoopGroup object:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
next() returns an EventLoop object. It is invoked in MultithreadEventLoopGroup, the parent class of NioEventLoopGroup:

    public EventLoop next() {
        return (EventLoop) super.next();
    }
That in turn calls the next method of MultithreadEventExecutorGroup, the parent class of MultithreadEventLoopGroup:

    @Override
    public EventExecutor next() {
        return chooser.next();
    }
So what is the chooser we are using? It is an EventExecutorChooser created by a chooser factory, and it is assigned when the NioEventLoopGroup is initialized:

    chooser = chooserFactory.newChooser(children);
And what is our chooserFactory? It is supplied while the NioEventLoopGroup is being initialized, as the constructors delegate up through a chain of parent-class constructors:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
As we can see, it is a static member of DefaultEventExecutorChooserFactory, which is simply a single shared instance:

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
So, back to our chooserFactory.newChooser(children) method:

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
It checks the length of the array passed in: if the length is a power of two, it returns a PowerOfTwoEventExecutorChooser instance; otherwise it returns a GenericEventExecutorChooser. Both classes implement a simple load-balancing algorithm based on round-robin selection.

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // length is a power of two, so masking with (length - 1) acts as a modulo
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
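To make the two selection strategies concrete, here is a minimal sketch of the same round-robin idea on plain strings. RoundRobinSketch and its members are hypothetical names for illustration, not Netty classes, and the modulo branch is an assumption about what a generic chooser would do rather than a copy of Netty's code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a chooser; picks from plain strings instead of EventExecutors.
final class RoundRobinSketch {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] workers;

    RoundRobinSketch(String[] workers) {
        this.workers = workers;
    }

    // For val > 0: true when exactly one bit is set, i.e. val is a power of two.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Round-robin pick: bit mask when the length is a power of two, modulo otherwise.
    String next() {
        int i = idx.getAndIncrement();
        if (isPowerOfTwo(workers.length)) {
            return workers[i & (workers.length - 1)];
        }
        // Math.abs guards against a negative remainder once the counter overflows.
        return workers[Math.abs(i % workers.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch four = new RoundRobinSketch(new String[] {"a", "b", "c", "d"});
        RoundRobinSketch three = new RoundRobinSketch(new String[] {"x", "y", "z"});
        for (int n = 0; n < 5; n++) {
            System.out.println(four.next() + " " + three.next());
        }
    }
}
```

Both branches cycle through the array in order; the mask simply avoids a division when the array length allows it.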
What is the executors argument that gets passed in? As shown above, children is passed in while the NioEventLoopGroup is being initialized, and that is the argument.
And what is children? The opening part of the constructor shows:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                // ...
We can see that children is an array of EventExecutor whose length is the number of threads passed in, and whose elements are the instances returned by the group's newChild method.
Since we are initializing a NioEventLoopGroup, the newChild implementation that runs is the one in NioEventLoopGroup, a subclass of MultithreadEventExecutorGroup:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
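The relationship between the parent constructor and newChild is a template-method arrangement: the base class owns the array and the loop, and the subclass decides what each element is. A minimal sketch of that shape follows; WorkerGroupSketch and NamedWorkerGroup are hypothetical names, and the children here are plain strings rather than event loops.

```java
// Hypothetical miniature of the group/child relationship; none of these types are Netty classes.
abstract class WorkerGroupSketch {
    final String[] children;

    WorkerGroupSketch(int nThreads) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        children = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // The base class drives the loop; the subclass supplies each element.
            children[i] = newChild(i);
        }
    }

    protected abstract String newChild(int index);
}

final class NamedWorkerGroup extends WorkerGroupSketch {
    NamedWorkerGroup(int nThreads) {
        super(nThreads);
    }

    @Override
    protected String newChild(int index) {
        return "worker-" + index;
    }

    public static void main(String[] args) {
        for (String child : new NamedWorkerGroup(3).children) {
            System.out.println(child);
        }
    }
}
```

Because newChild is called from the base-class constructor, the override in this sketch deliberately avoids touching any subclass fields, which would not be initialized yet at that point.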
Summary: we can now see that the initial next method returns a NioEventLoop, so the register method called afterwards is also NioEventLoop's register method. It is found in its parent class SingleThreadEventLoop:

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
Here channel() returns the channel that was passed in earlier, and unsafe() returns the unsafe field created when the channel was initialized, which is a NioMessageUnsafe instance.
So the register call resolves to that instance, but the actual implementation of this register method lives in its parent class AbstractUnsafe:

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }

        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) { // the first call comes from outside the event loop thread, so this is false
            register0(promise);
        } else {
            try {
                // Runs the code below: the event loop is itself an executor, so it starts a thread to run the task.
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
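The inEventLoop() branch is a general hand-off pattern: run inline when already on the loop's thread, otherwise enqueue the work for that thread. The sketch below reproduces the pattern with a JDK single-thread executor. LoopHandOffSketch and its methods are hypothetical, and the JDK executor is a stand-in chosen for brevity, not how NioEventLoop is built.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature event loop backed by a JDK single-thread executor.
final class LoopHandOffSketch {
    private volatile Thread loopThread;
    private final ExecutorService executor;

    LoopHandOffSketch() {
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "loop-thread");
            t.setDaemon(true);
            loopThread = t; // remember which thread is "the loop"
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same branch shape as the register method above: inline on the loop thread, enqueue otherwise.
    void register(Runnable register0) {
        if (inEventLoop()) {
            register0.run();
        } else {
            executor.execute(register0);
        }
    }

    // Returns the thread names seen by an outer (handed-off) and a nested (inline) registration.
    static String[] demo() {
        LoopHandOffSketch loop = new LoopHandOffSketch();
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        loop.register(() -> {
            seen[0] = Thread.currentThread().getName();
            loop.register(() -> seen[1] = Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        loop.executor.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println(seen[0] + " / " + seen[1]);
    }
}
```

Either way the registration body ends up running on the loop's own thread, which is what lets the rest of the channel code avoid locking.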
This execute method is in SingleThreadEventExecutor, a parent class of NioEventLoop:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            // executor is the Executor passed in when the event loop was initialized. Its type is
            // ThreadPerTaskExecutor, an executor that uses one thread per task.
            // It holds a thread factory, uses it to create a thread, then starts that thread to run the given task.
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run(); // run this event loop's run method
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
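startThread and doStartThread together amount to "start the worker thread at most once, on first use". A minimal sketch of that idea follows, assuming an AtomicInteger in place of the field updater and a one-line lambda in place of ThreadPerTaskExecutor. LazyStartSketch and all of its members are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a worker thread that is started lazily and only once.
final class LazyStartSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger starts = new AtomicInteger();
    private final CountDownLatch running = new CountDownLatch(1);
    // One fresh thread per submitted task, in the spirit of a thread-per-task executor.
    private final Executor threadPerTask = task -> new Thread(task, "worker").start();

    void startThread() {
        // The compare-and-set succeeds for exactly one caller, so the thread starts once.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            threadPerTask.execute(() -> {
                starts.incrementAndGet();
                running.countDown();
            });
        }
    }

    // Calls startThread ten times and reports how many worker threads actually ran.
    static int demo() {
        LazyStartSketch sketch = new LazyStartSketch();
        for (int i = 0; i < 10; i++) {
            sketch.startThread();
        }
        try {
            sketch.running.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.starts.get();
    }

    public static void main(String[] args) {
        System.out.println("threads started: " + demo());
    }
}
```

The plain read before the compare-and-set is only a cheap fast path; correctness rests on the compare-and-set alone.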
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys(); // process the selected keys
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // run the queued tasks within this time budget
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
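The expression ioTime * (100 - ioRatio) / ioRatio turns the time just spent on I/O into a time budget for queued tasks. A small worked example of that arithmetic follows; IoRatioSketch and taskBudgetNanos are hypothetical names, and the sample values are made up for illustration.

```java
// Hypothetical helper that isolates the budget formula used in the loop above.
final class IoRatioSketch {
    // Task budget in nanoseconds for a given I/O time; meaningful for 1 <= ioRatio < 100.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // pretend 8 ms were spent processing selected keys
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With 8 ms of I/O, a ratio of 50 gives tasks the same 8 ms, a ratio of 80 gives them 2 ms, and a ratio of 20 gives them 32 ms. The ioRatio == 100 case never reaches this formula because the loop handles it in a separate branch.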
    private void processSelectedKeys() {
        if (selectedKeys != null) { // null in this trace, i.e. when no optimized selected-key set was installed
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys()); // empty at this point
        }
    }
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return; // nothing selected yet, so we return here
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }
    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
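The (runTasks & 0x3F) == 0 test means the deadline is consulted only after every 64th task, which keeps clock reads rare. The sketch below isolates that batching with a plain queue and System.nanoTime(). BatchedDeadlineSketch and its methods are hypothetical and leave out the scheduled-task handling of the real method.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of draining a queue while checking a deadline once per 64 tasks.
final class BatchedDeadlineSketch {
    // Runs queued tasks until the queue is empty or the deadline passes; returns the number of clock reads.
    static int runAll(Queue<Runnable> tasks, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        long runTasks = 0;
        int clockReads = 0;
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
            runTasks++;
            // 0x3F is 63, so this is true only when runTasks is a multiple of 64.
            if ((runTasks & 0x3F) == 0) {
                clockReads++;
                if (System.nanoTime() - deadline >= 0) {
                    break;
                }
            }
        }
        return clockReads;
    }

    // Queues 200 counting tasks and returns {clockReads, tasksRun, tasksLeft}.
    static int[] demo(long timeoutNanos) {
        int[] counter = new int[1];
        Queue<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            tasks.add(() -> counter[0]++);
        }
        int clockReads = runAll(tasks, timeoutNanos);
        return new int[] {clockReads, counter[0], tasks.size()};
    }

    public static void main(String[] args) {
        int[] generous = demo(3_600_000_000_000L); // one hour: the deadline never triggers
        int[] expired = demo(0L);                  // zero budget: stops at the first check
        System.out.println(generous[0] + " reads, " + generous[1] + " run, " + generous[2] + " left");
        System.out.println(expired[0] + " reads, " + expired[1] + " run, " + expired[2] + " left");
    }
}
```

One consequence of the batching is visible in the zero-budget case: the loop still runs a full batch of 64 tasks before it notices the deadline has passed.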
    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }