前言
上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功能。其实简单来看,spring boot中有两种方式,第一种是@RabbitListener注解,第二种是实现ChannelAwareMessageListener接口。
准备工作
上篇博客为了简单说明spring boot对RabbitMQ的集成,只是简单使用了一个配置文件,这次,我们可以将RabbitMQ的配置信息独立出去
1、在resources目录下建立一个新的application-rabbitmq.properties文件,其中存放RabbitMQ的相关配置,结构如下所示:
2、剩下的任务就是将这个配置文件引入到application.properties文件中,这个时候就需要用到—— spring.profiles.include属性了。在application.properties文件中加入这个属性,指定值为rabbitmq即可完成配置文件的引入,如下所示:
@RabbitListener注解
上一篇博客中已经总结过这个注解的基本使用,但是只是在其中使用了@Queue注解,其他的一些注解都没有更深入的探讨。
1、简单生产者的编写
这里还是撸一个简单的生产者
/**
* author:liman
* createtime:2019/10/29
*/
@Service
public class BasicProducerService {
@Autowired
private Environment env;
//springboot中只需在properties中配置了访问RabbitMQ的用户名和密码之后,直接引入rabbitTemplate即可
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ObjectMapper objectMapper;
public void producerSimpleMessage(String message) throws JsonProcessingException {
rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name"));
Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();
rabbitTemplate.convertAndSend(msg);
}
}
2、一个简单的消费者
/**
* author:liman
* createtime:2019/10/29
*/
@Component
public class BasicConsumerService {
private static final Logger log= LoggerFactory.getLogger(BasicConsumerService.class);
@Autowired
private Environment env;
//RabbitListener注解中一个bindings就是一个绑定关系,这个关系绑定了交换机和队列。
@RabbitListener(
bindings = @QueueBinding(value=@Queue(value="${basic.info.mq.queue.name}",durable = "true")
,exchange = @Exchange(value="${basic.info.mq.exchange.name}",type=ExchangeTypes.TOPIC)
,key="${basic.info.mq.routing.key.name}")
)
//@Header注解直接从消息中取出tag
public void consumeMessage(@Payload String message,@Header (AmqpHeaders.DELIVERY_TAG) long delivertTag, Channel channel){
log.info("当前线程:{},收到的简单消息:{}",Thread.currentThread().getName(),message);
try {
//消息确认
log.info("{},消息确认完成",message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
这里代码中可以直观的看到,这里主要配置的就是@RabbitListener注解,这个注解中@QueueBinding注解将@Exchange指定的交换机和@Queue指定的队列完成了绑定,同时指定了交换机的类型(这里为topic类型的交换机) 。在上一篇博客中这个绑定操作是在一个configuration容器中完成的。
ChannelAwareMessageListener接口
另一种方式在上篇博客中也提到过,实现ChannelAwareMessageListener接口,但是这种方式下,消息队列与交换机的绑定关系需要我们在configuration类中配置。
这里我们先贴出Config类中的代码,不再贴出生产者。
/**
* author:liman
* createtime:2019/10/29
*/
@Configuration
public class RabbitConfig {
private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private Environment env;
//springboot 会为我们自动初始化这个CachingConnectionFactory
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public DirectExchange productExchange() {
return new DirectExchange(env.getProperty("product.robbing.mq.exchange.name"));
}
@Bean
public Queue productQueue() {
return new Queue(env.getProperty("product.robbing.mq.queue.name"));
}
//在此完成交换机和队列的绑定
@Bean
public Binding bindProductExchangeQueue(){
return BindingBuilder.bind(productQueue()).to(productExchange())
.with(env.getProperty("product.robbing.mq.routing.key.name"));
}
@Autowired
private ProductConsumerListener productConsumerListener;
/**
* 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类
* @return
*/
@Bean(name="simpleMessageListenerContainer")
public SimpleMessageListenerContainer productMessageListenerContainerFactory(){
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setQueues(productQueue());//设置感兴趣的队列
listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者
listenerContainer.setPrefetchCount(5);//设置预取消息的个数
return listenerContainer;
}
}
多个消费者处理批量消息
其实默认情况下,springboot中是开启了多个消费者对消息进行批处理的。只是还可以进一步进行配置
SimpleRabbitListenerContainerFactory
我个人理解为,是针对使用@RabbitListener注解的一种多消费者工厂配置类,可以在@RabbitListener注解中指定我们配置的SimpleRabbitListenerContainerFactory实例。同时,我们引入多线程的配置。如下所示
@RabbitListener(
bindings = @QueueBinding(value=@Queue(value="${basic.info.mq.queue.name}",durable = "true")
,exchange = @Exchange(value="${basic.info.mq.exchange.name}",type=ExchangeTypes.TOPIC)
,key="${basic.info.mq.routing.key.name}")
//指定我们配置的containerFactory
,containerFactory = "simpleRabbitListenerContainerFactory"
)
public void consumeMessage(@Payload String message,@Header (AmqpHeaders.DELIVERY_TAG) long delivertTag, Channel channel){
log.info("当前线程:{},收到的简单消息:{}",Thread.currentThread().getName(),message);
try {
Thread.sleep(2000L);//模拟处理耗时较长的场景
channel.basicAck(delivertTag,true);
log.info("{},消息确认完成",message);
} catch (Exception e) {
e.printStackTrace();
}
}
自己配置的ContainerFactory
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(){
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);
listenerContainerFactory.setConcurrentConsumers(15);
listenerContainerFactory.setMaxConcurrentConsumers(10);
listenerContainerFactory.setPrefetchCount(1);//预处理消息个数
listenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//开启消息确认机制
return listenerContainerFactory;
}
弄一个controller用于测试:
@RestController
public class SimpleMessageController {
private static final Logger log= LoggerFactory.getLogger(SimpleMessageController.class);
@Autowired
private BasicProducerService basicProducerService;
@RequestMapping("/send/simple")
public String sendSimpleMessage(@RequestParam String message) throws JsonProcessingException {
log.info("请求消息为:{}",message);
for(int i=0;i<1000;i++){//多线程的方式去发送数据
new Thread(new SendMessageThread(message+i,basicProducerService)).start();
}
basicProducerService.producerSimpleMessage(message);
return "success";
}
private class SendMessageThread implements Runnable{
private String message;
private BasicProducerService basicProducerService;
public SendMessageThread(String message, BasicProducerService basicProducerService) {
this.message = message;
this.basicProducerService = basicProducerService;
}
@Override
public void run() {
try {
basicProducerService.producerSimpleMessage(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
}
运行结果:
RabbitMQ控制台方面,根据设置的多线程个数不同,下图会有不同的表现:
控制台方面,可以明显的看到有多个消费者线程在处理批量的消息:
线程名明显不同。
SimpleMessageListenerContainer
我个人理解和上面介绍的SimpleRabbitListenerContainerFactory一样,这里就不再赘述作用,直接贴上配置
/**
* 个人理解为SimpleMessageListenerContainer就是一个消费者的容器配置类
* @return
*/
@Bean(name="simpleMessageListenerContainer")
public SimpleMessageListenerContainer productMessageListenerContainerFactory(){
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setConcurrentConsumers(10);
listenerContainer.setMaxConcurrentConsumers(20);
listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
listenerContainer.setQueues(productQueue());
listenerContainer.setMessageListener(productConsumerListener);//这里设置了消息的消费者
listenerContainer.setPrefetchCount(5);
return listenerContainer;
}
总结
似乎没什么难的,就是有点凌乱,稍后附上代码地址吧:https://download.csdn.net/download/liman65727/11943408
顺便补充一点:
当Exchange和RoutingKey相同、queue不同时,所有消费者都能消费同样的信息;
当Exchange和RoutingKey、queue都相同时,消费者中只有一个能消费信息,其他消费者都不能消费该信息。