1.定时关单功能实现
-
利用rabbitmq的延迟队列来做用户超过30分钟未支付订单关单功能
-
业务实现流程图
2.超时关单实现步骤
-
我这边使用的是rocketmq来实现
-
配置
<!--add dependency in pom.xml--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
# rocketmq配置 rocketmq: producer: retry-times-when-send-async-failed: 4 # 当出现网络问题,可以自动重试,同步发送消息重试次数,默认为 2 group: releaseStock
-
30分钟未支付关单实现
// 锁定库存成功后,发送关单消息 /** * String destination 目的地 topic:tag * Message<?> message 消息内容 * long timeout 消息发送超时时间 * int delayLevel 延迟等级 */ rocketMQTemplate.syncSend(RocketMqConstant.ORDER_RELEASE_DESTINATION, MessageBuilder.withPayload(orderCreateTo.getOrder()).build(), 3000, 4);
-
监听器实现
@Component @Slf4j @RocketMQMessageListener(consumerGroup = "orderGroup", topic = RocketMqConstant.ORDER_RELEASE_TOPIC, selectorExpression = "*", messageModel = MessageModel.CLUSTERING, // 集群模式下消息消费失败后可以重试 selectorType = SelectorType.TAG) public class OrderCloseListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Autowired private OrderService orderService; @Override public void onMessage(MessageExt message) { log.info("收到消息:top={},tag={},body={}", message.getTopic(), message.getTags(), new String(message.getBody())); Order order = JSONObject.parseObject(new String(message.getBody()), Order.class); // 消息出现异常 底层会返回状态ConsumeConcurrentlyStatus.RECONSUME_LATER // 会消费重试然后我下面设置了消息最大次数5,一旦超过5次后会进入死信队列 orderService.closeOrder(order); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 消息消费失败后设置重试消费最大次数 consumer.setMaxReconsumeTimes(5); // 消息消费超时时间 // consumer.setConsumeTimeout(); } }
-
service实现
@Override public void closeOrder(Order order) { String orderSn = order.getOrderSn(); order = getOne(Wrappers.<Order>lambdaQuery().eq(Order::getOrderSn, orderSn)); log.info("关闭订单,查询最新订单数据:{}", order); // 只有代付款状态才能关单 if (order.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { order.setStatus(OrderStatusEnum.CANCLED.getCode()); updateById(order); log.info("关闭订单成功,发送消息通知扣减空库存"); OrderVo orderVo = BeanUtil.copyProperties(order, OrderVo.class); // 消息发送失败,默认重试4次 yml配置 rocketMQTemplate.syncSend(RocketMqConstant.STOCK_RELEASE_ORDER_STOCK_LOCKED, MessageBuilder.withPayload(orderVo).build()); } }
3.解锁库存实现步骤
-
锁定库存后,在库存工单表记录一下,作用:在锁定库存后业务出现异常,通过定时任务能找到锁定了多少件然后回滚库存,保证数据最终一致性
-
监听器实现
@Slf4j @Component @RocketMQMessageListener(consumerGroup = "stockGroup", topic = RocketMqConstant.STOCK_RELEASE_TOPIC, messageModel = MessageModel.CLUSTERING, selectorType = SelectorType.TAG) public class StockReleaseListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { @Autowired private WareSkuService wareSkuService; @Override public void onMessage(MessageExt messageExt) { log.info("收到消息:topic={},tag={},body={}", messageExt.getTopic(), messageExt.getTags(), new String(messageExt.getBody())); if ("stock-locked".equals(messageExt.getTags())) { // 解锁库存 订单取消后异常未发送消息未解锁库存 兜底 StockLockedTo stockLockedTo = JSONObject.parseObject(new String(messageExt.getBody()), StockLockedTo.class); wareSkuService.unlockStock(stockLockedTo); } else if ("order-stock-locked".equals(messageExt.getTags())) { // 订单取消后发送过来的消息 OrderVo orderVo = JSONObject.parseObject(new String(messageExt.getBody()), OrderVo.class); wareSkuService.unlockStock(orderVo); } } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 消息消费失败后设置重试消费最大次数 consumer.setMaxReconsumeTimes(5); // 消息消费超时时间 // consumer.setConsumeTimeout(); } }
-
service层实现
/** * 解锁 * 1、查询数据库关于这个订单锁定库存信息 * 有:证明库存锁定成功了 * 解锁:订单状况 * 1、没有这个订单,必须解锁库存 * 2、有这个订单,不一定解锁库存 * 订单状态:已取消:解锁库存 * 已支付:不能解锁库存 */ @Override public void unlockStock(StockLockedTo stockLockedTo) { WareOrderTaskDetail wareOrderTaskDetail = wareOrderTaskDetailService.getById(stockLockedTo.getDetailTo().getId()); if (wareOrderTaskDetail != null && wareOrderTaskDetail.getLockStatus() == 1) { WareOrderTask wareOrderTask = wareOrderTaskService.getById(stockLockedTo.getId()); String orderSn = wareOrderTask.getOrderSn(); R r = orderFeignService.queryOrderByOrderSn(orderSn); if (r.getCode() == 0) { OrderVo orderVo = r.getData(new TypeReference<OrderVo>() { }); if (orderVo == null || orderVo.getStatus() == 4) { // 解锁库存,修改detail状态 baseMapper.unLockSkuStock(wareOrderTaskDetail.getSkuId(), wareOrderTaskDetail.getWareId(), wareOrderTaskDetail.getSkuNum()); wareOrderTaskDetail.setLockStatus(2); wareOrderTaskDetailService.updateById(wareOrderTaskDetail); } } else { throw new RuntimeException(); } } } // 处理关单成功后,发送过来的消息 @Override @Transactional public void unlockStock(OrderVo orderVo) { String orderSn = orderVo.getOrderSn(); WareOrderTask wareOrderTask = wareOrderTaskService.getOne(Wrappers.<WareOrderTask>lambdaQuery().eq(WareOrderTask::getOrderSn, orderSn)); Long orderTaskId = wareOrderTask.getId(); List<WareOrderTaskDetail> wareOrderTaskDetails = wareOrderTaskDetailService.list(Wrappers.<WareOrderTaskDetail>lambdaQuery() .eq(WareOrderTaskDetail::getTaskId, orderTaskId) .eq(WareOrderTaskDetail::getLockStatus, 1)); for (WareOrderTaskDetail wareOrderTaskDetail : wareOrderTaskDetails) { baseMapper.unLockSkuStock(wareOrderTaskDetail.getSkuId(), wareOrderTaskDetail.getWareId(), wareOrderTaskDetail.getSkuNum()); wareOrderTaskDetail.setLockStatus(2); wareOrderTaskDetailService.updateById(wareOrderTaskDetail); } }
4.使用RocketMq延迟队列带来的问题
-
消息发送失败后,如何处理?
- 生产者发送消息失败后,可以重新投递消息给broker
- yml配置:retry-times-when-send-async-failed: 3 # 当出现网络问题,可以自动重试,同步发送消息重试次数,默认为 2
-
消息发送到broker中,broker持久化机制?
- 将broker设置为同步刷盘,flushDiskType=SYNC_FLUSH
- 同步刷盘的缺点是吞吐量太低了,每来一条消息就需要将数据写入磁盘中,异步刷盘吞吐量高,但有丢失消息的风险
-
消费者消费消息失败了,如何解决?
-
消息消费失败后,会有一个重试机制(仅在集群模式下有效)
-
-
消费到重复消息,怎么处理?
- 重复消息无法避免,可以在消息发送时给一个业务标识Id记录在消息表中,消费消息时查看此条消息是否有被消费过,我这边是通过业务幂等来处理的,无论你发过来多少条消息,只要数据状态被幂等了就不会去消费