RocketMQ和其他消息中间件最大的一个区别是支持了事务消息,这也是分布式事务里面的基于消息的最终一致性方案。
1.事务消息是什么?
事务消息:具有事务特性的消息,即Producer发送到broker后,该消息可以回滚或者提交(提交后Consumer才可见)。
2.事务消息有什么用?
RocketMQ官方示例:用户A发起订单,支付100块钱操作完成后,能得到100积分,账户服务和会员服务是两个独立的微服务模块,有各自的数据库,按照上文提及的问题可能性,将会出现这些情况:
-
如果先扣款,再发消息,可能钱刚扣完,宕机了,消息没发出去,结果积分没增加。
-
如果先发消息,再扣款,可能积分增加了,但钱没扣掉,白送了100积分给人家。
// 先扣款,再加积分伪代码:
@Transational
pay() {
mysql.payMoney() // 数据库中添加用户信息
reduceRepo() // 数据库中减少库存
}
-------------------------------------// 若先发消息再加积分,那在这行宕机怎么办?
producer.send(msg) // 发送100积分
==> 所以上述方式不可行,我们可以先发送消息(加积分)到Broker,但将消息置为Consumer不可见状态
- 若本地事务(扣款)处理成功了再让Consumer可见
- 若本地事务(扣款)失败了就回滚当前消息
这里可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到broker上失败了怎么办?
这个时候意味着消费者无法正常消费到这个消息。所以RocketMQ提供了消息回查机制,如果事务消息一直处于中间状态,broker会发起重试去查询broker上这个事务的处理状态。一旦发现事务处理成功,则把当前这条消息设置为可见
整体的模型图如下:
从上例我们看见,事务消息一般先于本地事务使用。这里也可以理解成嵌套事务,发消息是外层事务,本地事务是内存事务。
扫描二维码关注公众号,回复: 12741944 查看本文章
3.Java使用事务消息?
针对上面的示例,我们来看看如何通过具体的代码实现。
TransactionProducer
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 这里用的是事务Producer(TransactionMQProducer)
TransactionMQProducer transactionProducer=new TransactionMQProducer("tx_producer_group");
transactionProducer.setNamesrvAddr("43.105.136.120:9876");
// 自定义线程池,用于异步执行事务操作
transactionProducer.setExecutorService(Executors.newFixedThreadPool(10); );
// 核心!!添加事务消息监听
transactionProducer.setTransactionListener(new TransactionListenerLocal());
transactionProducer.start();
for(int i=0;i<20;i++) {
String orderId= UUID.randomUUID().toString();
String body="{'operation':'doOrder','orderId':'"+orderId+"'}";
// 构建消息
Message message = new Message("pay_tx_topic", "TagA",orderId, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息, 注:是发送事务消息
transactionProducer.sendMessageInTransaction(message, orderId+"&"+i);
Thread.sleep(1000); // 1秒一次
}
}
}
TransactionListenerLocal(事务消息核心)
// 本地事务监听,实现TransactionListener接口
public class TransactionListenerLocal implements TransactionListener {
private static final Map<String,Boolean> results=new ConcurrentHashMap<>();
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println(":执行本地事务:"+arg.toString());
String orderId=arg.toString();
// 模拟数据入库操作(成功/失败)
boolean rs=saveOrder(orderId);
return rs? LocalTransactionState.COMMIT_MESSAGE:LocalTransactionState.UNKNOW;
// 这个返回状态表示告诉broker这个事务消息是否被确认,允许给到consumer进行消费
// LocalTransactionState.ROLLBACK_MESSAGE 回滚
// LocalTransactionState.UNKNOW 未知
}
// 提供事务执行状态的回查方法,提供给broker回调
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId=msg.getKeys();
System.out.println("执行事务执行状态的回查,orderId:"+orderId);
boolean rs=Boolean.TRUE.equals(results.get(orderId));
System.out.println("回调:"+rs);
return rs?LocalTransactionState.COMMIT_MESSAGE:
LocalTransactionState.ROLLBACK_MESSAGE;
}
private boolean saveOrder(String orderId){
//如果订单取模等于0,表示成功,否则表示失败
boolean success=Math.abs(Objects.hash(orderId))%2==0;
results.put(orderId,success);
return success;
}
}
TransactionConsumer
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException, IOException {
DefaultMQPushConsumer defaultMQPushConsumer=new
DefaultMQPushConsumer("tx_consumer_group");
defaultMQPushConsumer.setNamesrvAddr("43.105.136.120:9876");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_O FFSET);
defaultMQPushConsumer.subscribe("pay_tx_topic","*");
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)
(msgs, context) -> {
msgs.stream().forEach(messageExt -> {
try {
String orderId=messageExt.getKeys();
// 拿到消息
String body=new String(messageExt.getBody(),
RemotingHelper.DEFAULT_CHARSET);
// 扣减库存
System.out.println("收到消息:"+body+",开始扣减库存:"+orderId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
defaultMQPushConsumer.start();
System.in.read();
}
}
4.事务消息的三种状态?
在上面的 TransactionListenerLocal 类中,我们看见重写的两个方法都需要返回 LocalTransactionState,表示告诉broker对于这条已经存在了的事务消息如何处理:
-
ROLLBACK_MESSAGE:回滚事务
当executeLocalTransaction方法返回ROLLBACK_MESSAGE时,表示直接回滚事务
-
COMMIT_MESSAGE: 提交事务
-
UNKNOW: broker会定时的回查Producer消息状态,直到彻底成功或失败。
当返回UNKNOW时,Broker会在一段时间之后回查checkLocalTransaction,根据 checkLocalTransaction返回状态执行事务的操作(回滚或提交)
如示例中,当返回 ROLLBACK_MESSAGE 时消费者不会收到消息,且不会调用回查函数,当返回 COMMIT_MESSAGE 时事务提交,消费者收到消息,当返回UNKNOW时,在一段时间之后调用回查函数,并根据status判断返回提交或回滚状态,返回提交状态的消息将会被消费者消费,所以此时消费者可以消费部分消息。