1. 概述
RabbitMQ是一个接收,存储和转发消息数据的消息中间件。主要有四大核心部分,生产者、交换机、队列和消费者,工作原理如下图:
1.1. 核心名称解释
Broker:接收和分发消息的应用
Connection:publisher/consumer和broker之间的TCP连接
Channel:在connection内部建立的逻辑连接,作为轻量级的connection极大减少了操作系统建立TCP connection的开销
Exchange:交换机,message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去,常用的类型有:direct(point-to-point)、topic(publish-subscribe)和fanout(multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中科院包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
2. 引入核心依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3. 核心配置信息
spring:
application:
name: springboot-rabbitmq
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
rabbitmq:
host: xx.xx.xx.xx
port: 5672
virtual-host: /
publisher-confirm-type: correlated #发布消息成功到交换机后会触发回调方法
publisher-returns: true #确认消息发送到队列
username: admin
password: 123456
4. 回调配置类
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@PostConstruct
private void init() {
//true:交换机无法将消息进行路由时,会将该消息返回给生产者;false:如果发现消息无法进行路由,则直接丢弃
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
logger.info("交换机接收到id为:{}的消息确认成功!", id);
} else {
logger.info("id为:{}的消息未成功投递到交换机,原因是:{}", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.error("消息:{}被交换机:{}退回,退回码是:{},退回原因是:{},路由是:{}", message.getBody(), exchange, replyCode, replyText, routingKey);
}
}
5. 交换机配置类
交换机一方面接收来自生产者的消息,另一方面将消息推入队列,交换机必须明确应该把这些消息放入特定队列还是放入多个队列,或者直接丢弃。
交换机的类型有:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
5.1. Direct交换机
消息只会投递到绑定的routingKey队列之中,如下图:
X交换机绑定了两个队列,队列Q1绑定键为orange,队列Q2绑定键为black和green。在这种绑定情况下,生产者发布消息到交换机,绑定键位orange的消息会被发布到队列Q1,绑定键位black和green的消息会被发布到队列Q2,其他消息则会被直接丢弃。
如果绑定多个队列的键都相同,则跟fanout类似,跟广播一样
5.1.1. Direct配置类
@Configuration
public class RabbitDirectConfig {
private static final String DIRECT_EXCHANGE_NAME = "direct-exchange";
private static final String DIRECT_QUEUE_NAME = "direct-queue";
private static final String ROUTING_KEY = "direct.route.key";
/**
* 声明Exchange
* @return
*/
@Bean(name = "directExchange")
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE_NAME);
}
/**
* 声明队列
* @return
*/
@Bean(name = "directQueue")
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding queueBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
5.1.2. 生产者类
@RestController
@RequestMapping("/direct")
public class DirectProducerController {
private static final String DIRECT_EXCHANGE_NAME = "direct-exchange";
private static final String ROUTING_KEY = "direct.route.key";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, ROUTING_KEY, message + ROUTING_KEY, correlationData);
}
}
5.1.3. 消费者类
@Component
public class DirectConsumer {
private static final String DIRECT_QUEUE_NAME = "direct-queue";
private static Logger logger = LoggerFactory.getLogger(ConfirmConsumer.class);
@RabbitListener(queues = DIRECT_QUEUE_NAME)
public void receiveMsg(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列direct.queue消息:{}", msg);
}
}
5.2. Topic交换机
topic交换机的routingKey必须满足一定的规则,即以点号分隔开的单词列表,最多不超过255个字节。
(星号)代替一个单词
#(井号)代替零个或多个单词
例如Q1绑定的routingKey是(topic.*.*),Q2绑定的routingKey是(*.*.rabbit)
当routingKey中以topic开头的消息都会被Q1接收,同理,以rabbit结尾的消息都会被Q2接收。当一个队列绑定键routingKey是#时,那么这个队列将接收所有数据,类似于fanout,如果队列绑定routingKey中没有#和,那么该队列绑定类型就是direct.
5.2.1. Topic配置类
@Configuration
public class RabbitTopicConfig {
private static final String TOPIC_EXCHANGE_NAME = "topic-exchange";
private static final String TOPIC_QUEUE_NAME_A = "topic-queue-A";
private static final String TOPIC_QUEUE_NAME_B = "topic-queue-B";
private static final String ROUTING_KEY_A = "topic.*.*";
private static final String ROUTING_KEY_B = "*.*.rabbit";
/**
* 声明Exchange
* @return
*/
@Bean(name = "topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE_NAME);
}
/**
* 声明队列
* @return
*/
@Bean(name = "topicQueueA")
public Queue topicQueueA() {
return QueueBuilder.durable(TOPIC_QUEUE_NAME_A).build();
}
/**
* 声明队列
* @return
*/
@Bean(name = "topicQueueB")
public Queue topicQueueB() {
return QueueBuilder.durable(TOPIC_QUEUE_NAME_B).build();
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding topicQueueBindingA(@Qualifier("topicQueueA") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_A);
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding topicQueueBindingB(@Qualifier("topicQueueB") Queue queue, @Qualifier("topicExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_B);
}
}
5.2.2. 生成者类
@RestController
@RequestMapping("/topic")
public class TopicProducerController {
private static final String TOPIC_EXCHANGE_NAME = "topic-exchange";
private static final String ROUTING_KEY_A = "topic.*.*";
private static final String ROUTING_KEY_B = "*.*.rabbit";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_A, message, correlationData);
}
}
5.2.3. 消费者类
@Component
public class TopicConsumer {
private static final String TOPIC_QUEUE_NAME_A = "topic-queue-A";
private static final String TOPIC_QUEUE_NAME_B = "topic-queue-B";
private static Logger logger = LoggerFactory.getLogger(TopicConsumer.class);
@RabbitListener(queues = TOPIC_QUEUE_NAME_A)
public void receiveMsgA(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列topic.queueA消息:{}", msg);
}
@RabbitListener(queues = TOPIC_QUEUE_NAME_B)
public void receiveMsgB(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列topic.queueB消息:{}", msg);
}
}
5.3. Fanout交换机
接收到所有消息广播到它知道的所有队列中
5.3.1. Fanout配置类
@Configuration
public class RabbitFanoutConfig {
private static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";
private static final String FANOUT_QUEUE_NAME_A = "fanout-queue-A";
private static final String FANOUT_QUEUE_NAME_B = "fanout-queue-B";
private static final String FANOUT_QUEUE_NAME_C = "fanout-queue-C";
/**
* 声明Exchange
* @return
*/
@Bean(name = "fanoutExchange")
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE_NAME);
}
/**
* 声明队列
* @return
*/
@Bean(name = "fanoutQueueA")
public Queue fanoutQueueA() {
return QueueBuilder.durable(FANOUT_QUEUE_NAME_A).build();
}
/**
* 声明队列
* @return
*/
@Bean(name = "fanoutQueueB")
public Queue fanoutQueueB() {
return QueueBuilder.durable(FANOUT_QUEUE_NAME_B).build();
}
/**
* 声明队列
* @return
*/
@Bean(name = "fanoutQueueC")
public Queue fanoutQueueC() {
return QueueBuilder.durable(FANOUT_QUEUE_NAME_C).build();
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding queueBindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding queueBindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
/**
* 声明确认队列绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding queueBindingC(@Qualifier("fanoutQueueC") Queue queue, @Qualifier("fanoutExchange") FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
5.3.2. 生产者类
@RestController
@RequestMapping("/fanout")
public class FanoutProducerController {
private static final String FANOUT_EXCHANGE_NAME = "fanout-exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, null, message, correlationData);
}
}
5.3.3. 消费者类
@Component
public class FanoutConsumer {
private static final String FANOUT_QUEUE_NAME_A = "fanout-queue-A";
private static final String FANOUT_QUEUE_NAME_B = "fanout-queue-B";
private static final String FANOUT_QUEUE_NAME_C = "fanout-queue-C";
private static Logger logger = LoggerFactory.getLogger(ConfirmConsumer.class);
@RabbitListener(queues = FANOUT_QUEUE_NAME_A)
public void receiveMsgA(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列fanout.queueA消息:{}", msg);
}
@RabbitListener(queues = FANOUT_QUEUE_NAME_B)
public void receiveMsgB(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列fanout.queueB消息:{}", msg);
}
@RabbitListener(queues = FANOUT_QUEUE_NAME_C)
public void receiveMsgC(Message message) {
String msg = new String(message.getBody());
logger.info("接收到队列fanout.queueC消息:{}", msg);
}
}