1、高可用机制
通过Master
和Slave
的配合达到高可用性
在Broker
中的配置文件中,brokerId
为0的则是Master
,大于0的是Slave
Master
角色的Broker支持读写,Slave
的Broker仅支持读,也就是说Produer只能和Master的Broker连接写消息,Consumer随机
Consumer的可用性:在Consumer的配置文件中,不需要设置配置,当Master不可用或者繁忙的时候,Consumer会被自动切换到Slave读取。
Produer的可用性:在创建Topic
的时候,把 Topic
的多个Message Queue创建在多个Broker组上(相同Broker名称,不同的brokerId
的机器组成一个Broker组),当一个Broker组的Master不可用后,其他组的Master仍然可用。
!!!!RocketMQ还不支持Slave转成Master,需要手动修改配置文件
2、同步刷盘/异步刷盘
RocketMQ
的消息存储在磁盘上,可以保证消息不丢失,还可以保证消息超出内存的限制。
1)异步刷盘
在写返回成功之后,消息只是可能被写入了内存的pageCache中,写操作返回快,吞吐量大。当消息积累到一定程度的时候,就会触发统一写入磁盘。
2)同步刷盘
在写返回成功之后,消息已经被写入磁盘。
RocketMQ同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了CAS变量和CountDownLatch来保证线程间的同步)。
这里,RocketMQ源码中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。
1、首先消息写入内存的pageCache
后,立即通知刷盘线程刷盘,然后等待刷盘的完成(阻塞),刷完成功后唤醒等待线程,返回写入成功。
配置文件
通过Broker的flushDiskType
的sync_flush
和async_flush
3、同步复制/异步复制
如果有一个Broker组有 Master 和 Slave ,消息需要从 Master 复制到 Slave,有同步 和 异步
1)同步复制
如果 Master 出现故障,数据已全部复制,但是同步产生写入延迟,降低系统的吞吐量。
2)异步复制
异步只要Master写入成功,异步返回给客户端即可
但是如果Master 出现故障,有些数据没有被写入 Slave,会产生数据丢失的情况
4、消息存储及持久化
(1)CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件大小默认1G ,消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
通过建立类似索引文件—ConsumeQueue的方式来区分不同Topic下面的不同MessageQueue的消息,同时为消费消息起到一定的缓冲作用(只有ReputMessageService异步服务线程通过doDispatch异步生成了ConsumeQueue队列的元素后,Consumer端才能进行消费)
(2) ConsumeQueue:消息消费的逻辑队列,其中包含了这个MessageQueue在CommitLog中的起始物理位置偏移量offset,消息实体内容的大小和Message Tag的HashCode。从实际物理存储来说,ConsumeQueue对应每个Topic和QueuId下面的文件。单个文件大小约5.72M,每个文件由30W条数据组成,每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满了,则写入下一个文件;
(3)IndexFile:用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引;
(4)MapedFileQueue:MappedFileQueue
存储队列,数据定时删除,无限增长。
对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;
(5)MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入PageCache缓存区(commit),或者原子性地将消息持久化的刷盘(flush);
(6)ProcessQueue:主要是Message Queue的一个快照,由TreeMap和一个读写锁。TreeMap以 Message Queue的 Offset作为 Key,消息内容的引用作为Value
消息存储由ConsumerQueue
和CommitLog
配合完成。
CommitLog
描述的是整个CommitLog
目录,而MappedFileQueue
描述的则是CommitLog File
数组容器,而MappedFile
描述一个CommitLog File
MappedFile
是 PageCache
文件封装,操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os
系统的页大小为4k
, 消息刷盘根据参数(commitLog
默认至少刷4 页,consumeQueue
默认至少刷2 页)才刷
commitLog
其实调用存储消费队列mappedFileQueue.load()
方法来加载的。
队列有多个文件(MappedFile
)组成,由集合对象List 表示升序排列,前面讲到文件名即是消息在此文件的中初始偏移量,排好序后组成了一个连续的消息队,当消息到达broker
时,需要获取最新的MappedFile
写入数据,调用MappedFileQueue
的getLastMappedFile
获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创建一个新的。
MappedFileQueue
在获取getLastMappedFile
时,如果需要创建新的MappedFile
会计算出下一个MappedFile
文件地址,通过预分配服务AllocateMappedFileService
异步预创建下一个MappedFile
文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个1G
的文件还是有点耗时的
1、消息真正的物理存储文件是CommitLog
,每台Boker
上的CommitLog
被本机所有的ConsumerQueue
共享。
在CommitLog
中,一个消息的存储长度是不固定的,ConsumerQueue
的内容也会被持久化。
1)CommitLog
顺序写,可以提高写入效率
2)CommitLog
随机读,虽然是随机读,但是利用了系统的pageCache
机制,可以批量从磁盘读
RocketMq
采用mmap+write
的方式进行I/O操作
内存锁定
Linux
系统为了优化IO读写的效率与速度,引入了一种内存机制(物理内存),即数据从磁盘到内存的复制过程由内核实现,而实现的基础则是pageCache
,pageCache
的大小默认是4kb
。关于pageCache
的内容很多,笔者对此也了解较浅,故不做赘述,后续深入了解后再补充。
物理内存是有操作系统级别控制,当运行的Java 进程结束后,物理内存也不会理解释放,该问题进一步导致在Linux系统中程序频繁读写文件后,可用物理内存变少。当系统的物理内存不够用的时,就需要将物理内存中的一部分空间释放出来,以供当前运行的程序使用。那些被释放的空间可能来自一些很长时间没有什么操作的程序,这些被释放的空间被临时保存到Swap空间中,等到那些程序要运行时,再从Swap分区中恢复保存的数据到内存中。这样,系统总是在物理内存不够时,才进行Swap交换。为了减少系统级别的Swap交换,RocketMQ
通过使用mlock
来锁定内存。
mlock
的作用如下:
-
被锁定的物理内存在被解锁或进程退出前,不会被页回收流程处理。
-
被锁定的物理内存,不会被交换到swap分区设备。
-
进程执行
mlock
操作时,内核会立刻分配物理内存(注意COW的情况)
内存预热
日常中使用缓存来解决系统的性能问题,减少对底层数据库的直接读写,降低数据库的读压力,这个过程在操作系统IO读写亦是同样的道理。pageCache可以理解为系统缓存,而内存预热的目的则是建议操作系统预先将文件内容加载至pageCache,当读取数据时会优先判断是否命中pageCache,如果无法命中则会抛出一次缺页中断,直接从磁盘读取,一次降低了IO吞吐量。
madivse
函数的意义是建议操作系统加载数据至pageCache中,方法参数:int madvise(void *addr 、长度 size_t , int 建议),如下提供两个常见的建议
:
-
madv_willneed 预计在不久的将来访问(因此,可能最好已阅读一些页面 .)
-
madv_dontneed 不要期待在不久的将来访问(用的时间.用给定的范围后,使内核可以释放与它关联的资源.)在此范围内的页的后续访问都将成功,但从基础会在重新装入存储器内容的映射文件(看到mmap(2))在没有基本映射的页面请求或零填充。
RocketMQ
在创建文件时正是使用了madv_willneed
,由于文件创建的方式由异步线程完成,故而内存预热对于当前的IO读写影响不大
RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果
ConsumerQueue
是消息的逻辑队列
ConsumerQueue
里只存偏移量信息-------QueueOffset
相当于字典的目录用来指定消息在消息的真正的物理文件CommitLog
上的位置
预先分配MappedFile
1、在消息写入过程中(调用CommitLog
的putMessage()
方法),CommitLog
会先从MappedFileQueue
队列中获取一个 MappedFile
,如果没有就新建一个。
2、MappedFile
的创建过程是将构建好的一个AllocateRequest
请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为AllocateRequest
对象)添加至队列中
3、后台运行的AllocateMappedFileService
服务线程(在Broker启动时,该线程就会创建并运行),会不停地run,只要请求队列里存在请求,就会去执行MappedFile
映射文件的创建和预分配工作
4、分配的时候有两种策略,一种是使用Mmap
的方式来构建MappedFile
实例,另外一种是从TransientStorePool
堆外内存池中获取相应的DirectByteBuffer
来构建MappedFile
(ps:具体采用哪种策略,也与刷盘的方式有关)。
5、并且,在创建分配完下个MappedFile
后,还会将下下个MappedFile
预先创建并保存至请求队列中等待下次获取时直接返回。
5、消息有序无序
1)消息生产与消息消费相互分离,Producer端发送消息最终写入的是CommitLog(消息存储的日志数据文件),Consumer端先从ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量offset、大小size和消息Tag的HashCode值,随后再从CommitLog中进行读取待拉取消费消息的真正实体内容部分;
1、首先构建一个线程池来接收消费请求 ConsumerRequest
,构建一个单线程的本地线程,用来稍后定时重新消费ConsumerRequest
,用来执行定时周期性的锁队列任务。
2、获取正在消费队列列表ProcessQueueTable
所有的MessageQueue
,其实就是 broker的queue中标记一下消费端,表示这个Queue被某个锁定。
3、ConsumeRequest
是由ProcessQueue
和Messagequeue
组成
每个Messagequeue都会判断processqueue的lock属性是否为 true ,不是的话锁定放到本地线程后再消费
通过ReblanceImp的lockAll方法每隔一段时间定时锁住当前消费端消费的队列。
全局有序
默认不保证有序消息,在数据读取的过程中,可能有多个Consumer,每个 Consumer 也可能启动多个线程去消费,所以消息被哪个Consumer 消费,顺序是不能保证
要保证全局顺序消息, 需要先把Topic 的读写队列数设置为一,然后Producer 和Consumer 的并发设置也要是一。
简单来说,为了保证整个Topic 的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。这时高并发、高吞吐量的功能完全用不上了
部分顺序消息
发送端使用MessageQueueSelector 类来控制把消息发往哪个Message Queue
消费端通过使用MessageListenerOrderly 类来解决单Message Queue 的消息被并发处理的问题
解决方案
1、保证消费逻辑的幕等性(多次调用和一次调用效果相同)
2、维护一个巳消费消息的记录,消费前查询这个消息是否被消费过
6、死信队列(Dead-Letter-Queue)
rocketMQ和其他MQ的死信有所区别
1、消息被拒并且不重新投递
2、消费端一直不回传消费的结果。rocketmq认为消息没收到,consumer下一次拉取,broker依然会发送该消息。
所以,任何异常都要捕获返回ConsumeConcurrentlyStatus.RECONSUME_LATER
rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup
重试的消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,此时需要人工干预了
使用DefaultMQPullConsumer才可以修改失败次数,DefaultMQPushConsumer不能修改此值。
如果你去查询这个%DLQ%TestConsumer1死信队列,会发现查询不到。
但是到broker上看,这个Queue是有的,topic有一个权限设置perm,2为写,4为读,6为读写。