Server bind之后,就可以对外提供服务了。Netty使用了reactor模式来提升服务的并发处理能力。boss线程负责监听新的连接请求,当有新的连接进来时,将对应的channel指派一个worker线程来处理。Worker线程负责对该Channel的读写操作。
一.Boss线程
1.阻塞Select
for (;;) { try { // Boss线程专门负责监听新入连接,所以阻塞select selector.select(); // 如果有新连接,先把key清掉 selector.selectedKeys().clear(); // 循环请求队列,处理连接 for (;;) { SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket == null) { break; } registerAcceptedChannel(acceptedSocket, currentThread); } ...... }2.注册新连接
private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { ...... //根据用户自定义的的PipelineFactory创建pipeline ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); //hash分配worker线程,默认使用递增循环worker数组方式 NioWorker worker = nextWorker(); //将新的连接注册到worker线程,让worker线程负责后续读写 //新的channel是主channel的子channel,而PipelineSink和主channel是同一个 worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); ...... }
void register(AbstractNioChannel<?> channel, ChannelFuture future) { synchronized (startStopLock) { ...... //创建注册通道的任务 Runnable registerTask = createRegisterTask(channel, future); //提交任务到阻塞队列 boolean offered = registerTaskQueue.offer(registerTask); //唤醒selector if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } }
3.创建注册任务
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) { boolean server = !(channel instanceof NioClientSocketChannel); return new RegisterTask((NioSocketChannel) channel, future, server); }
二.worker线程
worker线程负责对应channel的读写操作,一个worker对应一个selector,会同时处理多个channel的读写。
1.主循环
for (;;) { wakenUp.set(false); ...... if (wakenUp.get()) { wakenupFromLoop = true; selector.wakeup(); } else { wakenupFromLoop = false; } cancelledKeys = 0; //处理注册通道的任务 processRegisterTaskQueue(); //处理异步事件,比如writeComplete事件 processEventQueue(); //处理写数据任务,如果业务线程有异步写的时候,会有WriteTask放入队列 processWriteTaskQueue(); //处理IO准备好的那些channel processSelectedKeys(selector.selectedKeys()); ...... }
2.RegisterTask执行
public void run() { ...... //如果是server,则使用异步模式 if (server) { channel.channel.configureBlocking(false); } //将新的channel注册到worker线程的selector上,默认监听READ事件 synchronized (channel.interestOpsLock) { channel.channel.register( selector, channel.getRawInterestOps(), channel); } ...... //触发BOUND的upstream事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件 if (server || !((NioClientSocketChannel) channel).boundManually) { fireChannelBound(channel, localAddress); } //触发CONNECTED的upsteam事件,该事件将在用户自定义的pipeline中运行,在这里EchoServerHandler默认不处理该事件 fireChannelConnected(channel, remoteAddress); ...... }
3.处理读写准备好的那些channel
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); try { int readyOps = k.readyOps(); //如果某个channel写就位,则读数据 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { // Connection already closed - no need to handle write. continue; } } //如果写就位,则写数据 if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } } catch (CancelledKeyException e) { close(k); } ...... }
4. 读取
//从channel中读取数据到内部的buffer,转换成内部的ChannelBuffer,触发messageReceived事件 protected boolean read(SelectionKey k) { final SocketChannel ch = (SocketChannel) k.channel(); final NioSocketChannel channel = (NioSocketChannel) k.attachment(); //预测下次读将读取的buffer大小,默认使用自适应的预测算法,如果上次读取把buffer读满,则增大该值,如果连续2次都没读满,则减小该值 //如果以上都不满足,则保持不变,默认长度1024 final ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); //默认BufferFactory为HeapChannelBufferFactory,默认使用Big Endian字节序 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); int ret = 0; int readBytes = 0; boolean failure = true; //从共享pool中拿配额,从channel中读取对应数据 ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder()); try { while ((ret = ch.read(bb)) > 0) { readBytes += ret; if (!bb.hasRemaining()) { break; } } failure = false; } ...... //有数据读入,则转换成自己的ChannelBuffer,并触发messageReceived事件,该事件将在用户自定义的Pipeline中执行 if (readBytes > 0) { bb.flip(); //构造一个ChannelBuffer,默认使用堆内的数组实现 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); //复制数据到channelBuffer buffer.setBytes(0, bb); //写游标 buffer.writerIndex(readBytes); // 修改预测器的下次读取buffer大小 predictor.previousReceiveBufferSize(readBytes); // 触发messageReceived事件 fireMessageReceived(channel, buffer); } ...... }
5.EchoServerHandler接受消息
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { //通过channel将数据写回 e.getChannel().write(e.getMessage()); }
6.数据写回,write方法其实是触发一个Downsteam事件
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) { ChannelFuture future = future(channel); channel.getPipeline().sendDownstream( new DownstreamMessageEvent(channel, future, message, remoteAddress)); return future; }
7.ChannelPipeline中的处理
public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); //如果handler已经处理完了,则转发到ChannelSink处理,对于nioserver来说就是NioServerSocketPipelineSink if (tail == null) { try { getSink().eventSunk(this, e); return; } catch (Throwable t) { notifyHandlerException(e, t); return; } } //否则,继续调用其他handler sendDownstream(tail, e); }
8.NioServerSocketPipelineSink中处理channel事件
else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); //先放入写任务队列 boolean offered = channel.writeBufferQueue.offer(event); assert offered; //最后还是要通过work来写回数据 channel.worker.writeFromUserCode(channel); }
9.worker线程的处理
void writeFromUserCode(final AbstractNioChannel<?> channel) { ...... //如果业务方使用了业务线程异步写,则直接往worker线程的写队列扔一个WriteTask任务 if (scheduleWriteIfNecessary(channel)) { return; } ...... //如果业务方没有使用业务线程异步写,说明现在还在netty的Worker线程中,直接写 write0(channel); }
10.Worker线程直接写
protected void write0(AbstractNioChannel<?> channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; //循环写入,如果都写成功了,则将去掉该channel在selector中注册的WRITE事件监听 for (;;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf = null; ChannelFuture future = null; try { if (evt == null) { //从队列中拿需要写回的数据内容,如果没有了,则认为写成功了 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { removeOpWrite = true; channel.writeSuspended = false; break; } future = evt.getFuture(); //将ChannelBuffer转换成ByteBuffer,此处使用PooledSendBuffer channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); } else { future = evt.getFuture(); buf = channel.currentWriteBuffer; } long localWrittenBytes = 0; for (int i = writeSpinCount; i > 0; i --) { //将Buffer里的数据写出,因为是异步channel,如果socket的write队列满,会导致写处返回0,则重试 localWrittenBytes = buf.transferTo(ch); //有数据写出就返回,不管是否全部写出 if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; } if (buf.finished()) { break; } } //如果全部写出,则通知调用方 if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; evt = null; buf = null; future.setSuccess(); } //如果还没写完,则需要让selector也关心这个channel的write事件,让write就位时,继续写 else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; channel.writeSuspended = true; ...... } } ...... } channel.inWriteNowLoop = false; //让selector监听write事件 if (addOpWrite) { setOpWrite(channel); } //写成功后,把write监听去掉 else if (removeOpWrite) { clearOpWrite(channel); } } //如果worker线程直接写,直接触发writeComplete upstream事件,让handler处理 if (iothread) { fireWriteComplete(channel, writtenBytes); } //如果是业务线程异步写,将通过worker线程的eventQueue实现异步延时触发writeComplete事件 else { fireWriteCompleteLater(channel, writtenBytes); } }
11.Worker线程异步写,当业务方使用多线程处理时,写回的动作对worker来说是异步的
12.业务线程放入写任务队列
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { boolean offered = writeTaskQueue.offer(channel.writeTask); assert offered; }
13.worker线程执行写任务
private void processWriteTaskQueue() throws IOException { for (;;) { final Runnable task = writeTaskQueue.poll(); if (task == null) { break; } task.run(); cleanUpCancelledKeys(); } }
14.WriteTask执行
private final class WriteTask implements Runnable { WriteTask() { } public void run() { writeTaskInTaskQueue.set(false); worker.writeFromTaskLoop(AbstractNioChannel.this); } }
15.worker线程执行数据写入
void writeFromSelectorLoop(final SelectionKey k) { AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment(); ch.writeSuspended = false; write0(ch); }