文章目录
1. 服务端启动步骤
一个 Netty 服务端配置启动代码如下,其大略的流程为以下几个步骤:
- 创建 NioEventLoopGroup 实例,这个类是 Netty 的 Reactor 线程池实现之一,其实际为 EventLoop 的容器,而 EventLoop 的主要职责是处理所有注册到本线程多路复用器 Selector 上的 Channel。代码中创建了两个 NioEventLoopGroup 实例,这是 主从 Reactor 多线程模式 的体现
- 创建 ServerBootstrap 实例,该类是 Netty 服务端的启动辅助类,提供一系列方法用于设置服务端启动相关的参数
- ServerBootstrap 对象调用 group() 方法设置并绑定主从 Reactor 线程池
- ServerBootstrap 对象调用 channel() 方法设置服务端 Channel 实例类型
- ServerBootstrap 对象调用 handler() 方法设置服务端 MainReactor 线程池业务处理器
- ServerBootstrap 对象调用带 child 前缀的方法设置服务端 SubReactor 线程池配置,比如 childHandler() 方法设置SubReactor 线程池业务处理器
- ServerBootstrap 对象调用 bind() 方法绑定并启动监听端口
public class NettyServer {
private static final int DEFAULT_PORT = 10086;
public static void start() {
new Thread(() -> {
// 创建监听线程组, 监听客户端请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 创建工作线程组, 处理请求
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 服务器辅助启动类配置
new ServerBootstrap().group(bossGroup, workerGroup)
// 设置 channel 类型为NIO类型
.channel(NioServerSocketChannel.class)
// 设置连接配置参数
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler())
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
// 配置入站、出站事件handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 配置入站、出站事件channel
ch.pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new ServerHandler());
}
})
// 绑定端口
.bind(DEFAULT_PORT)
.addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + DEFAULT_PORT + "]绑定成功!");
} else {
System.err.println("端口[" + DEFAULT_PORT + "]绑定失败!");
}
});
}, "Server").start();
}
}
2. 服务端启动流程详解
服务端启动流程图如下,各组件间的关系可参考 Netty源码分析(1)-核心组件与架构,其主要流程可以分为以下几个部分:
- 事件分发组件的配置及初始化
- 业务处理组件 Channel 的初始化及注册,注册过程即事件循环线程的启动过程
- 绑定服务端 ServerSocketChannel 到指定的端口
2.1 事件分发组件配置及初始化
这部分对应以上流程图步骤 1-5,其中比较关键的流程如下:
-
new NioEventLoopGroup()
创建 Reactor 线程池实例,其关键逻辑在父类MultithreadEventExecutorGroup
的构造方法中,重要步骤如下:new ThreadPerTaskExecutor(new DefaultThreadFactory())
生成 Executor 实例,并指定其线程工厂- 调用
newChild()
方法为当前 group 新建 NioEventLoop 实例,并指定其 Executor 入参为 ThreadPerTaskExecutor 对象,该对象后续将用于创建和启动 EventLoop 线程 - 如果有一个 NioEventLoop 实例新建失败,调用已创建的每个 NioEventLoop 实例的 shutdownGracefully() 方法启动事件循环线程
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { // #1 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // #2 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { // #3 children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
-
new ServerBootstrap().group(bossGroup, workerGroup)
新建 ServerBootstrap 对象,并调用其group()
方法配置好主从 Reactor 线程池。其中 MainReactor 线程池将会保存在AbstractBootstrap.group 变量
,SubReactor 线程池以ServerBootstrap.childGroup 变量
保存public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
-
ServerBootstrap#channel()
方法实际配置了一个ReflectiveChannelFactory
工厂类,用于创建指定的 NioServerSocketChannel 对象public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
-
以
ServerBootstrap#childHandler()
方法为例,带 child 前缀的方法都用来配置 SubReactor 线程池所需的处理器等,这些配置在 MainReactor 将连接注册到 SubReactor 上之后 IO 读写会用到public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }
2.2 业务处理组件 Channel 的初始化及注册
-
ServerBootstrap#bind()
方法实际调用到AbstractBootstrap#doBind()
方法,这个方法是整个服务端启动的入口,主要分为了以下 2 个部分,本节主要分析initAndRegister()
流程initAndRegister()
初始化 Channel,并将其注册到 Selector 上doBind0()
将 Channel 绑定监听指定端口
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
-
AbstractBootstrap#initAndRegister()
方法的作用见名知意,其作用就是初始化 channel 并完成注册,其中关键的部分如下:- 首先通过配置的Channel 工厂类创建指定 Channel 对象,然后通过 init(channel) 方法初始化 Channel
- config().group().register(channel) 将初始化完毕的 Channel 注册到 MainReactor 中某个事件循环线程 Selector 上
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
-
channelFactory.newChannel()
通过反射调用了NioServerSocketChannel
无参构造方法,可知其newSocket()
方法实际是返回了一个 JDK 中的ServerSocketChannel
对象,NioServerSocketChannel
实际就是这个对象的进一步封装。在NioServerSocketChannel
的有参构造中可以看到,其调用了父类构造方法,并指定了其监听的就绪事件为SelectionKey.OP_ACCEPT
。此处往上追溯到其AbstractNioChannel
父类构造方法,可以看到 JDK 中 NIO 设置非阻塞的标准操作,并保留了监听的就绪事件的标识readInterestOp
,这个标识会在之后开始读取 IO 数据的时候用于设置监听事件public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } // AbstractNioChannel 构造方法 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
-
追溯
NioServerSocketChannel
父类构造方法最终来到AbstractChannel
,可以到其主要有两个动作:- newUnsafe() 新建 NioMessageUnsafe 对象,该类负责实际的 IO 读写动作
- newChannelPipeline() 新建 DefaultChannelPipeline 对象
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
-
DefaultChannelPipeline
构造方法中会新建TailContext
和HeadContext
对象,并将其前后指针互相指向对象,形成双向处理链表protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
-
Channel 对象的创建告一段落,回到初始化 Channel 的方法
AbstractBootstrap#init()
。这是个抽象方法,其实现为ServerBootstrap#init()
。这个方法中最重要的逻辑就是代码p.addLast()
调用的部分,这部分ChannelInitializer
对象被添加到流处理链,会在之后的流程中将ServerBootstrapAcceptor
注册到 MainReactor 的处理链中void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
-
进入
DefaultChannelPipeline#addLast()
方法,可以看到其内部主要做了 3 件事:- newContext() 将 ChannelHandler 封装到 DefaultChannelHandlerContext 对象
- addLast0() 将新建的 ChannelHandlerContext 对象加入到双向链表中
- 此时 Channel 还没有完成注册,callHandlerCallbackLater() 方法将新建一个 PendingHandlerAddedTask 对象,用于注册完成后执行 callHandlerAdded0() 方法回调处理器实现的 handlerAdd() 方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } callHandlerAdded0(newCtx); return this; }
-
Channel 的初始化结束,接下来就是注册 Channel。
config().group().register(channel)
实际调用到了NioEventLoopGroup
超类MultithreadEventLoopGroup#register()
方法,最终其实是选中事件循环线程池中的一个NioEventLoop
事件循环对象完成注册,实现是调用其超类SingleThreadEventLoop#register()
方法@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
-
从代码可以看到,注册是由
Unsafe
类完成的,其实现为AbstractUnsafe#register()
。此时事件循环线程还没有启动,故其会将AbstractUnsafe#register0()
方法包装成异步任务扔到事件循环对象中执行@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ...... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
-
以上任务其实提交到了
SingleThreadEventExecutor#execute()
,这个方法比较关键的其实只有两步:- addTask() 将异步任务添加到任务队列中
- 此时事件循环尚未启动,调用 startThread() 新建线程并启动
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
-
启动线程调用到
SingleThreadEventExecutor#doStartThread()
,此处代码executor.execute()
实际是通过ThreadPerTaskExecutor
新建并启动线程,至此则SingleThreadEventExecutor.this.run()
方法被调用private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { ...... } } }); }
-
SingleThreadEventExecutor.this.run()
为抽象方法,其实现为NioEventLoop#run()
,其主要逻辑如下:for 空循环正式启动事件循环线程。循环中select() 方法通过 Selector 轮询 IO 就绪事件,之后根据 ioRatio 配置分配processSelectedKeys()处理 IO 事件 和 runAllTasks() 处理其他任务的时间
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
-
SingleThreadEventExecutor#runAllTasks()
会将队列中的任务处理掉,则之前的异步任务AbstractUnsafe#register0()
被执行,其处理步骤如下:- doRegister() 完成 ServerSocketChannel 的注册
- pipeline.invokeHandlerAddedIfNeeded() 回调 Handler 处理器的 handlerAdded() 方法
- pipeline.fireChannelRegistered() 通知 Channel 注册事件,由处理器做相应处理
- 此时 ServerSocketChannel 如果是激活状态,且是第一次注册,则 pipeline.fireChannelActive() 通知 Channel 激活事件,由处理器做对应处理
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
-
AbstractNioChannel#doRegister()
中实际调用了ServerSocketChannel#register()
实现将 Channel 注册到 Selector 上,这属于Java 中 NIO 的标准操作。需注意此时 Channel 的监听标识为 0,也就是说此时 ServerSocketChannel 仅仅注册成功了,还不能监听任何网络操作。不过之后可以通过SelectionKey#interestOps()
方法修改监听操作位为指定值,也就是步骤 3 提到的标识protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
-
pipeline.invokeHandlerAddedIfNeeded()
最终调用到DefaultChannelPipeline#callHandlerAddedForAllHandlers()
,则步骤 7 中封装的PendingHandlerAddedTask#execute
执行,可以看到这个任务的核心为DefaultChannelPipeline#callHandlerAdded0()
@Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } remove0(ctx); ctx.setRemoved(); } } }
-
DefaultChannelPipeline#callHandlerAdded0()
会依次回调处理器的handlerAdded()
方法,则步骤6 的ChannelInitializer#handlerAdded()
会被调用,最终调用ChannelInitializer#initChannel()
将ServerBootstrapAcceptor
注册到 MainReactor 的处理链中,至此再执行完 步骤13 剩下的步骤则注册流程完成p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
2.3 服务端 ServerSocketChannel 绑定
-
Channel 初始化并注册完成后,就进入了绑定流程,也就是方法
AbstractBootstrap#doBind0()
的调用。可以看到方法内部的主体是向已经注册的 Channel 所属的事件循环线程提交一个异步任务,该任务主要调用到AbstractChannel#bind()
方法private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
-
AbstractChannel#bind()
方法内部调用到DefaultChannelPipeline#bind()
方法,这个方法逻辑很简单,只是使用TailContext
的引用调到了AbstractChannelHandlerContext#bind()
方法public final ChannelFuture bind(SocketAddress localAddress) { return tail.bind(localAddress); }
-
AbstractChannelHandlerContext#bind()
内部逻辑也不复杂,首先通过findContextOutbound()
方法从流处理双向链表尾部往前找到一个处理出站事件的处理器,也就是HeadContext
,之后调用其invokeBind()
方法public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
-
HeadContext#invokeBind()
方法其实是借助了Unsafe
类进行绑定操作的,最终也就调到了AbstractUnsafe#bind()
方法。这个方法主要完成了两件事:- doBind() 完成 ServerSocketChannel 绑定到指定端口
- 提交异步任务,调用 pipeline.fireChannelActive() 通知 Channel 已经激活,从而回调处理器中的 channelActive() 方法。需注意,在这个过程中服务端 Channel 监听的事件将被更改为
SelectionKey.OP_ACCEPT
,这部分将在后续解析新连接建立的文章中分析
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
-
dobind()
为抽象方法,其实现为NioServerSocketChannel#doBind()
,可以看到内部逻辑其实就是将 ServerSocketChannel 绑定到指定端口进行监听而已,属于 Java 中 NIO 的标准操作,至此绑定流程结束protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }