1 Direct Exchange 实战
1.1 Direct Exchange 的简单介绍
Direct 即是路由模式,也是 Exchange 的默认模式。唯有当 Routing Key 与 Binding Key 完全匹配的时候才将消息发送至消息队列。
事实上,RabbitMQ 默认提供了一个 Exchange,名字是空字符串,类型是 Direct,绑定到所有的Queue 上(每一个 Queue 和这个无名 Exchange 之间的 Binding Key 是 Queue 的名字)。所以,有时候我们感觉不需要 Exchange 也可以发送和接收消息,实际上是使用了 RabbitMQ 默认提供的 Exchange。
1.2 生产者的创建
我们新建一个 Spring Boot 项目,命名为 provider,同时导入依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
我们的 application.properties 如下:
server.port: 8080
spring.application.name: provider
spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
然后我们需要创建一个配置类
package com.example.provider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, the direct exchange and the binding used by the
 * direct-exchange example.
 *
 * @author 30309
 */
@Configuration
public class DirectRabbitConfig {

    private static final String QUEUE_NAME = "DirectQueue";
    private static final String EXCHANGE_NAME = "DirectExchange";
    private static final String ROUTING_KEY = "DirectRouting";

    /** Queue named {@code DirectQueue}; the flag passed to the constructor marks it durable. */
    @Bean
    public Queue DirectQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /** Direct exchange named {@code DirectExchange}. */
    @Bean
    DirectExchange DirectExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /** Binds the queue to the exchange using the key {@code DirectRouting}. */
    @Bean
    Binding bindingDirect() {
        Queue queue = DirectQueue();
        DirectExchange exchange = DirectExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
最后我们实现一个控制类
package com.example.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Producer endpoint for the direct-exchange example.
 *
 * @author 30309
 */
@RestController
public class SendMessageController {

    /**
     * Layout of the {@code createTime} entry. Built once instead of on every
     * request; {@link DateTimeFormatter} instances are immutable and thread-safe.
     */
    private static final DateTimeFormatter CREATE_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds a map with a random message id, a fixed text and the current
     * timestamp, then publishes it to {@code DirectExchange} with the routing
     * key {@code DirectRouting}.
     *
     * @return the literal string {@code "ok"}
     */
    @GetMapping("/sendDirectMessage")
    public String sendDirectMessage() {
        String messageId = UUID.randomUUID().toString();
        String messageData = "Hello World!";
        String createTime = LocalDateTime.now().format(CREATE_TIME_FORMAT);

        // Payload to publish
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", messageId);
        map.put("messageData", messageData);
        map.put("createTime", createTime);

        // Send to exchange DirectExchange with routing key DirectRouting
        rabbitTemplate.convertAndSend("DirectExchange", "DirectRouting", map);
        return "ok";
    }
}
我们的生产者代码部分就基本实现完毕了,接下来我们测试一下,直接运行项目。
在浏览器中输入 http://localhost:8080/sendDirectMessage ,然后我们打开 http://localhost:15672
发现有如下提示,证明我们的生产者已经生产了一条消息,且把消息推送到 RabbitMQ 服务器上。
1.3 消费者的创建
我们新建一个 Spring Boot 项目,命名为 consumer,同时导入依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接着我们创建一个配置类
package com.example.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer-side declaration of the queue, direct exchange and binding
 * for the direct-exchange example.
 */
@Configuration
public class DirectRabbitConfig {

    /** Declares the queue named {@code DirectQueue}, passing {@code true} as the durable flag. */
    @Bean
    public Queue DirectQueue() {
        Queue directQueue = new Queue("DirectQueue", true);
        return directQueue;
    }

    /** Declares the direct exchange named {@code DirectExchange}. */
    @Bean
    DirectExchange DirectExchange() {
        DirectExchange directExchange = new DirectExchange("DirectExchange");
        return directExchange;
    }

    /** Binds the queue to the exchange with the key {@code DirectRouting}. */
    @Bean
    Binding bindingDirect() {
        return BindingBuilder
                .bind(DirectQueue())
                .to(DirectExchange())
                .with("DirectRouting");
    }
}
然后我们创建一个消费者:
package com.example.consumer.receiver;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the direct-exchange example.
 *
 * @author 30309
 */
@Component
@RabbitListener(queues = "DirectQueue") // listens on the queue named DirectQueue
public class DirectReceiver {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("消费者收到消息: " + message);
    }
}
运行 consumer 项目,我们发现消息已被消费
打开 http://localhost:15672 ,也验证了我们的说法
2 Topic Exchange 实战
2.1 Topic Exchange 的简单介绍
Topic 就是通配符模式。Binding Key 中可以使用通配符(* 匹配恰好一个单词,# 匹配零个或多个单词,单词之间以 . 分隔),Routing Key 与 Binding Key 按通配符规则(而非正则表达式)进行模糊匹配,若匹配成功则会将消息转发至消息队列。
2.2 生产者的创建
修改配置类
package com.example.provider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares two queues, a topic exchange and the bindings between them
 * for the topic-exchange example.
 *
 * @author 30309
 */
@Configuration
public class TopicConfig {

    /** Name of the first queue; also used as that queue's binding key. */
    public final static String hello = "topic.hello";
    /** Name of the second queue. */
    public final static String world = "topic.world";

    private static final String EXCHANGE_NAME = "topicExchange";
    private static final String WILDCARD_KEY = "topic.#";

    /** First queue, named {@code topic.hello}. */
    @Bean
    public Queue firstQueue() {
        return new Queue(TopicConfig.hello);
    }

    /** Second queue, named {@code topic.world}. */
    @Bean
    public Queue secondQueue() {
        return new Queue(TopicConfig.world);
    }

    /** Topic exchange named {@code topicExchange}. */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the first queue to the exchange with the key {@code topic.hello},
     * so only messages whose routing key is exactly {@code topic.hello} reach it.
     */
    @Bean
    Binding bindingExchangeMessage() {
        Queue queue = firstQueue();
        TopicExchange topicExchange = exchange();
        return BindingBuilder.bind(queue).to(topicExchange).with(hello);
    }

    /**
     * Binds the second queue to the exchange with the wildcard key {@code topic.#},
     * so any message whose routing key starts with {@code topic.} reaches it.
     */
    @Bean
    Binding bindingExchangeMessage2() {
        Queue queue = secondQueue();
        TopicExchange topicExchange = exchange();
        return BindingBuilder.bind(queue).to(topicExchange).with(WILDCARD_KEY);
    }
}
修改控制器
package com.example.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer endpoints for the topic-exchange example.
 *
 * @author 30309
 */
@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /** Publishes a message to {@code topicExchange} with routing key {@code topic.hello}. */
    @RequestMapping("/sendTopicMessage1")
    public String sendTopicMessage1() {
        return publish("topic.hello", "Hello 'topic.hello'");
    }

    /** Publishes a message to {@code topicExchange} with routing key {@code topic.world}. */
    @RequestMapping("/sendTopicMessage2")
    public String sendTopicMessage2() {
        return publish("topic.world", "Hello 'topic.world'");
    }

    /** Wraps the text in a single-entry map under the key {@code data}, sends it and returns "ok". */
    private String publish(String routingKey, String text) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("data", text);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, payload);
        return "ok";
    }
}
2.3 消费者的创建
修改配置类
package com.example.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer-side declaration of the two queues, the topic exchange and
 * their bindings for the topic-exchange example.
 *
 * @author 30309
 */
