SpringBoot集成RabbitMQ
RabbitMQ
概念
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。
Windows环境安装
- RabbitMq Windows 环境安装参考https://blog.csdn.net/zhm3023/article/details/82217222
简述
-
queue:队列,每个队列可以有多个消费者,但是一条消息只会被一个消费者消费
-
exchange:交换机,队列可以绑定交换机,交换机根据路由或者其他匹配信息将消息发送至queue。
模式
-
simple模式:不需要交换机,直连模式。一个队列只有一个消费者
-
work模式:一个队列多个消费者
-
direct模式:需要交换机,通过交换机的路由key,精确匹配queue,并发送至对应的queue
-
topic模式:通过路由与路由key,模糊匹配的模式。可用通配符。比如key.1会被绑定路由key.*的queue获取到
-
fanout: 广播模式,不需要路由key,给所有绑定到交换机的queue
SpringBoot集成RabbitMq
简单实例
引入Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
创建队列、交换机
@Configuration
public class RebbitMqConfig {
/**
* 队列名
*/
private static final String LOG_QUEUE = "log_queue";
/**
* 交换机名
*/
public static final String LOG_EXCHANGE = "log_exchange";
public static final String MESSAGE_QUEUE = "message_queue";
public static final String MESSAGE_EXCHANGE = "message_exchange";
public static final String DIRECT_ROUTINGKEY = "test";
/**
* 创建队列
* @return
*/
@Bean
public Queue createLogQueue() {
return new Queue(LOG_QUEUE);
}
/**
* 创建队列
* @return
*/
@Bean
public Queue createMessageQueue() {
return new Queue(MESSAGE_QUEUE);
}
/**
* 创建交换机
* @return
*/
@Bean
public FanoutExchange logExchange() {
return new FanoutExchange(LOG_EXCHANGE);
}
/**
* 创建交换机
* @return
*/
@Bean
public DirectExchange messageExchange() {
return new DirectExchange(MESSAGE_EXCHANGE);
}
/**
* 队列与交换机进行绑定
* @return
*/
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(createLogQueue()).to(logExchange());
}
/**
* 队列与交换机绑定并添加路由key(direct和topic模式)
* @return
*/
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(createMessageQueue()).to(messageExchange()).with(DIRECT_ROUTINGKEY);
}
}
生产者简单实例
@RestController
@RequestMapping("/rabbit")
public class RabbitMqProduct {
/**
* 队列名
*/
private static final String LOG_QUEUE = "log_queue";
/**
* 交换机名
*/
public static final String LOG_EXCHANGE = "log_exchange";
public static final String DIRECT_ROUTINGKEY = "test";
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/test1")
public void sendMessage() {
String context = "simple---> " + new Date();
//如果没有配置默认交换机,直接传入queue的name
rabbitTemplate.convertAndSend(LOG_QUEUE, context);
//如果配置了默认的交换机,(交换机,queue_name,内容)
rabbitTemplate.convertAndSend("", DIRECT_ROUTINGKEY, context);
}
@RequestMapping("/test2")
public void sendMessages() {
String context = "direct---> " + new Date();
//(交换机名称,路由的key,内容)
rabbitTemplate.convertAndSend(LOG_EXCHANGE, DIRECT_ROUTINGKEY, context);
}
}
消费者简单实例
@Component
public class RabbitMqConsume {
//基础注解,指定queue的名称,可以多个。如果是simple不需要交换机的直接写队列名称就好。
//如果是其他的也想只指定一个queues——name的话,需要上面的配置类配置queue或者其他绑定关系
@RabbitListener(queues = "log_queue")
@RabbitHandler
public void processSimpleMsg(String message) {
System.out.println("########################received simple" + message);
}
}
简单集成完成。。。