当你点进这篇文章我觉得你已经对 rabbitmq
不是很陌生了,至少清楚它是一个队列可以发送消息使用,下面简述一下它适合在什么场景下使用,与其他同类产品的对比,以及如何使用
后续我也会写一个系列专门来 rabbitmq
,此篇还是较为简单的介绍,主要还是针对 springboot
中的适用来写的
我们要在何时使用rabbitmq
应用解耦
有的时候由于有着不同的服务或者不同的应用之间互相调用,导致系统的复杂度与耦合度过高,由于我们的每个服务都是独立的,不想因为某些依赖或者引用而要被迫共同使用,所以我们可以考虑使用rabbitmq来对服务间解耦合,这样即使有着相互的依赖,我们也可以通过消息队列对不同的服务进行解耦,保证了服务的独立性
拿我们的业务场景来说,
一个绩效系统,有三个业务模块儿,分别是考勤、考核和绩效,考勤和考核都需要依赖着绩效的模块,这个时候如果我们想使用考勤和考核模块儿就有问题,因为他们依赖着绩效模块用来加分,所以我们不能直接使用,需要先部署绩效模块再去使用其他两个模块儿,这样就会出问题。
但是我们就这个场景使用的消息队列,在考勤和考核模块中都添加了消息队列,把它们需要发送的内容都放在消息队列中,这样绩效模块只需要去队列中进行读的操作,其他两个模块儿对绩效的加分操作也都放在了队列中,这样就起了解耦的作用,使得考勤和考核对绩效模块的耦合变弱。
异步调用
异步调用同样很容易理解,单模块调用多模块儿,将消息做分发,一个动作发送多条消息到不同的队列中,相应的模块对收到的消息依次消费,在异步消费的同时也会提高效率,因为我们将本来串行的内容改为了并行。
继续使用我们实际的场景来举例,我们的业务需要在用户开通项目成功时发送短信和邮件进行通知,这种需求的实时性要求不是很高,所以很适合我们使用异步调用,将发短信和发邮件都放在队列中,交由消息队列去处理即可,我们就不需要再进行直接调用发送短信或者邮件的代码了。
流量削峰
对突然出现的大量流量进行削峰,将发送到服务端的大量请求进行收集到队列中,而且可以控制队列长度,超过阈值直接丢弃,这样就可保证数据平缓输入,控制消息队列在可以承受的范围内依次消费消息,这样就可以为系统减压了
扫描二维码关注公众号,回复: 9068280 查看本文章
对应的实际的使用:
可以在秒杀时根据库存量限定队列的长度来使得只有限定的人数可以买到,在不会给系统造成压力的同时也处理了业务
还有对于日志的存储我们也可以使用消息队列,把所有系统操作的日志都存储在消息队列中,对于这类的不是很重要的内容,使用消息队列也很方便
我们在系统中就将日志全部放入了消息队列中,让消息队列以一个系统可以接受的速度来进行日志的插入,使得数据平缓的输入
其他
这个小节换一个说法,其实也可以说,我们为什么要用rabbitmq,也正是因为如上rabbitmq的特点,我们使用了rabbitmq
rabbitmq的几种工作模式
开始之前先配置好rabbitmq的配置类
@Configuration
public class RabbitConfig {
@Bean
public Queue Queue() { return new Queue("hello"); }
@Bean
public Queue Queue1() { return new Queue("hello1"); }
@Bean
public Queue Queue2() { return new Queue("hello2"); }
@Bean
public Queue Queue3() { return new Queue("object_queue"); }
}
用来测试的测试类
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTest {
}
一对一
单点发送,单点接收
添加HelloSender
@Component
public class HelloSender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
String context = "hello----" + LocalDateTime.now();
System.out.println("send:" + context);
//往名称为 hello 的queue中发送消息
this.amqpTemplate.convertAndSend("hello", context);
}
}
添加HelloReceiver
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receive:"+message);
}
}
在测试类中测试
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
测试结果如下
send:hello----2019-11-27T14:21:34.503
Receive:hello----2019-11-27T14:21:34.503
一对多
添加HelloSender1
@Component
public class HelloSender1 {
@Autowired
private AmqpTemplate amqpTemplate;
//给hello1发送消息,并接受一个计数参数
public void send(int i) {
String context = i + "";
System.out.println(context + "--send");
this.amqpTemplate.convertAndSend("hello1", context);
}
}
添加HelloReceiver1
@Component
@RabbitListener(queues = "hello1")
public class Helloreceiver1 {
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver1:"+message);
}
}
添加HelloReceiver2
@Component
@RabbitListener(queues = "hello1")
public class Helloreceiver2 {
//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}
在测试类中测试
@Autowired
private HelloSender1 helloSender1;
@Test
public void manyReceiver1() {
for (int i = 0; i < 30; i++) {
helloSender1.send(i);
}
}
测试结果如下
Receiver2:1
Receiver1:0
Receiver2:3
Receiver1:2
Receiver2:5
Receiver1:4
Receiver2:7
Receiver1:6
Receiver2:9
Receiver1:8
为了测试方便,我采取的样本较小,根据这结果我们可以看得出来rabbitmq自动的为我们做了负载均衡,奇数全在Receiver2,偶数全在Receiver1
我们再来修改一下测试类,相同的消息发送两次,再查看以下结果
@Autowired
private HelloSender1 helloSender1;
@Autowired
private HelloSender1 helloSender2;
@Test
public void manyReceiver2() {
for (int i = 0; i < 10; i++) {
helloSender1.send(i);
helloSender2.send(i);
}
}
测试结果如下
Receiver2:0
Receiver1:0
Receiver1:1
Receiver2:1
Receiver1:2
Receiver2:2
Receiver1:3
Receiver2:3
Receiver1:4
Receiver1:5
Receiver2:4
Receiver1:6
Receiver2:5
Receiver1:7
Receiver2:6
Receiver1:8
Receiver2:7
Receiver1:9
Receiver2:8
Receiver2:9
每条消息都发送了两次,但是Receiver1和Receiver2每条消息都接受到了,证明了rabbitmq的负载均衡
发送对象
测试发送一个User对象,其实也相当于发送json数据
添加ObjectSender
@Component
public class ObjectSender {
@Autowired
AmqpTemplate amqpTemplate;
public void sendUser(User user){
System.out.println("Send object:"+user.toString());
this.amqpTemplate.convertAndSend("object_queue",user);
}
}
添加ObjectReceiver
@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {
@RabbitHandler
public void objectReceiver(User user){
System.out.println("Receiver object:"+user.toString());
}
}
在测试类中测试
@Autowired
private ObjectSender objectSender;
@Test
public void objectReceiver() {
objectSender.sendUser(new User("123", "laozhang", "老张的密码"));
}
测试结果如下
Send object:User(id=123, user_name=laozhang, user_password=老张的密码)
Receiver object:User(id=123, user_name=laozhang, user_password=老张的密码)
Topic Exchange
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列来测试
添加TopicSender
@Component
public class TopicSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send1(){
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange","topic.message",context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
添加TopicReceiver1
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.message :"+ message);
}
}
添加TopicReceiver2
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message){
System.out.println("Receiver topic.messages: "+ message);
}
}
在测试类中测试
@Autowired
private TopicSender topicSender;
@Test
public void topicReceiver() {
topicSender.send1();
topicSender.send2();
}
测试结果如下
Send object:User(id=123, user_name=laozhang, user_password=老张的密码)
Receiver object:User(id=123, user_name=laozhang, user_password=老张的密码)
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息
发布订阅模式
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
添加FanoutSender
@Component
public class FanoutSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send(){
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
//这里使用了A、B、C三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
amqpTemplate.convertAndSend("fanoutExchange","", context);
}
}
添加FanoutReceiverA
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.A: "+message);
}
}
添加FanoutReceiverB
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.B: "+message);
}
}
添加FanoutReceiverC
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String message){
System.out.println("Receiver form fanout.C: "+message);
}
}
在测试类中测试
@Autowired
private FanoutSender fanoutSender;
@Test
public void FanoutReceiver() {
fanoutSender.send();
}
测试结果如下
Sender : hi, fanout msg
Receiver form fanout.A: hi, fanout msg
Receiver form fanout.C: hi, fanout msg
Receiver form fanout.B: hi, fanout msg
三个接收者都可以接收到消息
本系列文章
(一) SpringBoot 项目初始化 + 配置swagger页面
(二) SpringBoot 整合 MyBatis-plus