@Configuration
public class TopicConfig {

    // Queue names; "hello" doubles as the binding key of the first queue.
    public final static String hello = "topic.hello";
    public final static String world = "topic.world";

    /** Queue one, named after {@link #hello}. */
    @Bean
    public Queue firstQueue() {
        Queue first = new Queue(hello);
        return first;
    }

    /** Queue two, named after {@link #world}. */
    @Bean
    public Queue secondQueue() {
        Queue second = new Queue(world);
        return second;
    }

    /** The topic exchange, named {@code topicExchange}. */
    @Bean
    TopicExchange exchange() {
        TopicExchange topicExchange = new TopicExchange("topicExchange");
        return topicExchange;
    }

    /**
     * Queue one is bound with the key {@code topic.hello}: it receives a message
     * only when the routing key is exactly {@code topic.hello}.
     */
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder
                .bind(firstQueue())
                .to(exchange())
                .with(hello);
    }

    /**
     * Queue two is bound with the wildcard key {@code topic.#}: it receives every
     * message whose routing key begins with {@code topic.}.
     */
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder
                .bind(secondQueue())
                .to(exchange())
                .with("topic.#");
    }
}
接着我们创建两个消费者
package com.example.consumer.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer listening on the queue {@code topic.hello}.
 */
@Component
@RabbitListener(queues = "topic.hello")
public class TopicHelloReceiver {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("TopicHelloReceiver消费者收到消息:" + message);
    }
}
package com.example.consumer.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer listening on the queue {@code topic.world}.
 */
@Component
@RabbitListener(queues = "topic.world")
public class TopicWorldReceiver {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("TopicWorldReceiver消费者收到消息:" + message);
    }
}
2.4 测试
运行项目provider,consumer,输入 http://localhost:8080/sendTopicMessage1
我们发现,两个消费者都消费了这条消息,这是因为,TopicHelloReceiver 消费者监听了队列1,绑定键为 topic.hello,TopicWorldReceiver 消费者监听了队列2,绑定键为 topic.#,而当前推送消息携带的路由键为 topic.hello,两个消费者监听的队列的绑定键都能与这条消息携带的路由键匹配上。
我们接着输入 http://localhost:8080/sendTopicMessage2
只有 TopicWorldReceiver 消费者消费了这条消息,这是因为当前推送消息携带的路由键为 topic.world,只有 TopicWorldReceiver 消费者监听的队列的绑定键能与这条消息携带的路由键匹配上。
3 Fanout Exchange 实战
3.1 Fanout Exchange 的简单介绍
Fanout 又被称为订阅/广播模式。该模式与 Binding Key 和 Routing Key 无关,Exchange 将接收到的消息分发给所有与该交换器绑定的消息队列。注意,Fanout 转发消息是最快的。
3.2 生产者的创建
创建配置类
package com.example.provider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares three queues and a fanout exchange, and binds every queue
 * to that exchange.
 *
 * @author 30309
 */
