该代码来自于jeecg-boot,觉得设计的不错,分享给大家
结果展示
我们在消费mq的时候都会做消息确认机制,消息要是消费过程中发送异常,将消息回退。
正常代码
通常我们会直接try/catch我们的方法,如下
@RabbitListener(queuesToDeclare = @Queue(value = "simple.queue",durable = "true")) // queuesToDeclare 自动声明队列
public void holloWordListener(String msg, Channel channel, Message message) throws IOException {
try {
// 消息消费
System.out.println("msg = " + msg);
// 消息消费。。。。。。
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息消费异常,重回队列");
// 回退
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
// 确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
但是这样会有一个缺点,要是我们有很多个消费者,重复的代码太多。
封装后的代码
封装后我们只需要处理消息内容
@RabbitListener(queues = "rabbitListener")
public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(json, deliveryTag, channel, new MqListener<String>() {
@Override
public void handler(String msg, Channel channel) throws IOException {
// 消息消费
System.out.println("msg = " + msg);
// 消息消费。。。。。。
}
});
}
上封装代码
封装消息确认
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import java.io.IOException;
@Slf4j
public class BaseRabbiMqHandler<T> {
public void onMessage(T t, Long deliveryTag, Channel channel, MqListener mqListener) {
try {
// 通过实现这个接口,处理我们的业务代码
mqListener.handler(t, channel);
/**
* 手动确认消息
* deliveryTag:该消息的index
* false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除 )
*/
channel.basicAck(deliveryTag, false);
}
catch (Exception e) {
log.error("接收消息失败,重新放回队列,异常原因:{}",e.getMessage());
try {
/**
* 重回队列
* deliveryTag:该消息的index
* multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
* requeue:被拒绝的是否重新入队列
*/
// channel.basicAck(deliveryTag, false);
channel.basicNack(deliveryTag, false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
mqListener接口
业务通过实现这个接口处理
import com.rabbitmq.client.Channel;
import java.io.IOException;
public interface MqListener<T> {
// 业务通过实现这个接口处理
default void handler(T map, Channel channel) throws IOException {
}
}
mq消费者
消费者代码示例
import com.rabbitmq.client.Channel;
import org.jeecg.boot.starter.rabbitmq.core.BaseRabbiMqHandler;
import org.jeecg.boot.starter.rabbitmq.listenter.MqListener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import java.io.IOException;
public class TestListener extends BaseRabbiMqHandler<String> {
// 继承封装的消息确认类
@RabbitListener(queues = "rabbitListener")
public void nonTicketUploadHandler(String json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
super.onMessage(json, deliveryTag, channel, new MqListener<String>() {
// 实现父类的消息方法里的接口
@Override
public void handler(String msg, Channel channel) throws IOException {
// 业务代码写这里,try/catch不用再写了
System.out.println("msg = " + msg);
// 消息消费。。。。。。
}
});
}
}
rabbitmq详细讲解、
各个模式代码示例
延迟队列、死信队列、ttl队列等。
看这篇,链接: https://blog.csdn.net/qq_48721706/article/details/125194646?spm=1001.2014.3001.5502
给**的推荐受影响凑字数。。。。
给**的推荐受影响凑字数。。。。
给**的推荐受影响凑字数。。。。
给**的推荐受影响凑字数。。。。
给**的推荐受影响凑字数。。。。