前言
前面几篇介绍RabbitMQ各种模式的文章,已经基本讲清楚了RabbitMQ的基础架构以及消息确认机制。这里打算将RabbitMQ集成到springboot中,便于日后工作中查阅。
1、项目搭建
在idea中构建一个springboot项目,同时引入web和RabbitMQ模块。
2、引入配置
在application.properties文件中加入如下配置:一部分是连接配置,另一部分是exchange、queue的名称以及routing key的配置。
# RabbitMQ connection settings (broker host, port and credentials)
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Custom keys read by the demo code: names of the exchange, the queue and the routing key that binds them
spring.rabbitmq.learn.exchange.name=learn:info:mq:exchange
spring.rabbitmq.learn.queue.name=learn:info:mq:queue
spring.rabbitmq.learn.routing.key.name=learn:info:mq:routing:key
新建一个RabbitConfig,将这个类作为一个Configuration交给spring托管,如下所示:
/**
 * Spring configuration that declares the RabbitMQ beans used by this demo:
 * the connection factory, a {@link RabbitTemplate}, one direct exchange,
 * one durable queue and the binding between them.
 *
 * <p>All names and connection settings are read from the {@link Environment}
 * (the {@code spring.rabbitmq.*} keys).
 *
 * @author liman
 * @since 2019/10/27
 */
@Configuration
public class RabbitConfig {
    /** Port used when {@code spring.rabbitmq.port} is not configured (standard AMQP port). */
    private static final int DEFAULT_PORT = 5672;

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private Environment env;

    /**
     * Builds the caching connection factory from the {@code spring.rabbitmq.*} settings.
     *
     * @return a connection factory bound to virtual host {@code /} with publisher confirms enabled
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        String host = env.getProperty("spring.rabbitmq.host");
        // Typed lookup with a default: a missing port no longer fails with an
        // opaque NumberFormatException from Integer.valueOf(null).
        int port = env.getProperty("spring.rabbitmq.port", Integer.class, DEFAULT_PORT);
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * Creates a {@link RabbitTemplate} on top of {@link #connectionFactory()}.
     *
     * <p>Original author's note: the bean must be prototype-scoped.
     * NOTE(review): presumably so that each injection point can configure its own
     * callbacks — confirm before changing the scope.
     *
     * @return a new template instance
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /**
     * Consumer-side configuration:
     * 1. choose the exchange type
     * 2. bind the queue to the exchange
     *
     * <p>FanoutExchange: delivers the message to every bound queue; there is no routing key concept.
     * DirectExchange: delivers to the queue selected by the routing key.
     * TopicExchange: multi-keyword (pattern) matching.
     *
     * @return the direct exchange named by {@code spring.rabbitmq.learn.exchange.name}
     * @throws IllegalStateException if the exchange name is not configured
     */
    @Bean
    public DirectExchange defaultExchange() {
        // Fail fast with the missing key in the message instead of passing null on.
        return new DirectExchange(env.getRequiredProperty("spring.rabbitmq.learn.exchange.name"));
    }

    /**
     * Declares the demo queue.
     *
     * @return a durable queue named by {@code spring.rabbitmq.learn.queue.name}
     * @throws IllegalStateException if the queue name is not configured
     */
    @Bean
    public Queue learnQueue() {
        // second argument: durable queue
        return new Queue(env.getRequiredProperty("spring.rabbitmq.learn.queue.name"), true);
    }

    /**
     * Binds {@link #learnQueue()} to {@link #defaultExchange()}.
     *
     * @return the binding that uses the routing key from {@code spring.rabbitmq.learn.routing.key.name}
     * @throws IllegalStateException if the routing key is not configured
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(learnQueue()).to(defaultExchange())
                .with(env.getRequiredProperty("spring.rabbitmq.learn.routing.key.name"));
    }
}
也就是在这个configuration中完成了RabbitTemplate的实例化。
3、发送消息
在一个测试controller中发送消息
/**
 * Test controller that publishes one message to the demo exchange per request.
 *
 * @author liman
 * @since 2019/10/27
 */
@RestController
public class RabbitController {
    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);
    private static final String prefix = "/rabbit";

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private Environment env;

    /**
     * Serializes {@code message} with the {@link ObjectMapper} and publishes it as a
     * persistent message to the configured exchange with the configured routing key.
     *
     * @param message request parameter carrying the text to send
     * @return always {@code "success"}; failures are only logged
     *         (NOTE(review): consider reporting failures to the caller)
     */
    @RequestMapping(value = prefix + "/learn", method = RequestMethod.GET)
    public String sendMessage(String message) {
        log.info("待发送消息:{}", message);
        try {
            String exchange = env.getProperty("spring.rabbitmq.learn.exchange.name");
            String routingKey = env.getProperty("spring.rabbitmq.learn.routing.key.name");
            Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
            // Pass exchange and routing key per call instead of mutating the injected
            // template, which is shared by all concurrent requests of this controller.
            rabbitTemplate.send(exchange, routingKey, msg);
        } catch (Exception e) {
            // Log the exception itself: fillInStackTrace() would overwrite the original
            // stack trace with the location of this catch block.
            log.error("发送消息异常:{}", e.getMessage(), e);
        }
        return "success";
    }
}
4、两种消费消息的方式
其实消息的消费,我个人理解为有两种方式,一种是@RabbitListener注解,另一种是实现ChannelAwareMessageListener接口
4.1@RabbitListener注解消费消息
这种消费消息的方式比较简单,在任意方法上添加@RabbitListener注解即可。这里作为消费端,只需要指定感兴趣的queue即可
/**
 * Consumer that receives messages from the demo queue through the
 * {@code @RabbitListener} annotation.
 */
