接着上篇NioEventLoop遗留的问题,Accept事件和Read事件。
当前selectionKey发生的事件是SelectionKey.OP_ACCEPT或者SelectionKey.OP_READ,执行unsafe的read方法。
NioUnsafe的多态
说到unsafe,其实在NioServerSocketChannel中提到过,Unsafe是定义在Channel中的接口,看看它的实现:
在PerocessKey方法中的定义是这样的
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
两个不同的实现NioByteUnsafe和NioMessageUnsafe分别是为NioSocketChannel和NioServerSocketChannel服务的,当然前者的实现分别是后者或者父类的内部类。那么他是何时与Channel绑定的?以NioServerSocketChannel为例,实例化的时候,调用了父类的构造方法:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
newUnsafe()的实现分别在不同的Unsafe实现中。根据多态来实现不同类型Channel的相同API,说白了就是NioMessageUnsafe的read处理Accept事件,NioByteUnsafe的处理Read事件,但是入口都是read。
下面我们开始分析accept事件的处理。
创建SocketChannel
read方法定义在NioMessageUnsafe类中,代码如下:
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
- 检查该 eventloop 线程是否是当前线程。
- 执行 doReadMessages 方法,并传入一个 readBuf 变量,这个变量是一个 List,通过调用方法doReadMessages来进行处理ServerSocketChannel的accept操作。 如果此时没有客户端连接,则退出for循环进行后续的处理,如果有客户端连接,则将客户端NioSocketChannel保存到readBuf中(默认不超过16个),如果超过16个,则也退出for循环进行后续的处理。
doReadMessages 方法的作用是通过 ServerSocketChannel 的 accept 方法获取到 Tcp 连接,然后封装成 Netty 的 NioSocketChannel 对象
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//省略异常捕获代码
SocketChannel ch = SocketUtils.accept(javaChannel());
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
ServerSocketChannel有阻塞和非阻塞两种模式:
-
阻塞模式:ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。
-
非阻塞模式:accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null。
在NioServerSocketChannel的构造函数分析中,我们知道,其通过ch.configureBlocking(false);语句设置当前的ServerSocketChannel为非阻塞的。
- 循环容器,执行 pipeline.fireChannelRead(readBuf.get(i)),回想JKD NIO的操作流程,一般通过accept获取到SocketChannel后,会将SocketChannel注册到Selector中,当然可以和ServerSocketChannel使用同一个,也可以使用不同的。当然这取决与你的设置。注册完成之后启动与SocketChannel关联的EventLoop中的线程(如果配置过个Eventloop),这个过程和ServerSocketChannel的注册启动类似。
SocketChannel的注册启动是通过ServerBootStrap中的ServerBootStrapAcceptor类来实现的,它是在ServerBootStrap启动过程中添加到ServerSocketChannel关联的PipeLine中的一个Inboundhandler。接下单独一节分析PipeLine中Accept时间的传输,fireChannelRead方法的实现在DefaultChannelPipeline类中。接下单独一节分析PipeLine中Accept时间的传输,fireChannelRead方法的实现在DefaultChannelPipeline类中。
注册和启动
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
内部调用了静态方法invokeChannelRead并传入了 head 节点,msg 就是 容器中的 Socket,改静态方法内部调用了AbstractChannelHandlerContext的invokeChannelRegistered(),注意这个方法和是普通方法并且没有参数,此方法内部接着调用了和context绑定的handler的channelRegistered方法,当然首先是head节点。几乎所有的Inbound事件都是一个模式,在Head节点调用AbstractChannelHandlerContext的fireChannelRegistered,这个方法的内部首先是寻找下一个Inbound类型的Context,然后Invoke对应Handler的ChannelRead方法。
Head节点的ChannelRead方法很直观易懂,这里我们重点分析ServerBootStarpAcceptor的ChannelRead方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//添加childHandler到NioSocketChannel的pipeline。
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
首先添加childHandler到NioSocketChannel的pipeline。childGroup.register(child)将NioSocketChannel注册到work的eventLoop中,这个过程和NioServerSocketChannel注册到boss的eventLoop的过程一样。这个注册过程主要有三个重点:
- 将EventLoop绑定到SocketChannel
- 最终调用AbstratNioSocketChannel.doRegister方法把SocketChannel注册到EventLoop的Selector上,并且随后调用一次fireChannelRegister。于此同时启动和SocketChannel关联的EventLoop的线程。
- 启动EventLoop的线程,开始等待读事件的到来
最后因此在此Read方法中没有调用fireChannelRead方法,因此不会传到Tial节点,至此Accept事件的处理就结束了。