@Configuration
public class FanoutConfig {

    /** Queue named {@code fanout1}. */
    @Bean
    public Queue queue1() {
        return new Queue("fanout1");
    }

    /** Queue named {@code fanout2}. */
    @Bean
    public Queue queue2() {
        return new Queue("fanout2");
    }

    /** Queue named {@code fanout3}. */
    @Bean
    public Queue queue3() {
        return new Queue("fanout3");
    }

    /** Fanout exchange named {@code fanoutExchange}. */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /** Binding of {@code fanout1} to the fanout exchange. */
    @Bean
    Binding bindingExchangeA() {
        return bindToFanout(queue1());
    }

    /** Binding of {@code fanout2} to the fanout exchange. */
    @Bean
    Binding bindingExchangeB() {
        return bindToFanout(queue2());
    }

    /** Binding of {@code fanout3} to the fanout exchange. */
    @Bean
    Binding bindingExchangeC() {
        return bindToFanout(queue3());
    }

    /** Binds the given queue to the fanout exchange; no routing key is supplied. */
    private Binding bindToFanout(Queue queue) {
        return BindingBuilder.bind(queue).to(fanoutExchange());
    }
}
创建控制器
package com.example.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * Producer endpoint for the fanout-exchange example.
 *
 * @author 30309
 */
@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes a single-entry map to the exchange {@code fanoutExchange}.
     *
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        String messageData = "Hello World!";
        Map<String, Object> map = new HashMap<>();
        map.put("messageData", messageData);
        // Pass an explicit empty routing key rather than null, so the call does not
        // depend on how the template substitutes a missing key.
        rabbitTemplate.convertAndSend("fanoutExchange", "", map);
        return "ok";
    }
}
3.3 消费者的创建
创建配置类
package com.example.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer-side declaration of the three fanout queues, the fanout
 * exchange and their bindings.
 *
 * @author 30309
 */
@Configuration
public class FanoutConfig {

    /** First queue, {@code fanout1}. */
    @Bean
    public Queue queue1() {
        Queue first = new Queue("fanout1");
        return first;
    }

    /** Second queue, {@code fanout2}. */
    @Bean
    public Queue queue2() {
        Queue second = new Queue("fanout2");
        return second;
    }

    /** Third queue, {@code fanout3}. */
    @Bean
    public Queue queue3() {
        Queue third = new Queue("fanout3");
        return third;
    }

    /** The exchange named {@code fanoutExchange}. */
    @Bean
    FanoutExchange fanoutExchange() {
        FanoutExchange exchange = new FanoutExchange("fanoutExchange");
        return exchange;
    }

    /** Attaches {@code fanout1} to the exchange. */
    @Bean
    Binding bindingExchangeA() {
        Queue source = queue1();
        FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /** Attaches {@code fanout2} to the exchange. */
    @Bean
    Binding bindingExchangeB() {
        Queue source = queue2();
        FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }

    /** Attaches {@code fanout3} to the exchange. */
    @Bean
    Binding bindingExchangeC() {
        Queue source = queue3();
        FanoutExchange target = fanoutExchange();
        return BindingBuilder.bind(source).to(target);
    }
}
然后我们创建三个消费者
package com.example.consumer.receiver;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer listening on the queue {@code fanout1}.
 */
@Component
@RabbitListener(queues = "fanout1")
public class FanoutReceiver1 {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("FanoutReceiver1消费者:" + message);
    }
}
package com.example.consumer.receiver;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer listening on the queue {@code fanout2}.
 */
@Component
@RabbitListener(queues = "fanout2")
public class FanoutReceiver2 {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("FanoutReceiver2消费者:" + message);
    }
}
package com.example.consumer.receiver;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer listening on the queue {@code fanout3}.
 */
@Component
@RabbitListener(queues = "fanout3")
public class FanoutReceiver3 {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the message payload, received as a map
     */
    @RabbitHandler
    public void process(Map<String, Object> message) {
        // String concatenation formats the map the same way toString() does, and tolerates null.
        System.out.println("FanoutReceiver3消费者:" + message);
    }
}
3.4 测试
运行项目 provider,consumer,输入 http://localhost:8080/sendFanoutMessage,发现结果如下
由于三个队列都绑定了 Fanout Exchange ,所以三个消息接收类都监听到了这条消息。