@Component
public class AnnotationListener {
    private static final Logger log = LoggerFactory.getLogger(AnnotationListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes the raw message body into a {@code String} and logs it.
     *
     * @param message raw message body as delivered from the queue named by
     *                {@code spring.rabbitmq.learn.queue.name}
     */
    @RabbitListener(queues = "${spring.rabbitmq.learn.queue.name}")
    public void consumerMessage(@Payload byte[] message) {
        String result = null;
        try {
            result = objectMapper.readValue(message, String.class);
            log.info("注解的方式消费消息:{}", result);
        } catch (IOException e) {
            // Route the failure through the logger instead of printStackTrace().
            log.error("注解方式消费消息异常:{}", e.getMessage(), e);
        }
        // result stays null when deserialization failed
        log.info("接收到的消息:{}", result);
    }
}
4.2实现ChannelAwareMessageListener接口消费消息
该方法相对复杂一些,分为两步:第一步,新增一个类并实现ChannelAwareMessageListener接口;第二步,在RabbitConfig中将其绑定到监听容器上。
/**
 * Consumer implemented through {@link ChannelAwareMessageListener}, which gives the
 * listener access to the {@link Channel} so it can acknowledge deliveries itself.
 *
 * @author liman
 * @since 2019/10/27
 */
@Component
public class ImplementListener implements ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(ImplementListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Deserializes the message body into a {@code String}, logs it and settles the delivery.
     *
     * <p>The listener container for this class is configured with manual acknowledgement,
     * so every delivery must be explicitly acked or rejected here; otherwise it stays
     * unacknowledged on the broker.
     *
     * @param message the delivered message
     * @param channel the channel the message was received on, used to ack/reject
     * @throws Exception if acknowledging or rejecting on the channel fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            String result = objectMapper.readValue(body, String.class);
            log.info("非注解方式开始消费消息:{}", result);
        } catch (Exception e) {
            // Log the exception itself; fillInStackTrace() would overwrite its stack trace.
            log.error("消费消息异常,异常信息为:{}", e.getMessage(), e);
            // The message could not be processed: reject without requeue so a
            // malformed message is not redelivered in an endless loop.
            channel.basicReject(tag, false);
            return;
        }
        // Processing succeeded: acknowledge only this delivery (multiple = false).
        channel.basicAck(tag, false);
    }
}
在RabbitConfig中绑定消费者,如下所示:
@Autowired
private ImplementListener implementListener;

/**
 * Registers {@code implementListener} as a consumer of the given queue.
 *
 * <p>The container uses {@link AcknowledgeMode#MANUAL}, i.e. acknowledging
 * deliveries is left to the listener.
 *
 * @param queue the queue to consume from (the {@code learnQueue} bean)
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer getListenerContainer(@Qualifier("learnQueue") Queue queue) {
    SimpleMessageListenerContainer listenerContainer =
            new SimpleMessageListenerContainer(connectionFactory());
    listenerContainer.setQueues(queue);
    listenerContainer.setMessageListener(implementListener);
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return listenerContainer;
}
测试
两种方式都能正常消费消息,这里不贴出各自单独消费的测试结果,而是贴出同时引入两种消费方式时的测试结果:
修改了一下消息发送的controller
package com.learn.springbootrabbitmqblog.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * Test controller that publishes a burst of messages from multiple threads, used to
 * observe how the messages are distributed between the consumers of the demo queue.
 *
 * @author liman
 * @since 2019/10/27
 */
@RestController
public class RabbitController {
    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);
    private static final String prefix = "/rabbit";
    /** Number of messages (one sender thread each) started per request. */
    private static final int MESSAGE_COUNT = 100;

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private Environment env;

    /**
     * Starts {@value #MESSAGE_COUNT} threads; thread {@code i} publishes
     * {@code message + i} to the configured exchange with the configured routing key.
     *
     * @param message request parameter used as the prefix of every message
     * @return always {@code "success"}; the sends run asynchronously and failures are only logged
     */
    @RequestMapping(value = prefix + "/learn", method = RequestMethod.GET)
    public String sendMessage(String message) {
        log.info("待发送消息:{}", message);
        try {
            // Resolve the destination once instead of once per thread.
            String exchange = env.getProperty("spring.rabbitmq.learn.exchange.name");
            String routingKey = env.getProperty("spring.rabbitmq.learn.routing.key.name");
            // send the messages from multiple threads
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                new Thread(new SendTask(exchange, routingKey, message + i)).start();
            }
        } catch (Exception e) {
            // Log the exception itself; fillInStackTrace() would overwrite its stack trace.
            log.error("发送消息异常:{}", e.getMessage(), e);
        }
        return "success";
    }

    /**
     * Publishes a single persistent message using the controller's template and mapper.
     */
    private class SendTask implements Runnable {
        private final String exchange;
        private final String routingKey;
        private final String message;

        SendTask(String exchange, String routingKey, String message) {
            this.exchange = exchange;
            this.routingKey = routingKey;
            this.message = message;
        }

        @Override
        public void run() {
            try {
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .build();
                // Pass exchange and routing key per call: the template is shared by all
                // sender threads, so its setters must not be called concurrently here.
                rabbitTemplate.send(exchange, routingKey, msg);
            } catch (JsonProcessingException e) {
                log.error("消息序列化异常:{}", e.getMessage(), e);
            } catch (RuntimeException e) {
                // Broker/connection failures surface as runtime exceptions; log them
                // instead of letting the thread die with an uncaught exception.
                log.error("发送消息异常:{}", e.getMessage(), e);
            }
        }
    }
}
两个消费者会以负载均衡(轮询)的方式消费消息,运行结果如下
总结
本篇博客只是记录了springboot中对RabbitMQ简单的集成,但是还没有用到较好的业务场景,后续会补充总结springboot中的消息确认机制。