上一篇Spring Boot单元测试我们讲解了如何对Spring boot项目进行单元测试。
下面我们继续讲解Com模块其他功能的实现,这篇我们讲解的是RabbitMq的集成和封装功能。
前言
ACK机制
下面是我看到网友对rabbitMq的ack机制的总结
1、什么是消息确认ACK。
答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
2、ACK的消息确认机制。
答:ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。
如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务。
消息永远不会从RabbitMQ中删除,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的。
3、ACK机制的开发注意事项。
答:如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内容,由于RabbitMQ会长时间运行,因此这个"内存泄漏"是致命的。
虚拟机
rabbitMq的默认虚拟机,有时候开发环境不同,或者不同业务场景使用不同虚拟机进行处理。
rabbitMq的虚拟机维护
#添加虚拟机
./rabbitmqctl add_vhost xx
#添加用户
./rabbitmqctl add_user name password
#分配角色
./rabbitmqctl set_user_tags name administrator
#设置权限
./rabbitmqctl set_permissions -p xx name ".*" ".*" ".*"
Spring boot虚拟机指定
spring:
rabbitmq:
username: name
password: password
virtual-host: XX
接收和消费流程
代码实现
首先引入POM文件,包含了单元测试的
<!-- api -->
<dependency>
<groupId>com.clark.daxian</groupId>
<artifactId>com-provider-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
confirmCallback和returnCallback
/**
* 设置rabbit配置
* @author 大仙
*/
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate<String,String> redisTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this::confirm);
rabbitTemplate.setReturnCallback(this::returnedMessage);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//发送成功
if(ack){
//不做处理,等待消费成功,清楚缓存
log.info(correlationData.getId()+":发送成功");
}else{
//持久化到数据库
log.error(correlationData.getId()+":发送失败");
log.info("备份内容:"+redisTemplate.opsForValue().get(correlationData.getId()));
try {
}catch (Exception e){
log.error("记录mq发送端错误日志失败",e);
}
}
//不管成功与否读删除redis里面备份的数据
redisTemplate.delete(correlationData.getId());
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息主体 message : "+message);
log.error("描述:"+replyText);
log.error("消息使用的交换器 exchange : "+exchange);
log.error("消息使用的路由键 routing : "+routingKey);
}
}
消费流程
/**
* 消费者
* @author 大仙
*/
public abstract class DefaultListener<T extends MqMessage> {
protected static Logger logger = LoggerFactory.getLogger(DefaultListener.class);
private static final SerializerMessageConverter SERIALIZER_MESSAGE_CONVERTER = new SerializerMessageConverter();
private static final String ENCODING = Charset.defaultCharset().name();
public static final String PARENT_MESSAGE_CLASS = "com.clark.daxian.api.mq.entity.MqMessage";
@Autowired
private RedisTemplate<String,String> redisTemplate;
/**
* 业务执行方法
* @param content
*/
protected abstract void execute(T content)throws Exception;
/**
* 失败执行
* @param content
*/
protected abstract void failExecute(T content);
/**
* 接收到的消息处理
* @param message
* @param channel
* @throws IOException
*/
protected void receiveMessage(Message message, Channel channel) throws IOException{
/**
* 防止重复消费,可以根据传过来的唯一ID先判断缓存数据库中是否有数据
* 1、有数据则不消费,直接应答处理
* 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
* 3、如果消息处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
*/
try{
T content = getContent(message);
//已经消费,直接返回
if(canConsume(content,message.getMessageProperties().getConsumerQueue())){
logger.info(message.getMessageProperties().getConsumerQueue()+"已经消费过");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//消费当前消息
execute(content);
logger.info(message.getMessageProperties().getConsumerQueue()+"消费成功");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
//消费成功删除key
//单个消息控制
String redisCountKey = "retry" + message.getMessageProperties().getConsumerQueue() + content.getId();
//队列控制
String queueKey = "retry" + message.getMessageProperties().getConsumerQueue();
redisTemplate.delete(redisCountKey);
redisTemplate.delete(queueKey);
}
}catch (Exception e){
e.printStackTrace();
try {
if(dealFailAck(message,channel)){
logger.info("回归队列:"+message);
}else{
logger.error("消费失败:"+message);
failExecute(getContent(message));
}
}catch (Exception e1){
//扔掉数据
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
logger.error("重试消费失败:"+message);
failExecute(getContent(message));
}
}
}
/**
*
* @param message
* @return
*/
private T getContent(Message message) {
String body = getBodyContentAsString(message);
Class<T> contentClass = null;
try {
contentClass = (Class<T>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
}catch (Exception e){
throw new MqException("缺失泛型");
}
if(contentClass!=null&&contentClass.getName().equals(PARENT_MESSAGE_CLASS)){
throw new MqException("请指定相应的消息类型");
}
T content = JSONObject.toJavaObject(JSONObject.parseObject(body),contentClass);
return content;
}
/**
* 获取message的body
* @param message
* @return
*/
private String getBodyContentAsString(Message message) {
if (message.getBody() == null) {
return null;
}
try {
String contentType = (message.getMessageProperties() != null) ? message.getMessageProperties().getContentType() : null;
if (MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT.equals(contentType)) {
return SERIALIZER_MESSAGE_CONVERTER.fromMessage(message).toString();
}
if (MessageProperties.CONTENT_TYPE_TEXT_PLAIN.equals(contentType)
|| MessageProperties.CONTENT_TYPE_JSON.equals(contentType)
|| MessageProperties.CONTENT_TYPE_JSON_ALT.equals(contentType)
|| MessageProperties.CONTENT_TYPE_XML.equals(contentType)) {
return new String(message.getBody(), ENCODING);
}
}
catch (Exception e) {
// ignore
}
// Comes out as '[[email protected]' (so harmless)
return message.getBody().toString() + "(byte[" + message.getBody().length + "])";
}
/**
* 失败ACK
* @param message
* @param channel
* @return
* @throws IOException
* @throws InterruptedException
*/
private Boolean dealFailAck(Message message,Channel channel) throws IOException, InterruptedException{
T content = getContent(message);
//单个消息控制
String redisCountKey = "retry"+message.getMessageProperties().getConsumerQueue()+content.getId();
String retryCount = redisTemplate.opsForValue().get(redisCountKey);
long basic = 1000L;
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//队列控制
String queueKey = "retry"+message.getMessageProperties().getConsumerQueue();
//没有重试过一次
if(StringUtils.isBlank(retryCount)){
if(!redisTemplate.opsForValue().setIfAbsent(queueKey,"lock",25,TimeUnit.SECONDS)) {
channel.basicNack(deliveryTag,false,true);
logger.info("deliveryTag:"+deliveryTag);
return true;
}
//预防队列太长,造成延迟时间过长
redisTemplate.opsForValue().setIfAbsent(redisCountKey,"1",5,TimeUnit.MINUTES);
logger.info("开始第一次尝试:");
}else{
switch (Integer.valueOf(retryCount)){
case 1:
redisTemplate.opsForValue().set(redisCountKey,"2");
logger.info("开始第二次尝试:");
break;
case 2:
redisTemplate.opsForValue().set(redisCountKey,"3");
logger.info("开始第三次尝试:");
break;
case 3:
redisTemplate.opsForValue().set(redisCountKey,"4");
logger.info("开始第四次尝试:");
break;
default:
//扔掉消息,准备持久化
redisTemplate.delete(redisCountKey);
redisTemplate.delete(queueKey);
channel.basicNack(deliveryTag,false,false);
return false;
}
}
channel.basicNack(deliveryTag,false,true);
return true;
}
/**
* 是否能消费,防止重复消费
* @param content
* @param queueName
* @return
*/
private Boolean canConsume(T content,String queueName) {
if(redisTemplate.opsForValue().get(queueName+":"+content.getId())==null){
return false;
}else{
//存储消费标志
redisTemplate.opsForValue().set(queueName+":"+content.getId(),"lock",30, TimeUnit.SECONDS);
return true;
}
}
}
注解开启插拔
/**
* rabbitConfig相关配置
* @author 大仙
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Import(RabbitConfig.class)
public @interface EnableRabbitMq {
}
/**
* @Author: 朱维
* @Date 11:43 2019/12/10
*/
/**
* direct直连模型
* fanout无路由模式,使用场景广播消息
* topic 模糊路由模式,适用业务分组
* fanout>direct>topic 这里是多消费模式,topic和fanout都能实现,通过性能对比选择fanout 11>10>6
*/
public class RabbitConfig {
/**
* 初始化相关配置
* @return
*/
@Bean
public RabbitTemplateConfig rabbitTemplateConfig(){
RabbitTemplateConfig rabbitTemplateConfig = new RabbitTemplateConfig();
return rabbitTemplateConfig;
}
/**
* 提供者
* @return
*/
@Bean
public ProducerService producerService(){
ProducerServiceImpl producerService = new ProducerServiceImpl();
return producerService;
}
/**
* 测试队列名
*/
public static final String TEST_QUEUE = "test.q";
/**
* 测试交换机
*/
public static final String TEST_EXCHANGE = "test.ex";
/**
* 测试的路由器
*/
public static final String TEST_ROUTINGKEY = "test.rk";
@Bean
public DirectExchange testDirectExchange(){
DirectExchange directExchange = new DirectExchange(TEST_EXCHANGE);
return directExchange;
}
@Bean
public Queue testQueue(){
Queue queue = new Queue(TEST_QUEUE);
return queue;
}
/**
* 绑定
* @return
*/
@Bean
public Binding bindingTest() {
return BindingBuilder.bind(testQueue()).to(testDirectExchange()).with(TEST_ROUTINGKEY);
}
}
本来消息队列想在配置文件进行配置,然后进行动态绑定的。后来偷懒就没这么做,有想法的的可以尝试下。
到此rabbitmq相关功能封装完毕。
源码地址: