RabbitMq在我们日常开发中不可或缺,作为主流消息中间件,可以用于项目中的应用解耦、流量削峰、异步处理(非主流任务交由队列下发处理)等,本文着重介绍运用于项目中流量峰值时,依据服务器的消费能力进行削峰,最大限度保障服务器不宕机。
前期准备:安装rabbitMq、新建一个springboot项目
略…
第一步:pom文件中导入amqp依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:yml中配置
此处生产者与消费者放到一个项目中,可以依据项目的需求调整生产者和消费者进行拆分。
除了通用配置外,还有两点说明:
1.生产者消息确认配置项,确认消息发送到交换机和队列
2.消费者配置手动确认配置项,默认消息是自动确认的,正常业务都需要手动确认(不手动确认,消息一直在)
生产者与消费者的详细完整配置如下:
spring:
#配置rabbitMQ
rabbitmq:
host: 192.168.144.133 #rabbitmq服务地址
port: 5672
username: admin
password: 123456
#消费者 手动确认配置项
listener:
type: simple
simple:
acknowledge-mode: MANUAL #消息确认方式 MANUAL手动确认 NONE不确认 AUTO自动确认
retry:
enabled: true #开启重试
max-attempts: 3 #最大重试次数
initial-interval: 5000ms #重试间隔时间
#生产者 消息确认配置项
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
第三步:生产者配置
配置主题型交换机、队列(交换机是消息队列传输的载体),并且将队列和交换机绑定,并且设置绑定路由键
/**
* 主题型交换机、队列配置
*/
@Configuration
public class TopicRabbitConfig {
@Bean
public Queue geoQueue() {
return new Queue("geo.inout.queue");
}
@Bean
public TopicExchange geoExchange() {
return new TopicExchange("geo-exchange");
}
/**
* 将队列和交换机绑定,而且绑定的键值为geo.key.inout
* 这样只要是消息携带的路由键是geo.key.inout,才会分发到该队列
*/
@Bean
Binding bindingGeoExchange() {
return BindingBuilder.bind(geoQueue()).to(geoExchange()).with("geo.key.inout");
}
}
消息确认回调函数配置(确认消息正常发送到RabbitMq上)
/**
* 消息确认回调函数配置
*/
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: "+"相关数据:"+correlationData);
log.info("ConfirmCallback: "+"确认情况:"+ack);
log.info("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("ReturnCallback: "+"消息:"+returnedMessage.getMessage());
log.info("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode());
log.info("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText());
log.info("ReturnCallback: "+"交换机:"+returnedMessage.getExchange());
log.info("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey());
}
});
return rabbitTemplate;
}
}
发送实体JSON序列化配置(防止消息乱码)
/**
* 发送实体JSON序列化配置
*/
@Configuration
public class RabbitProviderConfig implements InitializingBean {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void afterPropertiesSet() throws Exception {
//使用JSON序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
}
}
第四步:消费者配置
消费者消息接收监听类,说明:
1.调用channel.basicAck()方法为消费端执行消息手动确认,即消息被消费
2.因为RabbitMq要保持有序性,只有前面消费完了,后面才能消费,有可能出现消息消费慢的问题(接口反应处理慢)。这就需要在消费端开启多线程监听队列,
具体设置concurrency为开启多线程监听队列,concurrency = “5-8”:表示开启5个线程监听队列,最大为8个线程
/**
* 主题型-消息接收监听类
*/
@Slf4j
@Component
public class GeoMQReceiver {
/**
* 默认是单线程监听队列,消息消费会慢
* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
*/
@RabbitHandler
@RabbitListener(queues = "geo.inout.queue", concurrency = "5-8")
public void process(Map<String,Object> map, Message message, Channel channel) throws IOException {
try {
// TODO 你的业务处理
// 手动确认消息
// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 确认失败 将消息重新放回队列,让别人消费
// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
接收实体JSON序列化配置(防止消息乱码)
/**
* 接收实体JSON序列化配置
*/
@Configuration
public class RabbitMQConfig {
@Bean
public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
return new Jackson2JsonMessageConverter(objectMapper);
}
}
第五步:编写测试进行测试
提供两个方法进行模拟,方法一为正常调用接口,方法二为通过消息中间件调用接口
@RestController
@RequestMapping("/geo")
public class GeoController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 正常调用接口
*/
@PostMapping("/test1")
public void test1(@RequestBody Map<String, Object> params){
business(params);
}
/**
* 通过消息中间件调用接口
*/
@PostMapping("/test2")
public void test2(@RequestBody Map<String, Object> params){
rabbitTemplate.convertAndSend("geo-exchange", "geo.key.inout", params);
}
/**
* 业务处理方法
*/
public void business(Map<String, Object> params){
//你的实际业务代码块
try {
String id = (String)params.get("id");
log.info("获取到的参数:"+id);
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息接收监听类中处理业务方法
/**
* 主题型-消息接收监听类
*/
@Slf4j
@Component
public class GeoMQReceiver {
@Autowired
private GeoController geoController;
/**
* 默认是单线程监听队列,消息消费会慢
* 设置concurrency为开启多线程监听队列,concurrency = "5-8":表示开启5个线程监听队列,最大为8个线程
*/
@RabbitHandler
@RabbitListener(queues = "geo.nb.inout.queue", concurrency = "5-8")
public void process(Map<String,Object> map, Message message, Channel channel) throws IOException {
try {
geoController.business(map);
// 手动确认消息
// 第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 确认失败 将消息重新放回队列,让别人消费
// 第二个参数,true会重新放回队列,所以需要自己根据业务逻辑判断什么时候使用拒绝
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
第六步:使用JMeter进行压力测试
略…
总结:当项目并发压力上来的时候,可以通过增加服务器性能、服务集群、数据缓存批量处理、数据库分库等多种方案进行提升服务并发性能,基于消息中间件削峰只是其中的一个技术方案,可以依据自己的项目需求选择合理的并发削峰方案。建议做成配置性选择走常规路由或消息中间件,后期只需要根据业务数据请求量在配置中心改配置就可以实现了,保障服务不会宕机。