Netty源码分析
1.服务端创建流程
Netty 服务端创建的时序图:
主要步骤为:
- 创建 ServerBootstrap 实例
- 设置并绑定 Reactor 线程池
- 设置并绑定服务端 Channel
- 创建并初始化 ChannelPipeline
- 添加并设置 ChannelHandler
- 绑定并启动监听端口
代码
public class NettyServer {
public void bind(int port){
// 创建EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(); //创建BOSS线程组 用于服务端接受客户端的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //创建WORK线程组 用于进行SocketChannel的网络读写
try {
// 创建ServerBootStrap实例
// ServerBootstrap 用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度
ServerBootstrap b = new ServerBootstrap();
// 绑定Reactor线程池
b.group(bossGroup, workerGroup)
// 设置并绑定服务端Channel
// 指定所使用的NIO传输的Channel
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
@Override
protected void initChannel(Channel ch) throws Exception {
//do something
}
});
// 绑定端口,同步等待成功
ChannelFuture future = b.bind(port).sync();
// 等待服务端监听端口关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅地关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class LoggingServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("loggin-handlerAdded");
}
}
public static void main(String[] args){
new NettyServer().bind(8899);
}
}
2.代码分析
2.1创建EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup 为 BOSS 线程组,用于服务端接受客户端的连接, workerGroup 为 worker 线程组,用于进行 SocketChannel 的网络读写。当然也可以创建一个并共享。
创建ServerBootst rap实例
ServerBootstrap b = new ServerBootstrap();
ServerBootStrap为Netty服务端的启动引导类,用于帮助用户快速配置、启动服务端服务。
2.2设置并绑定Reactor线程池
调用 group() 方法,为 ServerBootstrap 实例设置并绑定 Reactor 线程池。
b.group(bossGroup, workerGroup)
当然也可以使用一个,b.group(group),因为在ServerBootstrap内部调用了group(bossGroup, workerGroup)方法。
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
EventLoopGroup 为 Netty 线程池,它实际上就是 EventLoop 的数组容器。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由绑定的 EventLoop 线程 run 方法驱动,在一个循环体内循环执行。通俗点讲就是一个死循环,不断的检测 I/O 事件、处理 I/O 事件。bossGroup 的作用就是不断地接收新的连接,接收之后就丢给 workerGroup 来处理,workerGroup 负责干活就行(负责客户端连接的 IO 操作)。
group方法的源码如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
首先调用了父类的构造方法,并且参数是的bossGroup,然后将成员变量childGroup设置为我们传入的workerGroup。
2.3绑定服务端channel
绑定线程池后,则需要设置 channel 类型,服务端用的是 NioServerSocketChannel 。当然也可以使用BioServerSocketChannel。
.channel(NioServerSocketChannel.class)
Netty通过工厂类,利用反射创建NioServerSocketChannel 对象,如下:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
首先调用serverbootStrap的channel方法传入NioServerSocketChannel.class,在cahnnel方法中创建一个ReflectiveChannelFactory对象,并通过构造方法把NioServerSocketChannel.class赋值给clazz属性。接下来把实例化好的ReflectiveChannelFactory对象赋值给serverbootStrap的channelFactory属性最后返回其实过程很简单,就是实例化了一个channel工厂类。其中ReflectiveChannelFactory实现了ChannelFactory接口。
需要创建 channel 的时候,该方法将被调用
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
2.4添加并设置ChannelHandler(TODO添加一个childhandler)
设置完 Channel 参数后,用户可以为启动辅助类和其父类分别指定 Handler
.handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
//省略代码
})
这两个 Handler 不一样,前者(handler())设置的 Handler 是服务端 NioServerSocketChannel的,后者(childHandler())设置的 Handler 是属于每一个新建的 NioSocketChannel 的。跟踪源代码会发现两种所处的类不一样,handler 位于 AbstractBootstrap 中,childHandler 位于 ServerBootstrap 中。ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有连接该监听端口的客户端都会执行它,父类 AbstractBootstrap 中的 Handler 是一个工厂类,它为每一个新接入的客户端都创建一个新的 Handler。
2.5绑定端口,启动服务
通过b.bind(port)来启动服务,下面将来进行分析。调用 ServerBootstrap 的 bind(int port) 方法进行端口绑定,该方法有调用bind(SocketAddress localAddress)方法。
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
其中调用validate()和doBind(localAddress)两个方法。这两个方法在
AbstractBootstrap中。
//函数功能:检查相关参数是否设置了
@SuppressWarnings("unchecked")
public B validate() {
if (group == null) {//这里的group指的是:b.group(bossGroup, workerGroup)代码中的bossGroup,
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}
该方法主要检查了两个参数,一个是group,一个是channelFactory,前面已经提到。其中调用用group方法是将bossGroup赋值给了group,调用将channel()方法将创建ChannelFactory赋值给了channelFactory。
接下来看bind方法中的doBind(localAddress)方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);//2
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点:
1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。
2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。
3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。
4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。
第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。
下面将分成4部分对每行代码具体做了哪些工作进行详细分析。
1.final ChannelFuture regFuture = initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();//A
init(channel);//B
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);//C
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
A. channel = new NioServerSocketChannel()
这里的channel为一个NioServerSocketChannel对象,通过反射使用channelFactory的newChannel()方法,该方法在2.3节提到过,channelFactory对象是通过channel方法创建的。
NioServerSocketChannel类的继承体系结构如下:
进一步看看NioServerSocketChannel的构造方法:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
NioServerSocketChannel中常量字段的定义为:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
通过newSocket方法得到了一个ServerSocketChannel的实例。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
接下来将上面得到的ServerSocketChannel的实例传入另一个构造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//父类AbstractNioMessageChannel的构造函数
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
//父类 AbstractNioChannel的构造函数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
try {
ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//父类AbstractChannel的构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
总结一下:
channel = new NioServerSocketChannel()产生一个实例对象时,其实干了很多事情。
- 1.产生来一个SocketChannelImpl类的实例,并设置为非阻塞的。
- 2.设置了config属性
config = new NioServerSocketChannelConfig(this, javaChannel().socket() - 3.设置SelectionKey.OP_ACCEPT事件
- 4.设置unsafe属性,主要作用为:用来负责底层的connect、register、read和write等操作。
unsafe = newUnsafe(); - 5.通过newChannelPipeline()创建实例,并赋值给pipeline属性,每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。
B. init(channel)
@Override
void init(Channel channel) throws Exception {
//1、设置新接入channel的option
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//2、设置新接入channel的attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 获取绑定的pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// 为NioServerSocketChannel的pipeline添加一个初始化Handler,
// 当NioServerSocketChannel在EventLoop注册成功时,该handler的init方法将被调用
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 为NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor处理器
// 该Handler主要用来将新创建的NioSocketChannel注册到EventLoopGroup中
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
其实整个过程可以分为三个步骤:
- 设置 Channel 的 option 和 attr;
- 获取绑定的 pipeline,然后为 NioServerSocketChannel绑定的 pipeline 添加 Handler;
- 将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中。ServerBootstrapAcceptor 为一个接入器,专门接受新请求,把新的请求扔给某个事件循环器。ServerBootstrapAcceptor是ServerBootstrap的一个静态内部类,下面是构造方法。
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
// Task which is scheduled to re-enable auto-read.
// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
// not be able to load the class because of the file limit it already reached.
//
// See https://github.com/netty/netty/issues/1328
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
C.ChannelFuture regFuture = config().group().register(channel);
这段代码的功能及时向EventLoopGroup中注册一个channel,注意这里的 group() 返回的是前面的 boss NioEvenLoopGroup,register方法实现在MultithreadEventLoopGroup,他的接口声明在EventLoopGroup中。下面我们进入register方法内部:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
调用 next() 方法从 EventLoopGroup 中获取下一个 EventLoop,调用 register() 方法注册:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
将Channel和EventLoop封装成一个DefaultChannelPromise对象,然后调用register()方法。DefaultChannelPromis为ChannelPromise的默认实现,而ChannelPromisee继承Future,具备异步执行结构,绑定Channel,所以又具备了监听的能力,故而ChannelPromise是Netty异步执行的核心接口。
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 必须要保证注册是由该EventLoop发起的
if (eventLoop.inEventLoop()) {
register0(promise); // 注册
} else {
// 如果不是单独封装成一个task异步执行
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
过程如下:
- 首先通过isRegistered() 判断该 Channel 是否已经注册到 EventLoop 中;
- 通过 eventLoop.inEventLoop() 来判断当前线程是否为该 EventLoop 自身发起的,如果是,则调用 register0() 直接注册;
- 如果不是,说明该 EventLoop 中的线程此时没有执行权,则需要新建一个线程,单独封装一个 Task,而该 Task 的主要任务则是执行 register0()。
无论当前 EventLoop 的线程是否拥有执行权,最终都会要执行 register0(),如下:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 真正的注册动作
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
//如果是首次注册,发起 pipeline 的 fireChannelActive
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
如果 Channel 处于 open 状态,则调用 doRegister() 方法完成注册,然后将注册结果设置为成功。最后判断如果是首次注册且处于激活状态,则发起 pipeline 的 fireChannelActive()。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 注册到NIOEventLoop的Selector上
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
这里注册时 ops 设置的是 0,也就是说 ServerSocketChannel 仅仅只是表示了注册成功,还不能监听任何网络操作,这样做的目的是(摘自《Netty权威指南(第二版)》):
- 注册方式是多态的,它既可以被 NIOServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或者写操作。
- 通过 SelectionKey.interestOps(int ops) 方法可以方便地修改监听操作位。所以,此处注册需要获取 SelectionKey 并给 AbstractNIOChannel 的成员变量 selectionKey 赋值。
接下来继续看 pipeline.fireChannelRegistered()做了什么事情。
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
在 invokeChannelRegistered() 会调用我们在前面设置的 handler (还记得签名的 handler(new LoggingServerHandler() )么)的 channelRegistered(),这个时候控制台应该会打印 loggin-channelRegistered。
到这里initAndRegister() (final ChannelFuture regFuture = initAndRegister();)就分析完毕了,该方法主要做如下三件事:
- 通过反射产生了一个 NioServerSocketChannle 对象;
- 调用 init(channel)完成初始化工作;
- 将NioServerSocketChannel进行了注册。
2.doBind0(regFuture, channel, localAddress, promise);
下面分析 doBind0()到底做了些什么。源码如下
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
首先new 一个线程 task,然后将该任务提交到 NioEventLoop 中进行处理,我们先看 execute()。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
调用 inEventLoop() 判断当前线程是否为该 NioEventLoop 所关联的线程,如果是,则调用 addTask() 将任务 task 添加到队列中,如果不是,则先启动线程,在调用 addTask() 将任务 task 添加到队列中。addTask() 如下:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
task 添加到任务队列 taskQueue成功后,执行任务会调用如下方法:
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
tail 在 DefaultChannelPipeline 中定义:final AbstractChannelHandlerContext tail; 有 tail 就会有 head ,在 DefaultChannelPipeline 中维护这一个 AbstractChannelHandlerContext 节点的双向链表,该链表是实现 Pipeline 机制的关键,更多详情会在 ChannelPipeline 中做详细说明。bind() 最终会调用 DefaultChannelPipeline 的 bind() 方法。如下:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (!validatePromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
首先对 localAddress 、 promise 进行校验,符合规范则调用 findContextOutbound() ,该方法用于在 pipeline 中获取 AbstractChannelHandlerContext 双向链表中的一个节点,如下:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
从该方法可以看出,所获取的节点是从 tail 开始遍历,获取第一个节点属性 outbound 为 true 的节点。其实该节点是 AbstractChannelHandlerContext 双向链表的 head 节点。获取该节点后,调用 invokeBind(),如下
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
handler() 返回的是 HeadContext 对象,然后调用其bind(),如下:
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe 定义在 HeadContext 中,在构造函数中初始化(unsafe = pipeline.channel().unsafe();),调用 bind() 如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
// 最核心方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
内部调用 doBind() ,该方法为绑定中最核心的方法,位于 NioServerSocketChannel 中,如下
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
javaChannel()返回的是 NioServerSocketChannel 实例初始化时所产生的 Java NIO ServerSocketChannel 实例(ServerSocketChannelImple实例),然后调用其 bind(),如下:
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
Object var3 = this.lock;
synchronized(this.lock) {
if(!this.isOpen()) {
throw new ClosedChannelException();
} else if(this.isBound()) {
throw new AlreadyBoundException();
} else {
InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if(var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1?50:var2);
Object var6 = this.stateLock;
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}