用netty收发网络数据的时候,一般不会注册SelectionKey.OP_WRITE事件。但是,如果netty在AbstractNioWorker.write0中写数据时发现写不进去了(buf.finished()==false),可能是操作系统内核缓冲区满了,这个时候会给该Channel对应的SelectionKey注册上OP_WRITE事件。
// Excerpt of AbstractNioWorker.write0 (sections marked "……" are elided in this excerpt).
// Under channel.writeLock it loops over the channel's current write event:
//   - when buf.finished() is true, the buffer is released, the current event/buffer
//     are cleared and the future is marked successful;
//   - otherwise it sets addOpWrite and channel.writeSuspended, optionally reports
//     progress, and leaves the loop.
// After the loop, still inside the lock, it calls setOpWrite or clearOpWrite when
// the channel is still considered open.
protected void write0(AbstractNioChannel<?> channel) {
    boolean open = true;
    boolean addOpWrite = false;
    boolean removeOpWrite = false;
    boolean iothread = isIoThread(channel);
    long writtenBytes = 0;
    final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
    final WritableByteChannel ch = channel.channel;
    final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
    final int writeSpinCount = channel.getConfig().getWriteSpinCount();
    synchronized (channel.writeLock) {
        channel.inWriteNowLoop = true;
        for (;;) {
            MessageEvent evt = channel.currentWriteEvent;
            SendBuffer buf = null;
            ChannelFuture future = null;
            try {
                ……
                if (buf.finished()) {
                    // Successful write - proceed to the next message.
                    buf.release();
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    evt = null;
                    buf = null;
                    future.setSuccess();
                } else { // The write could not complete; the kernel buffer may be full.
                    // Not written fully - perhaps the kernel buffer is full.
                    addOpWrite = true; // Causes OP_WRITE to be registered further down (setOpWrite).
                    channel.writeSuspended = true; // Affects subsequent write attempts (see writeFromUserCode).
                    if (localWrittenBytes > 0) {
                        // Notify progress listeners if necessary.
                        future.setProgress(
                                localWrittenBytes,
                                buf.writtenBytes(), buf.totalBytes());
                    }
                    break;
                }
            } catch (AsynchronousCloseException e) {
                // Doesn't need a user attention - ignore.
            } catch (Throwable t) {
                ……
            }
        }
        channel.inWriteNowLoop = false;

        // Initially, the following block was executed after releasing
        // the writeLock, but there was a race condition, and it has to be
        // executed before releasing the writeLock:
        //
        // https://issues.jboss.org/browse/NETTY-410
        //
        if (open) {
            if (addOpWrite) {
                setOpWrite(channel);
            } else if (removeOpWrite) {
                clearOpWrite(channel);
            }
        }
    }
    ……
}
同时,AbstractNioWorker.write0在写不进去的时候,还会把被写的Channel的writeSuspended设置为true。这个参数会在Channels.write -> DefaultChannelPipeline.sendDownstream -> NioClientSocketPipelineSink.eventSunk -> AbstractNioWorker.writeFromUserCode这条调用路径上控制后续的写入操作。
// Excerpt of NioClientSocketPipelineSink.eventSunk (sections marked "……" are elided).
// For a ChannelStateEvent it switches on the event state (only INTEREST_OPS is
// shown in full). For a MessageEvent it offers the event to the channel's
// writeBufferQueue and then calls worker.writeFromUserCode(channel).
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    if (e instanceof ChannelStateEvent) {
        ChannelStateEvent event = (ChannelStateEvent) e;
        NioClientSocketChannel channel =
            (NioClientSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();

        switch (state) {
        case OPEN:
            ……
        case INTEREST_OPS:
            channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
            break;
        }
    } else if (e instanceof MessageEvent) {
        MessageEvent event = (MessageEvent) e;
        NioSocketChannel channel = (NioSocketChannel) event.getChannel();
        boolean offered = channel.writeBufferQueue.offer(event);
        assert offered;
        channel.worker.writeFromUserCode(channel); // Pre-write checks happen in here.
    }
}

// Excerpt of AbstractNioWorker.writeFromUserCode (sections marked "……" are elided).
// Returns early while channel.writeSuspended is set; otherwise falls through to write0.
void writeFromUserCode(final AbstractNioChannel<?> channel) {
    ……
    if (channel.writeSuspended) { // If this is true we return here and never reach the actual write below.
        return;
    }
    ……
    write0(channel);
}
但是上边提到,在写入不了的时候,注册了OP_WRITE事件。一旦内核缓冲区有空间了,在selector.selectedKeys()的时候,就会抓到这个注册了OP_WRITE事件的SelectionKey(问题是,哪个线程会做这样的事情?channel和worker是绑定的?一个worker关联一个selector?)。调用路径:NioWorker.run -> AbstractNioWorker.run
// Excerpt of AbstractNioWorker.run (sections marked "……" are elided).
// Loops forever: selects on the worker's selector, tracks how often select
// returns early with nothing selected (epoll-bug workaround counter), runs
// processTaskQueue(), re-reads this.selector, and then - unless shutting
// down - calls process(selector) to handle the selected keys.
public void run() {
    thread = Thread.currentThread();

    int selectReturnsImmediately = 0;
    Selector selector = this.selector;

    if (selector == null) {
        return;
    }
    // use 80% of the timeout for measure
    final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
    boolean wakenupFromLoop = false;
    for (;;) {
        wakenUp.set(false);

        try {
            long beforeSelect = System.nanoTime();
            int selected = select(selector); // Select the keys that are ready.
            if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
                long timeBlocked = System.nanoTime() - beforeSelect;

                if (timeBlocked < minSelectTimeout) {
                    boolean notConnected = false;
                    // loop over all keys as the selector may was unblocked because of a closed channel
                    for (SelectionKey key: selector.keys()) {
                        SelectableChannel ch = key.channel();
                        try {
                            if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
                                    ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                notConnected = true;
                                // cancel the key just to be on the safe side
                                key.cancel();
                            }
                        } catch (CancelledKeyException e) {
                            // ignore
                        }
                    }
                    if (notConnected) {
                        selectReturnsImmediately = 0;
                    } else {
                        // returned before the minSelectTimeout elapsed with nothing select.
                        // this may be the cause of the jdk epoll(..) bug, so increment the counter
                        // which we use later to see if its really the jdk bug.
                        selectReturnsImmediately ++;
                    }
                } else {
                    selectReturnsImmediately = 0;
                }
                ……
            } else {
                // reset counter
                selectReturnsImmediately = 0;
            }
            ……
            if (wakenUp.get()) {
                wakenupFromLoop = true;
                selector.wakeup();
            } else {
                wakenupFromLoop = false;
            }

            cancelledKeys = 0;
            processTaskQueue(); // Process queued tasks - is the real work actually done here? (author's open question)
            selector = this.selector; // processTaskQueue() can call rebuildSelector()

            if (shutdown) {
                ……
            } else {
                process(selector); // Handle the keys that are ready.
            }
        } catch (Throwable t) {
            ……
        }
    }
}

// Excerpt of the worker's process(Selector) override (sections marked "……" are elided).
// Iterates over selector.selectedKeys(), removing each key as it goes; calls read(k)
// when OP_READ is ready (or readyOps is 0) and writeFromSelectorLoop(k) when
// OP_WRITE is ready. A CancelledKeyException closes the key.
@Override
protected void process(Selector selector) throws IOException {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) {
        return;
    }
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            // Read event.
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k); // OP_WRITE was registered earlier; the key now reports write-ready, so resume writing.
            }
        } catch (CancelledKeyException e) {
            close(k);
        }
        ……
    }
}

// Clears the channel's writeSuspended flag and retries the write via write0.
// The channel is taken from the selection key's attachment.
void writeFromSelectorLoop(final SelectionKey k) {
    AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
    ch.writeSuspended = false; // Writing is no longer suspended, so Channel.write -> writeFromUserCode() can reach write0 again.
    write0(ch);
}