引入maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
添加配置:
spring:
rabbitmq:
host: 192.168.56.10
port: 5672
#虚拟主机
virtual-host: /
#开启发送端发送数据到broker确认接收到数据
publisher-confirms: true
#开启发送端确认,确认数据已经抵达队列
publisher-returns: true
template:
#抵达队列,以异步模式优先回调组合ReturnCallback
mandatory: true
listener:
simple:
#手动ack消息 手动确认收货 手动确认模式 防止消息丢失
acknowledge-mode: manual
添加配置类
/**
* 运行之前,一定要小心,否则要删除队列/交换机重新运行 麻烦!
*
* 解决消息丢失(最怕)
* 1 做好消息确认机制(publisher,consumer【手动ack】)
* 2 每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一次
* 解决消息重复
* 1 幂等性
* 2 防重表
* 3 RabbitMQ自带redelivered (做法过于暴力)
* 解决消息积压
* 1 增加更多的消费者
* 2 上线专门的队列消费服务,取出来,记录到数据库,离线慢慢处理
*/
//开启RabbitMQ消息队列
@EnableRabbit
@Configuration
public class MyRabbitMQConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "order.release.order.queue")
public void listening(OrderEntity entity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单,准备关闭订单。order:"+entity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
//容器中的组建Queue Exchange Binding 都会自动创建(前提是RabbitMQ没有)
@Bean
public Queue orderDelayQueue() {
// String name, boolean durable, boolean exclusive, boolean autoDelete,
// @Nullable Map<String, Object> arguments
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");//死信交换机
arguments.put("x-dead-letter-routing-key", "order.release.order");//死信路由键
arguments.put("x-message-ttl", 60000);//消息过期时间 ms 1分钟
return new Queue("order.delay.queue", true, false, false, arguments);
}
@Bean
public Queue orderReleaseOrderQueue() {
//普通队列
return new Queue("order.release.order.queue", true, false, false);
}
@Bean
public Exchange orderEventExchange() {
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
//普通交换机
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateOrderBinding() {
//和延时队列绑定
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
@Bean
public Binding orderReleaseOrderBinding() {
//和普通队列绑定
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
@Bean
public Binding orderReleaseOtherBinding() {
//订单释放直接和库存释放进行绑定
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
//
@Bean
public Queue orderSeckillOrderQueue() {
return new Queue("order.seckill.order.queue", true, false, false);
}
@Bean
public Binding orderSeckillOrderQueueBinding() {
return new Binding("order.seckill.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.seckill.order",
new HashMap<>());
}
//
// /**
// * 下面全都是基础配置
// */
//
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//
// /**
// * 定制rabbitTemplate
// * 1.publisher-confirms: true
// * 3.消费端确认 (保证每个消息被正确消费 此时才可以braker删除这个消息)
// * 1.默认是自动确认的 只要消息接收到 客户端自动确认服务端就要移除这个消息
// * 问题 :
// * 收到很多消息 自动回复给服务器ack 只有一个消息处理成功 宕机了 发现消息丢失
// * 手动确认模式: 只要我们没有确认高随MQ 货物被签收 没有ack
// * 消息就一直是unacked状态 即使Consumer宕机 消息不会丢失 会重新变成ready
// * 2.如果签收
// */
@PostConstruct //MyRabbitConfig对象创建完成以后执行这个方法
public void initRabbitTemplate() {
//设置确认回调 消息到了队列
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 1、消息抵达服务器 ack=true
* @param correlationData 当前消息唯一关联的数据 这个是消息唯一id
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//服务器收到了
System.out.println("消息抵达服务器confirm....correlationData[" + correlationData + "]==>ack[" + ack + "]cause>>>" + cause);
}
});
//设置消息队列的确认回调 发送了,但是队列没有收到
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列 就触发这个失败回调
* @param message 投递失败的消息详细信息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 当时这个消息发给那个交换机
* @param routingKey 当时这个消息用那个路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//报错误 未收到消息
System.out.println("Fail!! Message[" + message + "]==>[" + exchange + "]==>routingKey[" + routingKey + "]");
}
});
}
}
RabbitMQ确认消息机制,可靠消息
发送端:
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍
为此引入确认机制
publisher confirmCallback确认消息是否已经抵达broker
publisher returnCallback 如果消息未投递到queue则触发
confirmCallback 和 returnCallback 已经在配置类里面添加了。
可靠抵达-Ack消息确认机制
接收端:
- 消费者获取消息,成功处理,可回复ack给broker
basic.ack用于肯定确认;broker将移除此消息。
basic.nack用于否定确认,可以指定broker是否丢弃此消息,可以批量操作
basic.reject用于否定确认,同上,但不能批量操作 - 默认,消息被消费者收到,就会从broker的queue中移除
消费者收到消息,默认会自动ack,但是如果无法确定此消息是否被处理完成,或者成功处理,我们可以开启手动ack模式。
消息处理成功,ack(),接收下一个消息,此消息broker就会移除
消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack。
消息一直没有调用ack/nack()方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。
接收端代码:
//这个类能接受hello-java-queue消息
@RabbitListener(queues = {
"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@Override
public PageUtils queryPage(Map<String, Object> params) {
IPage<OrderItemEntity> page = this.page(
new Query<OrderItemEntity>().getPage(params),
new QueryWrapper<OrderItemEntity>()
);
return new PageUtils(page);
}
/**
* 监听消息
* queues 声明需要监听的所有队列
* org.springframework.amqp.core.Message
* <p>
* 参数可以写一下类型
* 1、Message message: 原生消息详细信息。头+体
* 2、发送的消息的类型: OrderReturnReasonEntity content;
* 3、Channel channel:当前传输数据的通道
* <p>
* Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只能有一个收到此消息
* 1)、订单服务启动多个:同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一个消息
*/
// @RabbitListener(queues = {"hello-java-queue"})
// 这个类的这个方法才能接受hello-java-queue消息
@RabbitHandler
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {
//拿到消息体
// byte[] body = message.getBody();
//拿到消息头
// MessageProperties properties = message.getMessageProperties();
System.out.println("接收到消息:" + content);
//消息处理完 手动确认 deliveryTag在Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag->" + deliveryTag);
try {
if (deliveryTag % 2 == 0) {
//确认签收 队列删除该消息 false非批量模式
channel.basicAck(deliveryTag, false);
System.out.println("签收了货物");
} else {
//拒收退货 第三个参数 -> true:重新入队 false:丢弃
channel.basicNack(deliveryTag, false, true);
System.out.println("没有签收货物");
}
} catch (IOException e) {
//网络中断
}
}
如何保证消息可靠性-消息丢失,消息重复
1、消息丢失:
- 消息发送出去之后,由于网络问题没有抵达服务器。
- 做好容错方法(try-catch)发送消息可能会网络失败,失败后要有重试机制,可记录到数据库。采用定期扫描重发的方式
- 做好日志记录,每个消息状态是否都被服务器收到应该记录。
- 做好定期重发,如果消息没有发送成功,定期取数据库扫描未成功的消息进行重发。
消息抵达Broke,Broke要将消息写入磁盘(持久化)才算是成功,此时Broke尚未持久化完成,宕机。 - publish也必须加入确认机制,确认成功的消息,修改数据库消息状态。
- 自动ACK的状态下,消费者收到消息,但没来得及消息宕机。
一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新协入队
2、消费重复
- 消费成功,事务小女已经提交,ack时,机器宕机,导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
- 消费失败,由于重试机制,自动又将消息发送出去。
- 成功消费,cak时宕机,消息由unack变为ready,Broke又重新发送。
消费者业务消费接口应该设计为幂等性,比如。