SpringBoot整合RabbitMq
参考以下博主的文章
RabbitMQ入门 安装 SpringAMQP简单队列、工作队列、发布订阅(扇出模式,广播模式)、Direct模式(Roting模式)、Topic模式
RabbitMQ(2)、MQ问题:消息可靠性、延迟消息( 延迟队列(插件 ))、消息堆积(惰性队列)、MQ的高可用。ConfirmCallback机制、ReturnCallback机制、死信交换机
我这里只会记录如何整合SpringBoot,安装和部署的具体详情可以看上面这位博主写的文章。
一、环境准备
1、pom.xml
SpringBoot整合RabbitMQ核心只有这个
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml
2.1、基础配置
最简单就是链接上RabbitMQ服务就可以了
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# rabbitmq虚拟机: 默认就是/ 你可以当他是一个命名空间,这个空间下有很多的交换机、队列。
# 一般默认都不会自己创建新的虚拟机,因为业务不会多到需要分这么多rabbitmq虚拟机。
virtual-host: /
2.2、进阶配置
这个是豪华版,后面会慢慢说,开始就先按基础配置先。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#virtual-host: /
# 开启发布确认 消息从 producer 到 exchange 有一个 confirmCallback 确认模式 ->用于给生产者确认消息是否到达交换机
publisher-confirm-type: correlated
# 开启回退模式 消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
publisher-returns: true
template:
# 如果消息没有送达queue,就强制回退消息。true : 消息路由失败时会调用returnCallback回退消息 | false : 消息路由失败时会丢弃消息
mandatory: true
listener:
simple:
#消费者一次抓取几条消息
prefetch: 1
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: auto
retry:
#开启重试 消费者本地的失败重试
enabled: true
#初始的失败等待时长,单位是ms,默认1000
initial-interval: 1000
#与上次重试间隔时长的倍数(1表示每次重试的时间间隔相同)。默认1
multiplier: 1
#最大重试的间隔,默认值10000
max-interval: 10000
#最多重试几次。默认3
max-attempts: 3
#是否无状态。默认true。如果涉及事务,要改成false
stateless: true
二、RabbitMQ入门
RabbitMQ提供了多种工作模式:点击这里跳转查看
1、简单模式
由生产者直接发送消息给消费者(也不考虑消息是否成功送达给消费者)
定义队列
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SimpleQueueConfig {
/**
* 声明 一个队列,队列名称为simple.queue
* 注意:Queue类是org.springframework.amqp.core.Queue,不要导错了
*/
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("simple.queue").build();
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SimpleQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟生产者
@Test
public void test(){
//参数1:队列名称
//参数2:消息内容
rabbitTemplate.convertAndSend("simple.queue","生产者,制造中...");
}
}
因为没有定义交换机,队列默认走了RabbitMQ默认的交换机
模拟消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*/
@Component
public class SimpleQueueListener {
/**
* 监听名字为simple.queue的队列消息
* @param msg 生产者发送的消息
*/
@RabbitListener(queues = "simple.queue")
public void handleSimpleQueueMsg(String msg){
System.out.println("接收到simple.queue队列的消息,正在消费【simple.queue】队列的消息:"+msg);
}
}
重启SpringBoot服务,就会看到simple.queue队列的消息立刻给消费了
2、工作模式
一个生产者对多个消费者,消息不会重复消费,而且分摊给消费者。
application.yml添加配置
spring:
rabbitmq:
listener:
simple:
#消费者一次抓取几条消息,如果不设置这个,会发生分配不均的情况
prefetch: 1
定义队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ↗C(消费者1)
* P(生产者) -> 交换机(默认) -> 队列(work.queue) ->
* ↘C(消费者2)
*/
@Configuration
public class WorkQueueConfig {
//定义持久化队列
@Bean
public Queue workQueue(){
//参数1 name : 队列名称
//参数2 durable : 是否持久化,默认是true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
//参数3 exclusive : 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
//参数4 autoDelete : 默认false是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new Queue("work.queue",true,false,false);
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class WorkQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟生产者
@Test
public void test() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work.queue", "这是work队列消息" + i);
}
}
}
调用生产者后
模拟消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*/
@Component
public class WorkQueueListener {
@RabbitListener(queues = "work.queue")
public void workQueueMsg1(String msg){
System.out.println("workQueueMsg1:接收到work.queue队列的消息,正在消费【work.queue】队列的消息:"+msg);
}
@RabbitListener(queues = "work.queue")
public void workQueueMsg2(String msg){
System.out.println("workQueueMsg2:接收到work.queue队列的消息,正在消费【work.queue】队列的消息:"+msg);
}
}
重启SpringBoot后
3、发布/订阅模式
向多个队列的消费者传递同要一条消息,就是发布/订阅模式。类似商家要搞活动了,向指定会员或者所有会员发送活动短信。
这里就要说到交换机了,就是图上的X。
Exchange类型:
Fanout:广播,交换机绑定了哪些队列,就把消息投递给这些队列
Direct: 定向,把消息交给符合指定routing key的队列
Topic:通配符,把消息交给符合routing pattern的队列
3.1、Fanout交换机
Fanout:广播,Fanout交换机绑定了哪些队列,就把消息投递给这些队列
定义Fanout类型交换机,定义队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* ↗ fanout.queue1 -> C(消费者1)
* P(生产者) -> fanout.exchange
* ↘ fanout.queue2 -> C(消费者2)
*
* 操作:
* 1. 声明Fanout类型的交换机:fanout.exchange
*
* 2. 声明队列1:fanout.queue1
* 声明队列2:fanout.queue2
*
* 3. 把交换机与队列1绑定
* 把交换机与队列2绑定
*/
@Configuration
public class FanoutQueueConfig {
//定义持久化交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange",true,false);
}
//定义持久化队列
@Bean
public Queue fanoutQueue1(){
//参数1 name : 队列名称
//参数2 durable : 是否持久化,默认是true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
//参数3 exclusive : 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
//参数4 autoDelete : 默认false是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new Queue("fanout.queue1",true,false,false);
}
//定义持久化队列
@Bean
public Queue fanoutQueue2(){
//参数1 name : 队列名称
//参数2 durable : 是否持久化,默认是true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
//参数3 exclusive : 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
//参数4 autoDelete : 默认false是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
return new Queue("fanout.queue2",true,false,false);
}
//fanoutExchange交换机与队列fanoutQueue1绑定
@Bean
public Binding fanoutQueue1Biding(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
//fanoutExchange交换机与队列fanoutQueue2绑定
@Bean
public Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class FanoutQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟生产者
@Test
public void test() {
//参数1: 交换机名称
//参数2: 路由key : fanout类型的交换机不需要使用路由key,把这个值写成空的
//参数3: 消息内容
rabbitTemplate.convertAndSend("fanout.exchange", "", "这是一条fanout广播消息");
}
}
调用生产者后,你会看到绑定了该交换机的队列都收到消息了
![在这里插入图片描述](https://img-blog.csdnimg.cn/d08a2a3c31cb4e0a960b88f68608202c.png)
模拟消费者
```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//消费者
@Component
public class FanoutQueueListener {
@RabbitListener(queues = "fanout.queue1")
public void fanoutQueue1Msg(String msg) {
System.out.println("fanout.queue1队列收到fanout.exchange交换机发送的消息:"+msg);
}
@RabbitListener(queues = "fanout.queue2")
public void fanoutQueue2Msg(String msg) {
System.out.println("fanout.queue2队列收到fanout.exchange交换机发送的消息:"+msg);
}
}
添加完上面监听队列的代码,重启服务后,可以看到监听绑定fanout.exchange交换机的队列都收到这一条消息
3.2、Direct交换机
Direct: 定向投递消息,把消息交给符合指定routing key的队列
说明:
队列在绑定Direct类型交换机时,需要给队列指定一个Routing Key(路由key)
生产者在发送消息时,必须指定消息的Routing Key
交换机根据消息的RoutingKey进行判断:只有队列的RoutingKey 与 消息的RoutingKey完全相同,才会收到消息
定义Direct类型交换机,定义队列并且设置RoutingKey
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 需要:
* 1. 声明Direct类型的交换机:direct.exchange
* 2. 声明队列1:direct.queue1
* 声明队列2:direct.queue2
* 3. 把交换机与队列1绑定:把 RoutingKey为orange的消息 -> 投递到队列1
* 把交换机与队列2绑定:把 RoutingKey为black的消息 -> 投递到队列2
* 把交换机与队列2绑定:把 RoutingKey为green的消息 -> 投递到队列2
*
* ↗ RoutingKey = orange -> direct.queue1 -> C(消费者1)
* P(生产者) -> direct.exchange
* ↘ RoutingKey = black
* -> direct.queue2 -> C(消费者2)
* ↘ RoutingKey = green
*
*/
@Configuration
public class DirectQueueConfig {
//定义direct类型交换机
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange("direct.exchange").build();
}
//定义持久化队列
@Bean
public Queue directQueue1() {
return new Queue("direct.queue1",true,false,false);
}
//定义持久化队列
@Bean
public Queue directQueue2() {
return QueueBuilder.durable("direct.queue2").build();
}
//把交换机与队列direct.queue1绑定:把 RoutingKey为orange的消息,投递到队列direct.queue1
@Bean
public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("orange");
}
//把交换机与队列direct.queue2绑定:把 RoutingKey为black的消息,投递到队列direct.queue2
@Bean
public Binding directQueue2BindingInfo(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("black");
}
//把交换机与队列direct.queue2绑定:把 RoutingKey为green的消息,投递到队列direct.queue2
@Bean
public Binding direcQueue2BindingError(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("green");
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class DirectQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟生产者
@Test
public void test(){
//rabbitTemplate.convertAndSend("direct.exchange","orange","这是orange");
//rabbitTemplate.convertAndSend("direct.exchange","black","这是black");
rabbitTemplate.convertAndSend("direct.exchange","green","这是green");
}
}
模拟消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectQueueListener {
@RabbitListener(queues = "direct.queue1")
public void directQueue1(String msg) {
System.out.println("我是direct.queue1队列,接收到消息:"+msg);
}
@RabbitListener(queues = "direct.queue2")
public void directQueue2(String msg) {
System.out.println("我是direct.queue2队列,接收到消息:"+msg);
}
}
这里就不演示了,自己测试下
3.3、Topic交换机
Topic:通配符,把消息交给符合routing pattern的队列
RoutingKey:发送到Topic类型交换机的消息不能有任意的routing键
必须是由点分隔的单词列表
可以有任意多个单词,最多255个字节
可使用 * 星号,匹配一个单词
可使用 # 井号,匹配0个或多个单词
定义Topic类型交换机,定义队列并且绑定
/**
* 需要:
* 1. 声明Topic类型的交换机:topic.exchange
* 2. 声明队列1:topic.queue1
* 声明队列2:topic.queue2
* 3. 把交换机与队列1绑定:将RoutingKey为*.orange.*的消息,投递到topic.queue1
* 把交换机与队列2绑定:将RoutingKey为 *.*.rabbit 和 lazy.# 的消息,投递到topic.queue2
*
* ↗ *.orange.* -> topic.queue1 -> C(消费者1)
* P(生产者) -> topic.exchange
* ↘ *.*.rabbit -> topic.queue2 -> C(消费者2)
* ↘ lazy.# -> topic.queue2 -> C(消费者2)
*/
@Configuration
public class TopicQueueConfig {
//定义topic类型交换机
@Bean
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange("topic.exchange").build();
}
//定义持久化队列
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("topic.queue1").build();
}
//定义持久化队列
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("topic.queue2").build();
}
//队列topic.queue1绑定topicExchange类型交换机,routingKey = *.orange.*
@Bean
public Binding queue1Binding(TopicExchange topicExchange, Queue topicQueue1){
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}
//队列topic.queue2绑定topicExchange类型交换机,routingKey = *.*.rabbit
@Bean
public Binding queue2Binding(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}
//队列topic.queue2绑定topicExchange类型交换机,routingKey = lazy.#
@Bean
public Binding queue22Binding(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("lazy.#");
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TopicQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//路由为*.orange.*的消息,投递到topic.queue1,将由消费者1接收
//rabbitTemplate.convertAndSend("topic.exchange", "xxxx.orange.ttttt", "routingKey=xxxx.orange.ttttt");
//路由为*.orange.*的消息,投递到topic.queue1 -> 这个topic.queue1队列收不到,因为左边的*没有(可使用* 星号,匹配一个单词)
//rabbitTemplate.convertAndSend("topic.exchange", "orange.ttttt", "routingKey=orange.ttttt");
//路由为*.*.rabbit的消息,投递到topic.queue2,将由消费者2接收
//rabbitTemplate.convertAndSend("topic.exchange", "a.b.rabbit", "routingKey=a.b.rabbit");
//路由为*.*.rabbit的消息,投递到topic.queue2,-> 这个topic.queue2队列收不到,因为*至少要匹配一个单词
//rabbitTemplate.convertAndSend("topic.exchange", "b.rabbit", "routingKey=b.rabbit");
//路由为lazy.#的消息,投递到topic.queue2,将由消费者2接收
//rabbitTemplate.convertAndSend("topic.exchange", "lazy", "routingKey=lazy");
//路由为lazy.#的消息,投递到topic.queue2,将由消费者2接收
//rabbitTemplate.convertAndSend("topic.exchange", "lazy.a", "routingKey=lazy.a");
//路由为lazy.#的消息,投递到topic.queue2,将由消费者2接收
//rabbitTemplate.convertAndSend("topic.exchange", "lazy.a.b", "routingKey=lazy.a.b");
}
}
模拟消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicQueueListener {
@RabbitListener(queues = "topic.queue1")
public void topicQueue1Msg(String msg) {
System.out.println("topic.queue1队列收到消息:"+msg);
}
@RabbitListener(queues = "topic.queue2")
public void topicQueue2Msg(String msg) {
System.out.println("topic.queue2队列收到消息:"+msg);
}
}
这里就不演示了,自己测试下