目录
为什么有ack机制了,还提出一个事物消息。rocketmq事物消息能给我带来什么?
学习之前先了解一下什么是rocketMq的事务消息?
秉承懒人的性格。这里直接看别人写的博客吧! rocketmq事务消息
大致的流程总结
1) 发送方向 RocketMQ 发送“待确认” (Prepare) 消息。
2 ) RocketMQ 将收到的“待确认” (一般写入一个 HalfTopic 主题<RMQ_SYS_TRANS_HALF_TOPIC>)消息持化成功后, 向发送方回复消息已经发送成功, 此
时第一阶段消息发送完成。
发送方开始执行本地事件逻辑.
3) 发送方根据事件执行结果向 RocketMQ 发送二次确认(Commit 还是 Rollback) 消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些
消息才会进入生产时发送实际的主题 RealTopic), 订阅方将能够收到该消息; 收到 Rollback 状态则删除第一阶段的消息, 订阅方接收不到该消息。
4) 如果出现异常情况, 步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认” 消息、 发起回查请求.
5) 发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作, 回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ),
通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。
为什么有ack机制了,还提出一个事物消息。rocketmq事物消息能给我带来什么?
rocketMq三种发送方式对比
三种发送方式,除了单向发送,无发送错误反馈。其余二中都能知道消息是否发送到服务端。
当然。前二种也有返回发送消息失败。实际消息已经发送到mq中的情况。这个时候需要消费者自己保证幂等性。这里不考虑。
那么都有消息确认机制了。为啥还有事物消息。什么场景能使用到事物消息?
思前想后,自己模拟了一个场景。业务大概是消费成功后推送消息扣减库存。如果简单使用消息确认机制。可能会有一下情况。
1.消费成功。发送队列成功 。没毛病,大概率这个情况
2.消费失败。直接回滚了也不会推送消息。
3.消费成功,发送队列返回失败。额。这怎么处理。
回滚消费业务?人家都付款成功了就因为发送队列失败。你把吃到肉吐出去。这个显然不可能。
不处理?不处理不扣库存,不发积分。你这么跟你领导回答看你们领导打不打你。
发送异常,插入发送失败表?这个好像可能。而且好像很多也是这么做的,这么做有问题吗?大概率没问题。不过也有可能insert失败表插入失败的情况。不过是小概率。
这里事物消息的场景就体现出来了。事物消息。也可以理解为可靠消息。或者一种解决方案。至少这里我是这么理解的
那么一言不合上代码吧!
Springboot 集成RocketMq发送事物消息
Pom引入 这里有boot-starter不就直接使用原生的client了如果先要看看实现,直接搜下面的入口类
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
言归正传
模拟业务,订单交易成功,订单表order_info插入数据,order_slip 流水表插入记录。并且推送消息队列。
public interface IOrderInfoService extends IService<OrderInfo> {
/**
* 交易成功 插入流水
* @param orderInfo
*/
void insertOrderInfo(OrderInfo orderInfo);
}
public interface IOrderSlipService extends IService<OrderSlip> {
/**
* 是否交易成功
* @param orderId
* @return
*/
boolean isExistTx(String orderId);
}
@Service
public class OrderInfoServiceImpl extends ServiceImpl<OrderInfoMapper, OrderInfo> implements IOrderInfoService {
@Resource
private IOrderSlipService orderSlipService;
@Override
@Transactional(rollbackFor = Exception.class)
public void insertOrderInfo(OrderInfo orderInfo) {
baseMapper.insert(orderInfo);
OrderSlip orderSlip = new OrderSlip();
orderSlip.setOrderId(orderInfo.getOrderId());
orderSlip.setStatus(1);
orderSlipService.save(orderSlip);
}
}
@Service
public class OrderSlipServiceImpl extends ServiceImpl<OrderSlipMapper, OrderSlip> implements IOrderSlipService {
@Override
public boolean isExistTx(String orderId) {
QueryWrapper<OrderSlip> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("order_id",orderId);
queryWrapper.eq("status",1);
OrderSlip orderSlip = baseMapper.selectOne(queryWrapper);
return (orderSlip!=null);
}
}
rocketMqTransactionListener实现
@Component
@RocketMQTransactionListener
public class RocketMqTransactionListenerImpl implements RocketMQLocalTransactionListener{
@Resource
private IOrderInfoService orderInfoService;
@Resource
private IOrderSlipService orderSlipService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try{
// 执行业务逻辑 控制本地事务
String jsonString = new String((byte[]) message.getPayload());
// 比如 订单交易-插入流水 (同一事务)
OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class);
orderInfoService.insertOrderInfo(orderInfo);
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
System.out.println(e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
RocketMQLocalTransactionState state;
String jsonString = new String((byte[]) msg.getPayload());
OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class);
boolean existTx = orderSlipService.isExistTx(orderInfo.getOrderId());
if(existTx){
state = RocketMQLocalTransactionState.COMMIT;
}else{
state = RocketMQLocalTransactionState.UNKNOWN;
}
return state;
}
}
Controller实现
@RestController
@RequestMapping("/mq/product")
public class RocketMqProductController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/test")
public void myTestTran(){
OrderInfo orderInfo = new OrderInfo();
orderInfo.setGoodId(11);
orderInfo.setNum(2);
String orderId = UUID.randomUUID().toString().replaceAll("-","");
orderInfo.setOrderId(orderId);
Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(orderInfo)).build();
TransactionSendResult tx_product_msg = rocketMQTemplate.sendMessageInTransaction("tx_product_msg", message, null);
System.out.println("发送成功");
}
}
这里就是一个demo。以及大概的实现。当然生产中不可能这么简单需要具体按照业务去考虑。rocketMq只是提供了一个模板。
简单表述一下,就是如果业务成功之后。rocket commit失败。会有定时任务调用流水接口查看订单是否成功,当然现实中不可能只判断有没有这个记录这么简单。
这只是本人的观点。如果有错误请大家指出。再次感谢