添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写接受消息类:
@Slf4j
@Component
public class MqReceiver {
//这个不会自动创建队列,需要自己先在浏览器上访问创建队列,否则报错
// @RabbitListener(queues = "process")
// public void process(String message){
// log.info("myFirstQueue:{}",message);
// }
//自动创建队列,
@RabbitListener(queuesToDeclare = @Queue("processAuto"))
public void processAuto(String message){
log.info("myFirstQueue:{}",message);
}
//自动创建队列,Exchange和Queue绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue("processAutoBind"),
exchange = @Exchange("myExchange")
))
public void processAutoBind(String message){
log.info("myFirstQueue:{}",message);
}
}
编写发送消息测试类:
/**
* 发送MQ消息测试
*/
@Component
public class MqSenderTest extends OrderApplicationTests{
@Autowired
private AmqpTemplate amqpTemplate;
// @Test
// public void send(){
// amqpTemplate.convertAndSend("process","now"+new Date());
// }
@Test
public void send1(){
amqpTemplate.convertAndSend("processAuto","now"+new Date());
}
@Test
public void send2(){
amqpTemplate.convertAndSend("processAutoBind","now"+new Date());
}
}
启动应用:观察MQ
执行测试方法:看控制台是否接受到消息
Exchange 简介
消息分组:
那么为什么我们需要 Exchange 而不是直接将消息发送至队列呢?
AMQP 协议中的核心思想就是生产者和消费者的解耦,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由 Exchange 来接收,然后 Exchange 按照特定的策略转发到 Queue 进行存储。Exchange 就类似于一个交换机,将各个消息分发到相应的队列中
在实际应用中我们只需要定义好 Exchange 的路由策略,而生产者则不需要关心消息会发送到哪个 Queue 或被哪些 Consumer 消费。在这种模式下生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息,Exchange 定义了消息路由到 Queue 的规则,将各个层面的消息传递隔离开,使每一层只需要关心自己面向的下一层,降低了整体的耦合度。
理解 Exchange
Exchange 收到消息时,他是如何知道需要发送至哪些 Queue 呢?这里就需要了解 Binding 和 RoutingKey 的概念:
Binding 表示 Exchange 与 Queue 之间的关系,我们也可以简单的认为队列对该交换机上的消息感兴趣,绑定可以附带一个额外的参数 RoutingKey。Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 Binding 做匹配,如果满足匹配,就往 Exchange 所绑定的 Queue 发送消息,这样就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue。RoutingKey 的意义依赖于交换机的类型。
先写两个接受消息类:
//水果供应商 接收消息
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "fruit",
value = @Queue("processFruit")
))
public void processFruit(String message){
log.info("---------------------------fruit myFirstQueue:{}",message);
}
//数码供应商 接收消息
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange("myOrder"),
key = "computer",
value = @Queue("processComputer")
))
public void processComputer(String message){
log.info("---------------------------computer myFirstQueue:{}",message);
}
再写两个发送消息测试方法:
convertAndSend方法第一个参数是Exchange名称,第二个参数是key名称,再找到绑定的对应队列。
@Test
public void sendComputer(){
amqpTemplate.convertAndSend("myOrder","computer","now"+new Date());
}
@Test
public void sendFruit(){
amqpTemplate.convertAndSend("myOrder","fruit","now"+new Date());
}
启动,分别执行方法看结果: