我们知道,在NioEventLoop当中,我们会循环处理得到的selectedKeys,调用的方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
其中有代码会去处理Accept事件
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
调用了Channel的unsafe属性的read方法,里面会有代码产生并获取SocketChannel
int localRead = doReadMessages(readBuf);
int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i));//pipeline就会将这个消息交给他的channelHandler处理, } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete();
我们知道,在初始化Channel的时候,会在这个Channel的pipeline中添加一些handler,并在注册完成之后完成添加操作
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } })
对于ChannelInitializer,他也是一个channelHandler,但是和其他的Handler不同的是,他在注册完成之后,完成了响应的channelAdd回调的过程中,会执行它的initChannel方法,并将自身从pipeline中移除掉。
所以在initChannel方法中,我们又看到了 ch.eventLoop().execute,这个会向eventLoop的提交一个任务,这个任务会执行一个添加handler的任务,所以unsafe.read()方法中的
pipeline.fireChannelRead(readBuf.get(i));
会进入到ServerBootstrapAcceptor的channelRead方法中,在这里完成对socketChannel的初始化和注册工作。
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel());//看见没有,这个就是生产一个socketChannel的方法,基本的方法就在这个里面 try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }