创建时机
Unsafe实例被创建,即创建Channel时,ChannelOutboundBuffer被创建,每个Channel实例对应一个ChannelOutboundBuffer实例。
数据结构
ChannelOutboundBuffer实质上是无界的单向链表,如下图所示:
write&flush
- write本质:将待发送数据写入ChannelOutboundBuffer,即加入单向链表;
- flush本质:将ChannelOutboundBuffer中的数据写到TCP发送缓冲区,如果ChannelOutboundBuffer被全部flush完,则取消OP_WRITE;相反,如果没有flush完,而发送缓冲区满,则注册OP_WRITE;
isWritable
背景:由于ChannelOutboundBuffer是无界的链表,当应用层写入数据的速度 > Socket的发送速度时,会导致ChannelOutboundBuffer无限增长,产生OOM;
原理: 当ChannelOutboundBuffer的容量超过高水位设定阈值后,isWritable()返回false,表明消息产生堆积,需要降低写入速度; 当ChannelOutboundBuffer的容量低于低水位设定阈值后,isWritable()返回true,表明消息过少,需要提高写入速度。
超过高水位
当ChannelOutboundBuffer的容量超过高水位设定阈值后,isWritable()返回false,并且触发fireChannelWritabilityChanged(),如下代码所示:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
低于低水位
当ChannelOutboundBuffer的容量低于低水位设定阈值后,isWritable()返回true,并且触发fireChannelWritabilityChanged(),如下代码所示:
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
pipeline.fireChannelWritabilityChanged();
}
}
参考: