文章目录
1. 我们这篇博客要做什么
我们用Java原生的代码来模拟RabbitMQ的各种实现,参考官方网站,目的是为了让我们了解rabbitmq的细节
而不是集成spring boot,当然如果想看spring boot集成RabbitMQ可以看这篇博客博客地址,很详细
2 、消息中间件(消息队列)
-
消息中间件的传递模式:点对点point to point 、发布与订阅(publish subscribe)
点对点模式是基于队列的,消息生产发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。
发布订阅模式定义了如何向 个内容节点发布和订阅消息,这个内容节点称为主题 (topic) ,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用
-
流行的消息中间件
目前开源的消息中间件有很多,比较主流的有 RabbitMQ 、Kafka 、ActiveMQ 、RocketMQ等。
-
消息中间件的优点
提供了以松散藕合的灵活方式集成应用程序的 种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,用程序彼此不直接通信,而是与作为中介的消息中间件通信 消息中间件提供了有保证的消息发送,应用程序开发人员无须了解远程过程调用 RPC) 和网络通信协议的细节
总结:解耦、存储安全而且可恢复、易扩展、削峰、消息有序、异步通信
-
适用场景
消息中间件适用于需要可靠的数据传送的分布式环境
3、Rabbit初步介绍
3.1 生产者、消费者模型
producer :生产者
consumer: 消费者
Broker :消息中间件服务节点
对于RabbitMQ 来说, RabbitMQ Broker 简单 看作 RabbitMQ 服务节点或者 RabbitMQ 服务实 例,大多数情况下 可以将一个RabbitMQ Broker 看作一台 RabbitMQ服务器
Queue: 队列,是 RabbitMQ 的内部对象,用 于存储消息
3.2 交换器、路由键、绑定
Exchange: 交换器
生产者将消息发送到 Exchange (交换器,通常也可以用大写的 “X” 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃
3.2.1 交换器类型
fanout、direct、topic、headers
fanout : 广播模式,会把发送到该交换器的消息路由发送到所有与该交换器绑定的队列中
direct: 把消息路由到哪些BindingKey和RoutringKey完全匹配的队列中,最简单的
topic:可以模糊关联
3.2.2 topic交换器,作用:把多个队列根据模糊规则,最后只让一个队列去接收
比如topic.messageA 、topic.messageB、topic.messageC
,我们用topic.messageB
模糊去接收全部带有topic.
的,最后的效果就是发送到topic.messageA 、topic.messageB、topic.messageC
的消息最后都被topic.messageB
接收到了
//队列A
@Bean
public Queue queueMessagesA() {
//RoutingKey
return new Queue("topic.messageA");
}
//队列B
@Bean
public Queue queueMessagesB() {
//RoutingKey
return new Queue("topic.messageB");
}
//topic交换路由
@Bean
TopicExchange exchange() {
//topic交换器名称
return new TopicExchange("exchange");
}
//交换规则
@Bean
Binding bindingExchangeMessages(Queue queueMessagesB, TopicExchange exchange) {
// 队列 交换器 BindingKey
return BindingBuilder.bind(queueMessagesB).to(exchange).with("topic.#");
}
4. 消息安全
4.1.如何保证消息不被重复消费呢?
要回答好这个问题,首先要知道为什么消息会被重复消费,大多都是因为网络不通导致,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。所以解决问题的方式有如下三种思路
- 消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
- 如果你拿到这个消息做redis的set的操作,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
- 如果上面两种情况还不行,准备一个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
4.2.如何保证消息的可靠性传输呢?
其实这个问题是第一个问题的扩展,换而言之,我们要保证可靠性传输,其实就是保证防止生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据而已
其实这些问题早在中间件开发者已经考虑到了,也提供了一些可配置的文件给我们自行设定相关参数,消息队列一般都会持久化到磁盘这个不用担心,然后生产者数据丢失的话MQ的事务会回滚,可以尝试重新发送,消费者丢的的话一般都是采用了自动确认消息模式导致消费信息被删,只要修改为手动确认就行了,也就是说消费者消费完之后,调用一个MQ的确认方法就行了
4.3如何保证从消息队列里拿到的数据按顺序执行?
通过算法,将需要保持先后顺序的消息放到同一个消息队列中,然后只用一个消费者去消费该队列。
rabbitmq:拆分多个queue,每个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,然后分发给底层不同的worker来处理
kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue即可
4.4.如何解决消息队列的延时以及过期失效问题?有几百万消息持续积压几小时,怎么解决?
这个问题是生产环境出现事故后的,考察你如何快速的解决问题,,消息队列的延迟和过期失效是消息队列的自我保护机制,目的是为了防止本身被挤爆,当然是可以关闭保护,比如当某个消息消费失败5次后,就把这个消息丢弃等,尽量不要关掉保护机制,那么问题来了,那些被丢弃的消息难道就不要了吗?其实并不是,我们可以针对该业务,查询出来将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把丢的数据给他补回来。
4.5.数据是通过push还是pull方式给到消费端,各自有什么弊端?
Push模型实时性好,但是因为状态维护等问题,难以应用到消息中间件的实践中,因为
在Broker端需要维护Consumer的状态,不好适用于Broker去支持大量的Consumer的场景
Consumer的消费速度是不一致的,Broker进行推送难以处理不同的Consumer的状况
Broker难以应对Consumer无法消费消息的情况,因为不知道Consumer的宕机是短暂的还是永久的)
另外推送消息(量可能会很大)也会加重Consumer的负载或者压垮Consumer。
如果对应只有1个Consumer,用push比pull好。
Pull模式实现起来会相对简单一些,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。