我们在研究注册过程中的源码可以看到这样的一段代码
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });
那么这个execute方法里面的流程究竟是什么样的?因为NioEventLoop本身继承了SingleThreadEventExecutor,所以本身会调用父类的execute方法:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop();//判断eventLoop中的线程是否已经启动 if (inEventLoop) { addTask(task);//如果是,则添加任务 } else { startThread();//如果不是,则启动线程 addTask(task);//然后添加任务 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }所以eventLoop的execute方法只会讲Runnable对象作为任务,添加的某个任务列表中。
在启动线程的方法中,跟踪,会有这样的代码
private void doStartThread() { assert thread == null; executor.execute(new Runnable() {//值得注意的是这个executor的类是我们初始化eventloopgroup时分配好的一个ThreadPerTaskExecutor //实例,里面有一个线程工厂的实例,用于创建新的线程来执行传入的Runnable任务。 @Override public void run() { thread = Thread.currentThread();//这里就对eventloop中的thread进行赋值,以后对eventLoop的excute调用将可能会返回true. if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run();//调用eventLoop的run方法,进行循环。 success = true; } catch (Throwable t) {
所以这个地方会立马新建并启动一个线程,执行Runnable.
跟踪到run方法中,里面会是一个死循环,我们挑出关键代码
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {//计算出选择策略,可以得出是select case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false));//这里就是NIO中的selector.select(),然后将selectedKey赋予
if (ioRatio == 100) {//默认值为50 try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys();//处理selectedKeys,,很激动,不就是Nio中对于Selector.selectedKeys的处理嘛。。。 } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//执行添加到任务列表中的任务 } }
看一下processSelectedKeys的代码
private void processSelectedKeys() { if (selectedKeys != null) {//这个是初始化形成的,是一个1024个selectKey的数组,所以判定不为空,但是里面没有值,所以size=0 processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) {//刚开始会直接返回,因为size=0 final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
随后就是执行任务,这样就死循环就会一直在这个线程中执行。
我们回到添加任务的方法那,从最开始,我们知道我们添加的是一个执行注册方法的任务
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task);//所以,可以看出是添加到一个taskQueue中。Queue<Runnable> taskQueue }
而我们在那个线程中的死循环中是不断的执行所有的任务的,
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks(); return ranAtLeastOne; }
所以到在采取运行注册的任务
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
我们在执行完register0()里的doRegister()之后
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister();//执行具体的注册流程 neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded();//我们会调用之前addLast挂起的一些任务,最终完成channelHandler的添加, //传播添加handler成功的事件 safeSetSuccess(promise); pipeline.fireChannelRegistered();//传播注册完成事件 // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }