一起养成写作习惯!这是我参与「掘金日新计划 · 4 月更文挑战」的第6天,点击查看活动详情。
上文写了Netty究竟是怎么运行的-连接流程的深入剖析,这篇将会分析写流程是怎样的。
写流程
想真的将数据写到网络,必须包括write和flush两个操作。
ctx.channel().writeAndFlush("from client: " + System.currentTimeMillis());
复制代码
为什么要flush呢?
将消息队列中的消息写入到SocketChannel中发送给对方,netty的write方法并不直接将消息写入SocketChannel,调用write只是把待发送的消息放到发送缓冲区里面,再通过调用flush真的发送出去。
write pipeline
write一般在channelActive或者channelRead的处理中调用,ctx.write write的流程如下:
中间的pipeline对要写的数据进行处理加工,
unsafe的write
重要的是:最后unsafe的write
@Override
public final void write(Object msg, ChannelPromise promise) {
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
复制代码
addMessage 只是将这个msg加入到outboundBuffer,作为tail,并且如果unflushedEntry为null,则更新unflushedEntry。
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
...
}
复制代码
- 必须调用flush才能真的发出数据
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
Entry entry = RECYCLER.get();
entry.msg = msg;
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
复制代码
total的目的是计算msg的字节数
private static long total(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof FileRegion) {
return ((FileRegion) msg).count();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
return -1;
}
复制代码
支持的类型有
- ByteBuf
- FileRegion
- ByteBufHolder
所以,别的类型的数据是发不出来的。
unsafe flush
flush的pepeline流程几乎和write一样,但是在unsafe的实现上不一样。
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
复制代码
doWrite循环
flush0里面调用的doWrite方法(NioSocketChannel.java)。
首先了解一下数据结构ChannelOutboundBuffer,这个write的时候是将数据写到这里,flush的时候是从这里取走数据的。
另外没写完的时候会注册Key等下次写:
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
复制代码
doWriteInternal对不同类型的支持
flush0-doWrite-doWriteInternal的过程
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
复制代码
-
如果是ByteBuf,调用doWriteBytes(buf)
-
如果是FileRegion,调用doWriteFileRegion
-
其他类型就不支持了,所以不使用StringEncoder其实是发不出一个字符串的。
doWriteFileRegion
对文件的支持,支持transferTo的零拷贝方式。
@Override
protected long doWriteFileRegion(FileRegion region) throws Exception {
final long position = region.transferred();
return region.transferTo(javaChannel(), position);
}
复制代码
数据结构
ByteBuf
- ByteBuffer不能扩展和收缩
- ByteBuf通过两个位置指针协助缓冲区的读写操作
@Test
void test1(){
ByteBuf buffer = Unpooled.directBuffer(10);
buffer.writeBytes("hello军所女付付多绿扩扩扩扩扩扩扩扩扩扩扩扩扩扩扩扩扩扩扩".getBytes(StandardCharsets.UTF_8));
int readableBytes = buffer.readableBytes();
System.out.println("readableBytes = " + readableBytes);
Assertions.assertTrue(!(readableBytes <= 10));//扩容
ByteBuf byteBuf = buffer.readBytes(10);
int i = byteBuf.readableBytes();
byte[] bytes = new byte[10];
byteBuf.readBytes(bytes);
String string = new String(bytes);
System.out.println("string = " + string);
Assertions.assertTrue((i == 10));
//对于指针的描述是对的 两个读写指针
int i1 = byteBuf.readableBytes();
System.out.println("i1 = " + i1);
int i2 = byteBuf.writableBytes();
System.out.println("i2 = " + i2);
byteBuf.writeBytes("hello".getBytes());//
Assertions.assertTrue(byteBuf.readableBytes()==5);
//读过的可以清理
System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex());
System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex());
byteBuf.discardReadBytes();
System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex());
System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex());
byteBuf.clear();
System.out.println("byteBuf.writableBytes() = " + byteBuf.writableBytes());
System.out.println("byteBuf.writableBytes() = " + byteBuf.readableBytes());
}
复制代码
//扩容到了64.。
- 为什么可以做到自动扩容和缩容呢?
和ByteBuffer的相互转换
ByteBuffer byteBuffer = byteBuf.nioBuffer();
int remaining = byteBuffer.remaining();
复制代码
UnpooledHeapByteBuf
- heap 这个是可以推荐使用的,能自动扩容
- direct 内存管理麻烦,建议使用pooled
PooledByteBuf
特点:
- 使用direct实现
- Pool类的实现多样
@Test
void test3(){
PooledByteBufAllocator pooledByteBufAllocator = new PooledByteBufAllocator(true);
ByteBuf buffer = pooledByteBufAllocator.directBuffer();
System.out.println("buffer.isDirect() = " + buffer.isDirect());
}
复制代码
PoolArena的预先分配
- 是多个chunk组成的大块内存区域,每个chunk是一个page或多个page组成
- chunk中的page使用二叉树管理
- 每个page会被切分成大小相等的多个存储快,存储块的大小由第一次申请的内存块大小决定。如果一个page是8个字节,那么第一次申请的块大小是4个字节,那么这个page就包含2个存储块,如果第一次申请的是8个字节,那么这个快大小就是8
- page中存储区域的使用状态通过一个long数组来表示
基于内存池的实现是直接从缓冲区拿一个,而不是new。
总结
本文介绍了netty写流程的很多概念。但是写流程的主要子流程是write和flush,write写到netty的缓冲区,flush才真的写到连接里发出去。netty使用了池化的ByteBuf数据结构来处理IO,对于文件还有特殊的支持。