很简单：生产者在发送消息时，要给每条消息附加一个 messageId 作为该消息的唯一标识，消费方据此判断消息是否已经处理过，从而避免重复消费。
/**
 * Handles a message from the {@code ORDER_SUBMIT} queue and rejects deliveries
 * that cannot or should not be processed.
 *
 * <p>The payload is read as a map holding the business data under
 * {@code "messageData"} and a unique id under {@code "messageId"}. When
 * {@code redisUtils} already returns a value for that id, the delivery is treated
 * as a duplicate and rejected without requeueing. A payload whose
 * {@code "messageId"} is missing or is not a {@code String} is rejected the same
 * way, because it cannot be de-duplicated and would otherwise fail on every
 * redelivery.
 *
 * <p>NOTE(review): nothing visible in this method stores the id in Redis or
 * acknowledges a successfully processed delivery; presumably both belong to the
 * business logic below. Confirm before relying on the duplicate check.
 *
 * @param map     the message payload; expected keys are {@code "messageData"} and {@code "messageId"}
 * @param message the received message, used for its delivery tag
 * @param channel the channel used to reject the delivery
 * @throws IOException if rejecting the delivery fails
 */
@RabbitListener(queues = ORDER_SUBMIT)
@RabbitHandler
@Transactional
public void orderSubmitHandler(@Payload Map<Object, Object> map, Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();

    Object rawMessageId = map.get("messageId");
    if (!(rawMessageId instanceof String)) {
        // No usable id: reject without requeueing instead of failing with a
        // ClassCastException or looking up a null key.
        channel.basicReject(deliveryTag, false);
        return;
    }
    String messageId = (String) rawMessageId;

    Object order = map.get("messageData");
    Object exist = redisUtils.get(messageId);
    if (null != exist) {
        // Duplicate delivery: reject without requeueing so it is discarded or dead-lettered.
        channel.basicReject(deliveryTag, false);
        return;
    }
    // Business logic goes here.
}
开启手动确认（manual ack）模式后，需要在代码中显式地确认或拒绝消息：
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // manually acknowledge the message
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // reject the message; requeue=false means it is not put back on the queue
/**
 * Rejects a single received message.
 *
 * <p>Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
 * containing the received message being rejected.
 *
 * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
 * @param requeue {@code true} if the rejected message should be requeued;
 *                {@code false} if it should be discarded/dead-lettered instead
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Reject
 */
void basicReject(long deliveryTag, boolean requeue) throws IOException;