去年年底的时候,为我们的分布式消息队列中间件写源码分析系列文章的时候,看到我们的存储部分是参考了RocketMQ的实现。这部分有一个很有意思的内容
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) throws IOException {
int currentPos = this.wrotePostion.get();
if (currentPos < this.fileSize) {
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
this.wrotePostion.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: " + this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
可以看到,在写消息的时候,并没有直接写入到mmap出来的MappedByteBuffer中,而是写到了另外一个单独的DirectBuffer中,消息刷盘也要先把DirectBuffer的数据commit到FileChannel中,再由FileChannel刷盘。读消息依旧从MappedByteBuffer中读取,但是只能读取已经commit的数据。
但是在我们的分布式消息队列中,却直接读取DirectBuffer,那么也就不用必须读commit位置前的数据,只要写到DirectBuffer的数据都可以读取消费。
看似带来的好处是消息消费没有了延迟,不需要等待定时任务commit数据后,才能被消费。但是,这违背了RocketMQ设计的初衷,这里不谈对与错、不谈好与坏,我们只谈设计初衷,RocketMQ之所以这么设计,基本初衷是为了从设计层面实现读写分离,而我们的改动恰好完全违背了该设计的初衷。
我们一点一点的看RocketMQ是如何设计的。
首先我们要知道FileChannel与MappedByteBuffer之间的关系
* A mapping, once established, is not dependent upon the file channel * that was used to create it. Closing the channel, in particular, has no * effect upon the validity of the mapping.
MappedByteBuffer自从FileChannel映射mmap出来之后,就基本没有任何关系了。并且我们看FileChannel的force方法有这么一个解释
* <p> This method is only guaranteed to force changes that were made to * this channel's file via the methods defined in this class. It may or * may not force changes that were made by modifying the content of a * {@link MappedByteBuffer <i>mapped byte buffer</i>} obtained by * invoking the {@link #map map} method. Invoking the {@link * MappedByteBuffer#force force} method of the mapped byte buffer will * force changes made to the buffer's content to be written. </p>
FileChannel的force只能保证把通过FileChannel写的数据刷盘,而通过映射出来的MappedByteBuffer写的数据,是不保证刷盘的。所以,要保证写数据与刷盘是通过同一途径。即如果使用DirectBuffer写数据,那么commit与刷盘是都要使用FileChannel相关方法。如果直接写MappedByteBuffer,那么刷盘时要使用MappedByteBuffer相关方法。
这个比较好理解,接下来我们要理解,既然FileChannel与MappedByteBuffer毫无关系,为什么commit之后的数据,就可以在MappedByteBuffer中能读取的到?
我们先来看Page cache的概念。
页缓存是Linux系统内核实现磁盘缓存。它主要用来减少对磁盘的I/O操作。具体的讲,是通过把磁盘中的数据缓存到物理内存中,把对磁盘的访问变为对物理内存的访问。
我们的MappedByteBuffer就是通过mmap出来的一部分page cache。
而linux的写数据又采用了write-back的策略,即写操作直接写到缓存中,后端存储不会立即直接更新,而是将page cache中被写入的页标记为“脏页”,并且被加入到脏页列表中。等待write-back进程或者用户手动刷盘,将脏页列表中的页数据写回到磁盘,再清理“脏页”标识。
所以FileChannel写数据(commit)后,实际也是写到了page cache中,而FileChannel与MappedByteBuffer映射的是同一份物理文件,所以都映射的相同的page cache。通过FileChannel commit后的数据,在MappedByteBuffer中即为可见。
既然利用了linux的page cache,减少了内核空间与用户空间的数据拷贝,那么也要承受其带来的问题,缺页异常与缓存回收。
先看缺页异常,分为两种Minor page faults和Major page faults。
Minor page faults即实际的指令(数据)已经在物理内存page cache中,只是没有被分配给当前进程,只需要通过MMU把当前page cache分配给当前进程即可,不需要磁盘I/O。
Major page faults就是我们常说的page faults了,数据不在物理内存page cache中,就需要访问磁盘读取数据,再放到物理内存中。
而实际上linux采用的都是“copy on write”(注意这里与我们Java中的CopyOnWrite系列集合不是一个概念),即mmap的时候,不会实际分配物理内存,而只是提前分配了虚拟内存,只有当真正去使用这些数据的时候,才会通过page faults实际映射文件。
在RocketMQ中为了避免这种“copy on write”的策略,采用了预热的方式,创建并mmap文件后,提前把每页都写一个字节的数据。
再看第二个问题,缓存回收。物理内存毕竟有限,为了给更重要的数据腾出空间或者收缩缓存空间,需要把一部分page清除。linux采用双链策略,即维护两个LRU链表:活跃链表与非活跃链表。只会替换非活跃链表缓存,活跃链表的缓存都从非活跃链表晋升而来。这就带来了一个问题,我们辛辛苦苦映射到物理内存的page cache,如果被换出,再次访问时又需要page faults。所以在RocketMQ中采用了内存锁定,通过mlock使得我们创建的DirectBuffer和MappedByteBuffer不会被换出。
在接下来我们需要理解为什么写DirectBuffer而不是HeapBuffer。我们看FileChannel的write方法,最终调用了IOUtil的write方法,在写数据时会判断是否为DirectBuffer,如果不是的话,会临时创建DirectBuffer并拷贝数据,再通过DirectBuffer写数据。这是为了避免写数据时,发生GC导致数据发生变化。所以采用DirectBuffer省去了拷贝数据的过程,进一步提高了性能。
既然无论是写MappedByteBuffer还是写DirectBuffer,最终都是先写到了page cache中,又是如何做到读写分离的?在RocketMQ中,数据写入到DirectBuffer中不会立即commit,而是需要积攒若干页(默认4页)后,批量commit。一方面是为了提高性能,减少了操作系统脏页刷新的频率。另一方面也正是做到了读写分离。因为commit以整页为单位,没有被commit的数据是不会被读取到,所以不会出现同时读写同一page的情况。并且commit是批量定时进行,又进一步减少了page cache读写冲突的概率。
我们再看下linux脏页刷盘的三种场景:
- 空闲内存低于一定阈值时,linux会写回脏页以便释放内存。
- 脏页驻留时间超过一定阈值,linux会写回脏页。
- 用户进行手动触发。
我们批量写数据,再配合定时手动触发回写,也就避免了linux内核进行脏页回写。同样提高了性能。
综上,RocketMQ在设计层面实现了读写分离的效果,并且在性能方面做到了很极致。同样事情都有利弊,这也导致了数据消费的及时性,即数据必须要等到commit后才可以被消费到。
再返回来看,我们的分布式消息队列参考了RocketMQ的存储实现,但是却修改了读取消息的部分,采用了直接读取DirectBuffer的方式,这种做法是否妥当,还值得商榷。