1. 主线
our thread就是我们自己写的netty服务器的主线程的代码
2. 启动
在下面三处打上断点
2.1 selector初始化是在NioEventLoopGroup初始化
运行程序:发现,查看NioEventLoopGroup
通过创建一个子的NioEventLoopGroup
,然后子的NioEventLoopGroup
又NioEventLoop
,并完成selector的创建
2.2 进入bind方法
跟进源码:
private ChannelFuture doBind(final SocketAddress localAddress) {
// channel注册、初始化、绑定到EventLoopGroup上
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// regFuture是异步的,到此处不一定会完成
if (regFuture.isDone()) {//注册成功获取回调
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 完成对端口绑定
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 如果没有完成注册操作会将注册任务封装成一个task,加入到regFuture的Listener上
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
2.2.1 initAndRegister方法的具体实现
2.2.1.1initAndRegister中的init()方法
@Override
void init(Channel channel) {
// 初始化option
setChannelOptions(channel, newOptionsArray(), logger);
// 初始化属性
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 构建一个pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;// 全局变量childGroup改名为currentChildGroup
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
// ChannelInitializer完成将我们需要的handler加入到pipeline中
// 当将所有的handler加入到pipeline完成以后,ChannelInitializer结束,从pipeline中移除
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// new ServerBootstrapAcceptor()对象时将currentChildGroup作为参数传入
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
2.2.1.2 initAndRegister方法中的register()
因为next方法我们之前已经讲过了两种选择器
跟进register()的代码
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前线程是否是NioEvenetLoop中的线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 如果当前线程不在NioEvenetLoop中,将register0任务丢到eventLoop去执行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
继续跟进register0
方法
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 使用java的jdk的Nio变成,将channel注册到selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
2.2.2 doBind0方法的具体实现
doBind0
中也是讲bind任务放到eventLoop中
跟进bind源码,他会让pipeline去实现bind方法,我们直接进入pipelinde的head
找到bind方法
出现下面的代码:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// 查看当前channel是否已经活跃
boolean wasActive = isActive();
try {
// 完成bind
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// bind完成以后完成激活channel
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 激活channel
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
2.2.2.1 追踪bind方法
最后使用jdk的bind方法完成bind
2.2.2.2 pipeline.fireChannelActive();如何激活channel
不断跟进readIfIsAutoRead()
,发现使用的selectKey的ops操作。
总结:服务启动的核心步骤
一些知识点总结: