进阶篇 RocketMQ 原理之消息存储
让珊瑚远离惊涛骇浪的侵蚀吗?那无异是将它们的美丽葬送
「这是我参与2022首次更文挑战的第11天,活动详情查看:2022首次更文挑战」
文件目录结构
MQ 中存储的消息都放在了本地系统文件中,都在~/store文件夹下面了
1. abort
该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭。
2. checkpoint
其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳
3. commitlog
其中存放着commitlog文件,而消息是写在commitlog文件中的
4. config
存放着Broker运行期间的一些配置数据
5. consumequeue
其中存放着consumequeue文件,队列就存放在这个目录中
6. index
其中存放着消息索引文件indexFile
7. lock
运行期间使用到的全局资源锁
commitlog文件
在很多资料中commitlog目录中的文件简单就称为commitlog文件。但在源码中,该文件被命名为mappedFile
commitlog目录中存放着很多的mappedFile文件,当前Broker中的所有消息都是落盘到这些mappedFile文件中的。mappedFile文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。
第一个文件名一定是20位0构成的。因为第一个文件的第一条消息的偏移量commitlog offset为0,当第一个文件放满时,则会自动生成第二个文件继续存放消息。假设第一个文件大小是1073741824字节(1G = 1073741824字节),则第二个文件名就是00000000001073741824,起始偏移量为1073741824,以此类推,第n个文件名应该是前n-1个文件大小之和,一个Broker中所有mappedFile文件的commitlog offset是连续的,消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
注意
一个Broker中仅包含一个commitlog目录,所有的mappedFile文件都是存放在该目录中的。即无论当前Broker中存放着多少Topic的消息,这些消息都是被顺序写入到了mappedFile文件中的。也就是说,这些消息在Broker中存放时并没有被按照Topic进行分类存放,因为是顺序读写的文件所以访问效率非常高,省略了询盘的时间
消息单元实体
mappedFile文件内容由一个个的消息单元构成。每个消息单元中包含消息总长度MsgLen、消息的物理位置physicalOffset、消息体内容Body、消息体长度BodyLength、消息主题Topic、Topic长度TopicLength、消息生产者BornHost、消息发送时间戳BornTimestamp、消息所在的队列QueueId、消息在Queue中存储的偏移量QueueOffset等近20余项消息相关属性
commitlog中存放了Queue的信息,我们在后续看两者之间的关系
consumequeue
存在的目的
消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的,Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基于主题Topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
目前只能看到之前创建的TopicTest主题,看看这个主题里面有哪些队列,默认创建了四个队列
默认创建了4个队列0,1,2,3,看看这个队列里面有什么
同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
消息的写入流程
- Broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即QueueOffset
- 将queueId、queueOffset等数据,与消息一起封装为消息单元
- 将消息单元写入到commitlog
- 同时,形成消息索引条目,将消息索引条目分发到相应的consumequeue
消息的拉取流程
- Consumer获取到其要消费消息所在Queue的消费偏移量offset,计算出其要消费消息的消息offset,(消费offset即消费进度,consumer对某个Queue的消费offset,即消费到了该Queue的第几条消息消息offset = 消费offset + 1)
- Consumer向Broker发送拉取请求,其中会包含其要拉取消息的Queue、消息offset及消息Tag
- Broker计算在该consumequeue中的queueOffset(_queueOffset = _消息_offset * 20_字节)