Netty设计模式与源码分析
注册 Channel 的 register 操作
经过前面的铺垫,我们已经具备一定的基础了,我们开始来把前面学到的内容揉在一起。这节,我们会介绍 register 操作,这一步其实是非常关键的,对于我们源码分析非常重要。
selector如何注册到serverSocket上的。
继续跟踪代码,找到demo的bind方法,这是一个异步方法,Netty所有逻辑的入口,包括我们说的register操作、Chanel pipeline操作、boss线程与work线程工作的入口、端口绑定等。
bind
//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步=事件的执行情况
//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture f = b.bind(PORT).sync();
最终找到initAndRegister方法,这里我们主要关注3个方法
实例化NioServerSocketChannel
channel = channelFactory.newChannel();
初始化通道的一些信息
init(channel);
注册,将监听事件注册到NioServerSocketChannel上
ChannelFuture regFuture = config().group().register(channel);
我们按步骤进行分析
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//这个就是根据反射创建NioServerSocketChannel对象
channel = channelFactory.newChannel();
//这里初始化NioServerSocketChannel一些逻辑
init(channel);
} catch (Throwable t) {
**********************略*************
}
//注册,将监听事件注册到NioServerSocketChannel上
ChannelFuture regFuture = config().group().register(channel);
***************************************
return regFuture;
}
channelFactory.newChannel()
我们先找到创建服务端的代码,找到初始化通道的代码,这里穿的是NioServerSocketChannel.class
思考:这里为什么用反射创建对象呢,而NioSocketChannel则是直接new的。因为服务端通道不确定,我们可以使用udp,也可以使用bio通道。
这里的ChanelClass就是NioServerSocketChannel,封装到一个ReflectiveChannelFactory对象中,并将构造方法赋值给constructor
我们回到主线逻辑上
channel = channelFactory.newChannel();
最终调用,就是就是根据反射创建NioServerSocketChannel对象(无参的构造方法)
NioServerSocketChannel
无参构造方法
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
调用jdk底层创建的ServerSocketChannel
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
//创建jdk底层的 ServerSocketChannel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
我们把刚刚获取的ServerSocketChannel传入父类构造方法中。
//channel jdk 中nio中的ServerSocketChannel对象终于被创建出来了
public NioServerSocketChannel(ServerSocketChannel channel) {
//调用父类构造方法
super(null, channel, SelectionKey.OP_ACCEPT);
//创建一个config对象 主要是用于保存配置信息的对象
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
继续走构造方法,初始信息及跟jdk的关联注册等逻辑。详情请看注释代码。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
//将jdk 的 ServerSocketChannel 赋值给 ch 我们终于找到了
//netty中的NioServerSocketChannel 与 jdk中的 ServerSocketChannel 的关联
this.ch = ch;
//server 端 这是一个 accept事件
this.readInterestOp = readInterestOp;
try {
//设置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
*******************************************
}
}
继续看父类构造方法,主要初始化unsafe与pipline,注意,NioServerSocketChannel与NioSocketChannel的unsafe对象是不同的。
protected AbstractChannel(Channel parent) {
//NioServerSocketChannel 这个parent为null
//NioSocketChannel 这个为 NioServerSocketChannel
this.parent = parent;
// 每个chanel 唯一的id
id = newId();
//unsafe 主要是处理io读写操作
unsafe = newUnsafe();
//pipeline 双向链表 终于来了
pipeline = newChannelPipeline();
}
pipeline
现在的this是NioServerSocketChannel
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
pipline是一个双向链表,有一个头head和一个尾节点tail,需要用context包裹住,
protected DefaultChannelPipeline(Channel channel) {
*******************************************
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
至此,NioServerSocketChannel对象就实例化完毕了,里面的逻辑已经在上面了。
源码跟踪到这里,我们终于找到了梦寐以求的ChannelPipeline,那么现在我们先讲一下Pipeline
ChannelPipeline,和 Inbound、Outbound
很多小伙伴应该都有 Netty 中 pipeline 的概念。我们使用 Netty 的时候,通常就只要写一些自定义的 handler 就可以了,我们定义的这些 handler 会组成一个 pipeline,用于处理 IO 事件,这个和我们平时接触的 Filter 或 Interceptor 表达的差不多是一个意思。
每个 Channel 内部都有一个 pipeline,pipeline 由多个 handler 组成,handler 之间的顺序是很重要的,因为 IO 事件将按照顺序顺次经过 pipeline 上的 handler,这样每个 handler 可以专注于做一点点小事,由多个 handler 组合来完成一些复杂的逻辑。
每个handler都需要实现Inbound和 Outbound接口,每个handler的顺序很重要 ,Inbound代表入栈事件,Outbound代表出栈事件。入栈事件执行的顺序是从前到后,依次寻找入栈事件并执行。而Outbound则是从后往前执行,依次找到所有Outbound的hander并执行。
Inbound需要实现ChannelInboundHandlerAdapter
Outbound需要实现ChannelOutboundHandlerAdapter
当然Netty中有很多默认的实现
在创建NioServerSocketChannel中时我们知道会实例化一个pipeline对象。
childHandler 中指定的 handler 不是给 NioServerSocketChannel 使用的,是给 NioSocketChannel 使用的。
handler才是给NioServerSocketChannel使用的,我们一起来看。
handler赋值到了handler属性
childHandler赋值给了childHandler属性
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
init方法
我们现在到initAndRegister方法中
进入到ServerBootstrap的init方法中
@Override
void init(Channel channel) {
****************************************
//获取到NioServerSocketChannel中的pipeline
ChannelPipeline p = channel.pipeline();
//获取handler 就是给 NioServerSocketChannel 用的handler
final EventLoopGroup currentChildGroup = childGroup;
//获取ChannelHandler 就是给 NioSocketChannel 用的handler
final ChannelHandler currentChildHandler = childHandler;
*******************************************
// 开始往 pipeline 中添加一个 handler,这个 handler 是 ChannelInitializer 的实例
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 这个方法返回我们最开始指定的 LoggingHandler 实例
//从配置中取到handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 先不用管这里的 eventLoop
// childOptions 需要进一步 处理逻辑
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加一个 handler 到 pipeline 中:ServerBootstrapAcceptor
// 从名字可以看到,这个 handler 的目的是用于接收客户端请求
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里涉及到 pipeline 中的辅助类 ChannelInitializer,我们看到,它本身是一个 handler(Inbound 类型),但是它的作用和普通 handler 有点不一样,它纯碎是用来将其他的 handler 加入到 pipeline 中的。
此时的 pipeline 应该是这样的:
ChannelInitializer 的 initChannel(channel) 方法被调用的时候,会往 pipeline 中添加我们最开始指定的 LoggingHandler 和添加一个 ServerBootstrapAcceptor。但是我们现在还不知道这个 initChannel 方法何时会被调用。
现在要记住 pipeline 现在的样子,head + channelInitializer + tail
register
终于到了注册了(异步操作)
MultithreadEventLoopGroup.register
取模算法或位运算在EventLoopGroup中取到一个EventLoop,然后执行register
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
SingleThreadEventLoop.register
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
SingleThreadEventLoop.register(final ChannelPromise promise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//获取到NioServerSocketChannel 的 unsafe 执行具体的注册逻辑
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//eventLoop赋值给NioServerSocketChannel的属性上
//这里就是eventLoop与Chanel关联上了
AbstractChannel.this.eventLoop = eventLoop;
//是不是eventLoop内部的线程执行的
//我们现在注册 是主线程在执行 不是eventLoop中的线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 调用eventLoop的execute方法
// 提交任务给 eventLoop到队列中,eventLoop 中的线程会负责调用 register0(promise)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
******************************************
}
}
}
这里有两步需要说明,一是eventLoop.execute(*****),二是registerer0方法。
先不说register0()方法,我们先说说eventLoop怎样执行的
eventLoop.execute(runnable) 方法
我们先记住刚刚把register0方法存入队列中,并启动线程
private void execute(Runnable task, boolean immediate) {
//判断当前线程是否是 eventLoop 中的线程
boolean inEventLoop = inEventLoop();
//将任务 添加到 eventLoop 的taskQueue中
addTask(task);
if (!inEventLoop) {
//这步很重要 主要是靠这步执行 reactor的循环 监听 等一系列事件
//开启 eventLoop 的线程
startThread();
*******************************************
}
***********************************************
}
开启线程
private void startThread() {
//如果线程没有启动
if (state == ST_NOT_STARTED) {
//置为 主线程启动状态
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 开启线程
doStartThread();
success = true;
} finally {
*******************************
}
}
}
}
private void doStartThread() {
assert thread == null;
// eventLoop 的 executor 属性 终于派上用场了
// executor 线程池 执行里面的线程
// 并将线程 赋值 给 thread 属性
// 这个executor 是我们 在创建 EventLoopGroup 对象是就创建好的 然后传入 eventLoop 中
// 执行
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//重中之重来了 执行 NioEventLoop 的 run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
***********************************
} finally {
***********************************
}
}
});
}
按照前面的逻辑推理,现在taskQueue中的对象就是registerer0,说明现在registerer0还没有执行呢,何时执行呢,我们看SingleThreadEventExecutor.this.run();这个方法,最终会调用NioEventLoop 的run方法
NioEventLoop run方法
现在我们是服务端启动方法,这个方法主要是boss线程,主要负责服务端监听与处理队列中的任务,我们现在没有客户端的连接,此时主要是处理队列中的任务,也就是register0方法。当有客户端连接时,我们后续会讲客户端连接事件。
此时队列不为空,我们会直接执行selectNow方法,直接放回,我们没有accept事件,没有SelectedKeys。
@Override
protected void run() {
int selectCnt = 0;
//死循环 循环监听客户端的连接事件
for (;;) {
try {
int strategy;
try {
//我们之前说的策略 这个策略是 如果队列不为空 执行 selector.selectorNew
//如果队列为空 执行阻塞的方法 selector.selector
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
//当有客户端 accept事件时
case SelectStrategy.SELECT:
*************************
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
***************************************
}
// fall through
default:
}
} catch (IOException e) {
************************************
}
*************************************
else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
//处理selectedkey
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
//处理队列中的任务
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
****************************************************
}
********************************************
} catch (CancelledKeyException e) {
**************************************
}
}
}
先根据 hasTasks() 的结果来决定是执行 selectNow() 还是 select(****)。
如果有任务正在等待,那么应该使用无阻塞的 selectNow(),直接返回结
果执行队列中的任务,如果没有任务在等待,那么就可以使用带阻塞的
select 操作。
之前的register0方法就是NioEventLoop的任务队列中taskQueue中,这是 NioEventLoop 接收到的第一个任务,所以最终会在这里启动执行register0方法。
最终调用到下列方法,循环执行任务队列的所有任务,现在也就是register0方法。我们回去看看register0方法的逻辑。
protected static void safeExecute(Runnable task) {
try {
//执行队列中的任务
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
现在我们讲的时服务端的注册,此时队列中只有register0方法,等待register0方法执行完了,说明已经注册成功,之后等待客户端连接就好了。
register0
很重要的一步逻辑。
private void register0(ChannelPromise promise) {
try {
*****************************
//执行具体的注册事件
// *** 进行 JDK 底层的操作:Channel 注册到 Selector 上 ***
doRegister();
neverRegistered = false;
registered = true;
*****************************************************
//很重言 处理pipeline
//最终会执行 ChannelInitializer 的 init(channel)
pipeline.invokeHandlerAddedIfNeeded();
*****************************************************
// 调用pipeline中的所有 Registered 方法
// 感知到,往 pipeline 中扔一个事件
pipeline.fireChannelRegistered();
// 这里 active 指的是 channel 已经打开
if (isActive()) {
// 如果该 channel 是第一次执行 register,那么 fire ChannelActive 事件
if (firstRegistration) {
//如果注册成功 执行pipeline 所有的 Active 方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
********************************************************
}
}
doRegister();
调用jdk底层将channel注册到seletor上,熟悉的nio代码
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//获取jdk的serverSocketChannel 并注册事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
*****************************************************
}
}
}
这里做了 JDK 底层的 register 操作,将 SocketChannel(或 ServerSocketChannel) 注册到 Selector 中,并且可以看到,这里的监听集合设置为了 0,也就是什么都不监听
当然,也就意味着,后续一定有某个地方会需要修改这个 selectionKey 的监听集合。
register 成功以后,执行了以下操作:
pipeline.invokeHandlerAddedIfNeeded()
最终调用这段代码
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
。
最终会执行ServerBootstrap的init方法,将handler加入到pipeline中。
此时的handler是什么样子的呢
此时pipeline会将ChannelInitializer删除掉,并将pipeline变成以下。具体请参考代码分析。
head -> handler -> serverbootstrapacceptor -> tail
serverbootstrapacceptor是什么时候调用的呢,我们之后再说,我们只需要记住现在pipeline的样子就好了。
pipeline.fireChannelRegistered();
调用到,入栈这里传入的时head
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
// next 此时是 head
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
// 执行 head 的 invokeChannelRegistered()
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
也就是说,这里会先执行 head.invokeChannelRegistered() 方法,最终调用共channelRegistered方法,channelRegistered会执行。
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRegistered();
}
}
head没有实现channelRegistered方法,调用父类的方法,此处逻辑为找到下一个inbound handler 并执行channelRegistered方法。
@Override
public ChannelHandlerContext fireChannelRegistered() {
// findContextInbound() 方法会沿着 pipeline 找到下一个 Inbound 类型的 handler
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
pipeline.fireChannelRegistered() 是将 channelRegistered 事件抛到 pipeline 中,pipeline 中的 handlers 准备处理该事件。而 context.fireChannelRegistered() 是一个 handler 处理完了以后,向后传播给下一个 handler。
findContextInbound() 将找到下一个 Inbound 类型的 handler,然后又是重复上面的几个方法。
总之就是从 head 中开始,依次往下寻找所有 Inbound handler,执行其 channelRegistered(ctx) 操作。
注意:如果没有向后传递,则handler就会停止在这个handler上。
继续重复调用,直到所有的inbound handler都执行完成。
pipeline.fireChannelActive();
注册事件完成之后,这个事件表明通道已经处于活跃状态了。现在我们知道我们的业务handler的channelRegistered与channelActive是何时执行的了吧。
.注意:fire开头的方法都是一次执行pipeline中的handler,分为inbound与outbound,inbound从head开始,outbound从tail开始