什么是RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛.
RabbitMQ官方地址: https://www.rabbitmq.com/
RabbitMQ的工作模式
RabbitMQ提供了6种模式:简单模式,work工作队列模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics通配符模式,RPC远程调用模式(远程调用,不太算MQ;不作介绍);
简单模式
消费者代码
@Component
public class SimpleConsumer {
@RabbitListener(queues = "SimpleQueue")
public void listener(String msg){
System.out.println("收到消息:"+msg);
}
}
生产者代码
@Configuration
public class SimpleProducer {
@Autowired
AmqpTemplate amqpTemplate;
/***
* 声明队列
*/
@Bean
public Queue simpleQueue(){
return QueueBuilder.durable("SimpleQueue").build();
}
/**
* 发送消息
* @param msg
*/
public void sendMsg(String msg){
amqpTemplate.convertAndSend("SimpleQueue",msg);
}
}
测试方法
@SpringBootTest
public class SimpleTest{
@Autowired
SimpleProducer simpleProducer;
@Test
public void simpleTest() {
simpleProducer.sendMsg("简单模式的消息");
}
}
结果
总结
- 生产者:发送消息的程序
- 消费者:消息的接收和处理程序,会一直等待消息的到来
- 队列:类似于一个邮箱,可以缓存消息;生产者向其投递消息,消费者从中取出消息
Work queues(工作队列)模式
消费者代码
@Component
public class WorkConsumer {
@RabbitListener(queues = "WorkQueue")
public void listener1(String msg){
System.out.println("第一个消费者收到消息:"+msg);
}
@RabbitListener(queues = "WorkQueue")
public void listener2(String msg){
System.out.println("第二个消费者收到消息:"+msg);
}
}
生产者代码
@Configuration
public class WorkProducer {
/***
* 声明队列
*/
@Bean
public Queue workQueue(){
return QueueBuilder.durable("WorkQueue").build();
}
@Autowired
AmqpTemplate amqpTemplate;
public void sendMsg(String msg){
amqpTemplate.convertAndSend("WorkQueue",msg);
}
}
测试方法
@SpringBootTest
public class WorkTest {
@Autowired
WorkProducer workProducer;
@Test
public void workTest() {
for (int i = 0; i < 10; i++) {
workProducer.sendMsg("工作队列消息" + i);
}
}
}
结果
总结
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work queues模式与简单模式相比,多了一个或多个消费端,多个消费端共同消费同一个队列中的消息.对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度.
Publish/Subscribe(发布/订阅)模式
消费者代码
@Component
public class FanoutConsumer {
@RabbitListener(queues = "FanoutQueue1")
public void listener1(String msg){
System.out.println("第一个消费者收到消息:"+msg);
}
@RabbitListener(queues = "FanoutQueue2")
public void listener2(String msg){
System.out.println("第二个消费者收到消息:"+msg);
}
}
生产者代码
@Configuration
public class FanoutProducer {
/***
* 声明交换机
*/
@Bean
public Exchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("FanoutExchange").durable(true).build();
}
/***
* 声明队列
*/
@Bean
public Queue fanoutQueue1(){
return QueueBuilder.durable("FanoutQueue1").build();
}
/***
* 声明队列
*/
@Bean
public Queue fanoutQueue2(){
return QueueBuilder.durable("FanoutQueue2").build();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding fanoutQueueExchange1(@Qualifier("fanoutQueue1") Queue queue, @Qualifier("fanoutExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding fanoutQueueExchange2(@Qualifier("fanoutQueue2") Queue queue, @Qualifier("fanoutExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMsg(String msg){
rabbitTemplate.convertAndSend("FanoutExchange","",msg);
}
}
测试方法
@SpringBootTest
public class FanoutTest {
@Autowired
FanoutProducer fanoutProducer;
@Test
public void fanoutTest() {
for (int i = 0; i < 5; i++) {
fanoutProducer.sendMsg("发布与订阅模式消息" + i);
}
}
}
结果
总结
- 交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到.
- 每个消费者监听自己的队列
- 生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
- 与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机.
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机).
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置.实际上Work queues队列模式会将队列绑 定到默认的交换机.
Routing(路由)模式
消费者代码
@Component
public class RoutingKeyConsumer {
@RabbitListener(queues = "RoutingQueue1")
public void listener1(String msg){
System.out.println("routing1收到消息:"+msg);
}
@RabbitListener(queues = "RoutingQueue2")
public void listener2(String msg){
System.out.println("routing2收到消息:"+msg);
}
}
生产者代码
@Configuration
public class RoutingProducer {
/***
* 声明交换机
*/
@Bean
public Exchange routingExchange(){
return ExchangeBuilder.directExchange("RoutingExchange").durable(true).build();
}
/***
* 声明队列
*/
@Bean
public Queue routingQueue1(){
return QueueBuilder.durable("RoutingQueue1").build();
}
/***
* 声明队列
*/
@Bean
public Queue routingQueue2(){
return QueueBuilder.durable("RoutingQueue2").build();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding routingQueueExchange1(@Qualifier("routingQueue1") Queue queue, @Qualifier("routingExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing1").noargs();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding routingQueueExchange2(@Qualifier("routingQueue2") Queue queue, @Qualifier("routingExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing2").noargs();
}
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMsg(String routingKey,String msg){
rabbitTemplate.convertAndSend("RoutingExchange",routingKey,msg);
}
}
测试方法
@SpringBootTest
public class RoutingTest {
@Autowired
RoutingProducer routingProducer;
@Test
public void routingTest() {
for (int i = 0; i < 5; i++) {
routingProducer.sendMsg("routing1", "给绑定routing1的队列发送的消息" + i);
routingProducer.sendMsg("routing2", "给绑定routing2的队列发送的消息" + i);
}
}
}
结果
总结
- 与发布/订阅模式的对比
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey.
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息.
Topics(通配符)模式
消费者代码
@Component
public class TopicConsumer {
@RabbitListener(queues = "TopicQueue1")
public void listener1(String msg){
System.out.println("topicQueue1收到消息:"+msg);
}
@RabbitListener(queues = "TopicQueue2")
public void listener2(String msg){
System.out.println("topicQueue2收到消息:"+msg);
}
}
生产者代码
@Configuration
public class TopicProducer {
/***
* 声明交换机
*/
@Bean
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange("TopicExchange").durable(true).build();
}
/***
* 声明队列
*/
@Bean
public Queue topicQueue1(){
return QueueBuilder.durable("TopicQueue1").build();
}
/***
* 声明队列
*/
@Bean
public Queue topicQueue2(){
return QueueBuilder.durable("TopicQueue2").build();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding topicQueueExchange1(@Qualifier("topicQueue1") Queue queue, @Qualifier("topicExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("TopicRoutingKey.#").noargs();
}
/***
* 队列绑定到交换机上
*/
@Bean
public Binding topicQueueExchange2(@Qualifier("topicQueue2") Queue queue, @Qualifier("topicExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("TopicRoutingKey.*").noargs();
}
@Autowired
RabbitTemplate rabbitTemplate;
public void sendMsg(String routingKey,String msg){
rabbitTemplate.convertAndSend("TopicExchange",routingKey,msg);
}
}
测试方法
@SpringBootTest
public class TopicTest {
@Autowired
TopicProducer topicProducer;
@Test
public void routingTest(){
for (int i = 0; i < 5; i++) {
topicProducer.sendMsg("TopicRoutingKey.a.b","这条消息只有TopicQueue1收到");
topicProducer.sendMsg("TopicRoutingKey.a","这条消息全部都可以收到");
}
}
}
结果
总结
- 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key配置的规则将消息发送到对应的队列
- 与Routing模式的对比:都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
通配符规则 | 作用 | 示例 |
---|---|---|
# | 匹配一个词或者多个词 | example.#:example.a,example.a.b都匹配 |
* | 匹配不多不少恰好一次词 | example.*:example.a匹配,example.a.b不匹配 |