日常记录-SpringBoot整合RabbitMQ第四节(消费者确认)

一、auto模式(自动ACK)

RabbitMQ默认是auto模式,当监听消费者方法正常执行完毕后,由Spring自动向RabbitMQ返回ack确认;如果出现异常,就给RabbitMQ返回nack消费失败。

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: auto

二、none模式(无ACK)

RabbitMQ认为所有消息都会被成功消费,所以RabbitMQ投递消息后会立即删除消息

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: none

三、manual模式(手动ACK)

开发人员在处理完业务后,调用RabbitMQ封装好的API,向RabbitMQ返回ack确认消费成功或者消费失败

application.yml配置RabbitMQ消费者ACK应答模式

spring:
  rabbitmq:
    listener:
      simple:
        # none(无应答模式) auto(自动应答模式) manual(手动应答模式)
        acknowledge-mode: manual

四、消费者失败重试

可以利用Spring本身自动重试的机制,当消费者出现异常后,在消费者内部进行本地重试;而不是让消息立刻重新回到队列,然后让RabbitMQ重新投递,会导致CPU飙升。(这样会导致无限循环 -> 消息一旦出现异常会不断投放回到队列,再重新发送给消费者)。

application.yml配置
在这里插入图片描述

定义队列和交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RetryQueueConfig {
    
    

    //定义direct类型交换机
    @Bean
    public DirectExchange retryExchange() {
    
    
        return ExchangeBuilder.directExchange("retry.exchange").build();
    }


    //定义持久化队列
    @Bean
    public Queue retryQueue() {
    
    
        return new Queue("retry.queue",true,false,false);
    }

    @Bean
    public Binding retryQueueBinding(Queue retryQueue, DirectExchange retryExchange) {
    
    
        return BindingBuilder.bind(retryQueue).to(retryExchange).with("retry");
    }
}

模拟生产者

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RetryQueueTest {
    
    

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //模拟生产者
    @Test
    public void test(){
    
    
        rabbitTemplate.convertAndSend("retry.exchange","retry","模拟消息异常");
    }
}

控制台输出
重试三次后就把消息删除了
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

重试失败后的恢复策略

在刚刚的本地重试中,在达到最大次数后,消息会被丢弃,这是Spring内部机制决定的。

但是,其实在重试多次消费仍然失败后,SpringAMQP提供了MessageRecoverer接口,定义了不同的恢复策略可以用来进一步处理消息:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢失消息。是默认的处理策略

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

实际开发中,比较优雅的一个方案是RepublishMessageRecoverer,将失败消息重新投递到一个专门用于存储异常消息的队列中,等待后续人工处理。

RepublishMessageRecoverer策略代码

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class RepublishMessageRecovererConfig {
    
    

    /*
     * 消息消费失败后的恢复策略:使用RepublishMessageRecoverer策略:重试次数耗尽后,将失败消息投递到指定的交换机
     */
    @Bean
    public MessageRecoverer republishMsgRecoverer(RabbitTemplate rabbitTemplate) {
    
    
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
    }


    //定义Topic类型交换机
    @Bean
    public TopicExchange errorExchange() {
    
    
        return ExchangeBuilder.topicExchange("error.exchange").build();
    }

    //定义队列
    @Bean
    public Queue errorQueue() {
    
    
        return QueueBuilder.durable("error.queue").build();
    }

    //队列和交换机绑定
    @Bean
    public Binding errorQueueBinding(TopicExchange errorExchange, Queue errorQueue) {
    
    
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.#");
    }


}

在这里插入图片描述
这样就实现了异常消息重试耗尽后,就会投递到指定的异常队列中去,等待人工处理了

猜你喜欢

转载自blog.csdn.net/qq407995680/article/details/132108634