SpringBoot整合RabbitMQ
一、auto模式(自动ACK)
RabbitMQ默认是auto模式,当监听消费者方法正常执行完毕后,由Spring自动向RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack消费失败。
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: auto
二、none模式(无ACK)
RabbitMQ认为所有消息都会被成功消费,所以RabbitMQ投递消息后会立即删除消息
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: none
三、manual模式(手动ACK)
开发人员在处理完业务后,调用RabbitMQ封装好的API,向RabbitMQ返回ack确认消费成功或者消费失败
application.yml配置RabbitMQ消费者ACK应答模式
spring:
rabbitmq:
listener:
simple:
# none(无应答模式) auto(自动应答模式) manual(手动应答模式)
acknowledge-mode: manual
四、消费者失败重试
可以利用Spring本身自动重试的机制,当消费者出现异常后,在消费者内部进行本地重试;而不是让消息立刻重新回到队列,然后让RabbitMQ重新投递,会导致CPU飙升。(这样会导致无限循环 -> 消息一旦出现异常会不断投放回到队列,再重新发送给消费者)。
application.yml配置
定义队列和交换机
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RetryQueueConfig {
//定义direct类型交换机
@Bean
public DirectExchange retryExchange() {
return ExchangeBuilder.directExchange("retry.exchange").build();
}
//定义持久化队列
@Bean
public Queue retryQueue() {
return new Queue("retry.queue",true,false,false);
}
@Bean
public Binding retryQueueBinding(Queue retryQueue, DirectExchange retryExchange) {
return BindingBuilder.bind(retryQueue).to(retryExchange).with("retry");
}
}
模拟生产者
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RetryQueueTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//模拟生产者
@Test
public void test(){
rabbitTemplate.convertAndSend("retry.exchange","retry","模拟消息异常");
}
}
控制台输出
重试三次后就把消息删除了
重试失败后的恢复策略
在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。
但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢失消息。是默认的处理策略
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。
RepublishMessageRecoverer策略代码
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RepublishMessageRecovererConfig {
/*
* 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略:重试次数耗尽后,将失败消息投递到指定的交换机
*/
@Bean
public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
//定义Topic类型交换机
@Bean
public TopicExchange errorExchange() {
return ExchangeBuilder.topicExchange("error.exchange").build();
}
//定义队列
@Bean
public Queue errorQueue() {
return QueueBuilder.durable("error.queue").build();
}
//队列和交换机绑定
@Bean
public Binding errorQueueBinding(TopicExchange errorExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.#");
}
}
这样就实现了异常消息重试耗尽后,就会投递到指定的异常队列中去,等待人工处理了