文章目录
1.RabbitMQ的安装
因为linux安装实在是问题很多,建议windows系统的小伙伴,采用windows版本的RabbitMQ来安装。
可以参照:https://blog.csdn.net/qq_31634461/article/details/79377256
安装好之后,每一次执行项目,如果没有开启RabbitMQ的服务,IDEA会报错,记住,以管理员身份进入cmd界面,执行net start RabbitMQ
开启服务,net stop RabbitMQ
关闭服务。
2.新建测试项目
- 新建SpringBoot项目,添加相应的pom.xml依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server.port=8888
spring.application.name=springcloud-mq
## rabbitMqd的配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- 创建消息队列
@Configuration
public class QueueConfig {
@Bean
public Queue createQueue(){
return new Queue("hello-queue");
}
}
- 创建消息的发送方
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
// 向消息队列发送消息
public void send(String msg) {
// 参数1:队列的名称 参数2:消息
this.amqpTemplate.convertAndSend("hello-queue", msg);
}
}
- 创建消息的接收方
这里需要指定相应的队列名称,采用的是消息队列监听的机制!
@Component
public class Receiver {
/**
* 接收消息的方法,采用消息队列监听机制
* @param msg
*/
@RabbitListener(queues = "hello-queue")
public void receive(String msg){
System.out.println("receiver:"+msg);
}
}
- 创建测试类
代码中的循环块是为了在消息队列的可视化视图管理工具中能够看见响应的数据。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
@Test
public void test1() throws Exception{
while (true) {
Thread.sleep(1000);
this.sender.send("你好,欢迎使用RabbitMQ消息队列!");
}
}
}
3.RabbitMQ基础知识
1.Provider
消息生产者,即投递消息的程序。
2.Consumer
消息消费者,即接收消息的程序。
3.没有使用消息队列,A->B
graph LR
A-->B
4.使用了消息队列,B相当于消息队列
graph LR
A-->B
B-->c
5.队列
简单来说,队列就是一个的缓冲区,就相当于出货时候的仓库,即中转站。
6.队列中存储的内容是什么?
在RabbitMQ中,信息流从应用程序出发来到RabbitMQ的队列之中,所有的信息可以只存在一个队列之中。队列中可以存放很多信息,因为它基本是一个无限制的缓冲区,前提是消息队列所在的机器内存充足!
7.队列与应用程序的关系?
如下图所示,多个生产者可以将消息放入队列之中,与之相对,多个消费者可以从队列中取出消息。
graph LR
A-->B
C-->B
D-->B
B-->E
B-->F
B-->G
3.Direct交换器
Direct交换器:发布与订阅、完全匹配型。
Direct交换器模式实质就是一对一的服务,你订阅什么服务,最后给你提供的就是什么服务。
业务场景:系统日志处理场景
1.微服务产生的日志,交给日志处理服务器去处理
2.日志处理服务器有四个服务,分别为DEBUG/INFO/WARN/ERROR等
3.服务直接采用的是direct(分布订阅)
由图可以知道,此交换器涉及服务者与消费者,在创建项目的时候需要创建服务的提供者与服务的接受者。
首先配置相应的properties:
server.port=8888
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 设置交换器的名称
mq.config.exchange=log.direct
# 设置info队列名称
mq.config.info.queue=log.info
# 设置info路由键
mq.config.info.queue.routing.key=log.info.routing.key
# error队列名称
mq.config.error.queue=log.error
# 设置error路由键
mq.config.error.queue.routing.key=log.error.routing.key
接收者,也就是服务处理的一方:
/**
* bindings中有三个参数:
* @QueueBinding: value 表示绑定队列名称
* exchange 表示绑定的交换器
* key 表示绑定的路由键
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.info.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.info.queue.routing.key}"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Info............receiver:" + msg);
}
}
发送方,也就是服务提供的一方:提供交换器的名称以及路由键就可以找到服务的处理方
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
// 路由键名称
@Value("${mq.config.info.queue.routing.key}")
private String routingKey;
// 交换器
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
this.amqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
}
}
4.Topic交换器
Topic交换器是一种主题,规则交换器。
设定一种规则,符合此规则的则流入到相应的队列当中,被相应的处理程序所处理。
# provider的properties配置
# 注意,由于Topic模式不是一对一的,所以对于提供方只需要知道交换器是谁就行
server.port=8888
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 设置交换器的名称
mq.config.exchange=log.topic
# consumer的properties配置
# 注意,由于Topic模式的处理阶段也是从队列中获取,所以对于消费者需要知道相应的队列的准确的信息。
server.port=8888
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 设置交换器的名称
mq.config.exchange=log.topic
# 设置info队列名称
mq.config.info.queue=log.info
# error队列名称
mq.config.error.queue=log.error
# 总的处理队列名称
mq.config.all.queues=log.all
编写相应的provider类信息:
注意:在convertAndSend
中的第二个参数就是Topic交换器中的规则参数,在Consumer中会设置相应的规则。
// UserSender类
@Component
public class UserSender {
@Autowired
private AmqpTemplate amqpTemplate;
// 交换器
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
this.amqpTemplate.convertAndSend(this.exchange, "user.log.debug", "debug--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "user.log.info", "info--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "user.log.error", "error--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "user.log.warn", "warn--"+msg);
}
}
// ProviderSender类
@Component
public class ProviderSender {
@Autowired
private AmqpTemplate amqpTemplate;
// 交换器
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
this.amqpTemplate.convertAndSend(this.exchange, "provider.log.debug", "debug--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "provider.log.info", "info--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "provider.log.error", "error--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "provider.log.warn", "warn--"+msg);
}
}
// OrderSender类
@Component
public class OrderSender {
@Autowired
private AmqpTemplate amqpTemplate;
// 交换器
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg) {
this.amqpTemplate.convertAndSend(this.exchange, "order.log.debug", "debug--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "order.log.info", "info--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "order.log.error", "error--"+msg);
this.amqpTemplate.convertAndSend(this.exchange, "order.log.warn", "warn--"+msg);
}
}
编写相应的Consumer类信息:
// Info的接收者,规则在此作为一种校验规则,以此是的服务进入相应的队列之中
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.info.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.log.info"
)
)
public class InfoReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Info............receiver:" + msg);
}
}
// Error的接收者
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.error.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.log.error"
)
)
public class ErrorReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Error............receiver:" + msg);
}
}
// All所有日志的接收者
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.all.queues}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.log.*"
)
)
public class AllReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("all............receiver:" + msg);
}
}
最后的测试类:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class SenderTest {
@Autowired
private ProviderSender providerSender;
@Autowired
private UserSender userSender;
@Autowired
private OrderSender orderSender;
@Test
public void test1() throws InterruptedException {
this.providerSender.send("provider!");
this.userSender.send("userSender!");
this.orderSender.send("orderSender!");
}
}
5.Fanout交换器
Fanout交换器的原理就是广播形式的消息发送,与此交换器相连的消息队列,都会收到消息。
相比于Direct的实现形式,Fanout没有一对一,是一对多,所以限制相连的key条件就没有了,相对于Topic交换器的实现,Fanout的校验规则为空,表示只要与交换器相连的都视为可以接收对象。
# provider的properties文件
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 设置交换器的名称
mq.config.exchange=order.fanout
# ------------------------------------------------
# consumer的properties文件,consumer从队列中获取提供的服务(信息)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 设置交换器的名称
mq.config.exchange=order.fanout
# sms队列名称
mq.config.sms.queue=order.sms
# push队列名称
mq.config.push.queue=order.push
编写provider的代码:provider的实现是以广播形式的,所以只需要一个就好。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class SenderTest {
@Autowired
private OrderSender orderSender;
@Test
public void test1() throws InterruptedException {
this.orderSender.send("广播发送信息!!!!");
}
}
编写consumer的代码:注意,这里没有key,也就是没有限制条件,只要交换器一致即满足条件。
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.push.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class PushReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("push--queue----:" + msg);
}
}
// ---------------------------------------------------
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.sms.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class SmsReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("sms--queue----:" + msg);
}
}
测试类:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class SenderTest {
@Autowired
private OrderSender orderSender;
@Test
public void test1() throws InterruptedException {
this.orderSender.send("广播发送信息!!!!");
}
}
6.使用RabbitMQ实现松耦合设计
解耦:对于添加代码的行为开放,对于修改代码的行为关闭,称为解耦操作。
经典的RabbitMQ的松耦合操作,是基于队列来实现的,无论是使用Direct、Topic或者是Fanout,都可以实现解耦合操作,以Fanout为例,只需要添加一个红包队列,添加一个红包的接收消费者,即可以实现订单的红包发送,业务发展过程中如果还需要扩展其他功能模块,只需要添加代码即可,不需要发生修改代码的操作。
# 红包队列名称 新增的
mq.config.red.queue=order.red
// 新增的代码
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.red.queue}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class RedReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("十元红包优惠券已经就位....red--queue----:" + msg);
}
}
测试代码不变,即可以看到相应红包功能的扩展行为。
7.RabbitMQ的数据丢失及解决方法
RabbitMQ在运行的时候,如果@Queue
中的属性auto-delete
设置为true表示为,当服务关闭的时候,此队列被删除。此时如果人为的中断消费端服务,此时队列已经不再运行,但是服务端还在继续提供,只是没有被接收到,此时如果没有阻断ACK确认的手段,服务端是认为消息被接收而一直发送,这就导致两次运行时候的接收的结果不一致,导致数据丢失的情况产生。
要解决数据丢失的问题,需要从消息队列上来做文章,此时,需要将属性auto-delete
改为false,此时当服务关闭的时候,队列不会被删除,而是一直在存储信息,直到消息再次回复,数据会被拿出来消费,不会有数据丢失。
修改属性以后,重新测试,然后中断消费者端服务。
8.RabbitMQ的ACK确认机制
ACK确认机制简单来说就是在提供者与消费者之间有一个协议,当消费者接受到消息的时候需要反馈给提供者一个ACK消息,表示接收到了此消息,然后此消息会从RabbitMQ的消息队列中被删除,如果由于网络异常或者服务器异常等原因造成消费端没有给提供端反馈ACK,那么提供端将会再次把消息放入到消息队列中,只有正确的收到ACK反馈,消息才会从消息队列中被删除,否则消息将永远不会被删除。
当你忘记了ACK机制,后果将会很严重,当Consumer退出以后,提供者将会一直发送信息,这些信息会存在RabbitMQ的服务器中,直到服务器的内存被占满为止,随之造成内存泄漏等现象。
最简单的模拟ACK异常,就是在消费端抛出一个不被处理的异常即可。
public class InfoReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Info............receiver:" + msg);
throw new RuntimeException("哈哈哈");
}
}
此时启动项目,控制台会一直处于抖动状态,一直在刷新,陷入死循环!
此时看RabbitMQ的后台控制界面:
要解决ACK反馈异常,需要开启重试属性,以及开次重试次数的限制,以此确保在发生ACK异常的时候,服务器不至于发生内存泄漏的严重后果。
# 开启重试
spring.rabbitmq.listener.direct.retry.enabled=true
# 设置重试次数
spring.rabbitmq.listener.direct.retry.max-attempts=3