【RocketMQ】高级使用:四个问题详解事务消息

RocketMQ和其他消息中间件最大的一个区别是支持了事务消息,这也是分布式事务里面的基于消息的最终一致性方案。

1.事务消息是什么?

事务消息:具有事务特性的消息,即Producer发送到broker后,该消息可以回滚或者提交(提交后Consumer才可见)。

2.事务消息有什么用?

RocketMQ官方示例:用户A发起订单,支付100块钱操作完成后,能得到100积分,账户服务和会员服务是两个独立的微服务模块,有各自的数据库,按照上文提及的问题可能性,将会出现这些情况:

  • 如果先扣款,再发消息,可能钱刚扣完,宕机了,消息没发出去,结果积分没增加。

  • 如果先发消息,再扣款,可能积分增加了,但钱没扣掉,白送了100积分给人家。

// 先扣款,再加积分伪代码:
@Transational
pay() {
    mysql.payMoney() // 数据库中添加用户信息
    reduceRepo() // 数据库中减少库存  
}
-------------------------------------// 若先发消息再加积分,那在这行宕机怎么办?
producer.send(msg) // 发送100积分

==> 所以上述方式不可行,我们可以先发送消息(加积分)到Broker,但将消息置为Consumer不可见状态

  • 若本地事务(扣款)处理成功了再让Consumer可见
  • 若本地事务(扣款)失败了就回滚当前消息

这里可能会存在一个问题,生产者本地事务成功后,发送事务确认消息到broker上失败了怎么办?
这个时候意味着消费者无法正常消费到这个消息。所以RocketMQ提供了消息回查机制,如果事务消息一直处于中间状态,broker会发起重试去查询broker上这个事务的处理状态。一旦发现事务处理成功,则把当前这条消息设置为可见

整体的模型图如下:

在这里插入图片描述

从上例我们看见,事务消息一般先于本地事务使用。这里也可以理解成嵌套事务,发消息是外层事务,本地事务是内存事务。

扫描二维码关注公众号,回复: 12741944 查看本文章

3.Java使用事务消息?

针对上面的示例,我们来看看如何通过具体的代码实现。

TransactionProducer

public class TransactionProducer {
    public static void main(String[] args) throws Exception {  
        // 这里用的是事务Producer(TransactionMQProducer)
        TransactionMQProducer transactionProducer=new TransactionMQProducer("tx_producer_group");
        transactionProducer.setNamesrvAddr("43.105.136.120:9876");    
        // 自定义线程池,用于异步执行事务操作     
        transactionProducer.setExecutorService(Executors.newFixedThreadPool(10); );  
        // 核心!!添加事务消息监听     
        transactionProducer.setTransactionListener(new TransactionListenerLocal());
        transactionProducer.start();   
        
        for(int i=0;i<20;i++) {       
            String orderId= UUID.randomUUID().toString();    
            String body="{'operation':'doOrder','orderId':'"+orderId+"'}";     
            // 构建消息
            Message message = new Message("pay_tx_topic", "TagA",orderId, body.getBytes(RemotingHelper.DEFAULT_CHARSET));  
            // 发送消息, 注:是发送事务消息
            transactionProducer.sendMessageInTransaction(message, orderId+"&"+i);   
            Thread.sleep(1000); // 1秒一次
        }
    } 
}

TransactionListenerLocal(事务消息核心)

// 本地事务监听,实现TransactionListener接口
public class TransactionListenerLocal implements TransactionListener {
    private static final Map<String,Boolean> results=new ConcurrentHashMap<>();
    
    // 执行本地事务   
    @Override   
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
        System.out.println(":执行本地事务:"+arg.toString());    
        String orderId=arg.toString();  
        // 模拟数据入库操作(成功/失败)
        boolean rs=saveOrder(orderId);
        return rs? LocalTransactionState.COMMIT_MESSAGE:LocalTransactionState.UNKNOW;  
        // 这个返回状态表示告诉broker这个事务消息是否被确认,允许给到consumer进行消费    
        // LocalTransactionState.ROLLBACK_MESSAGE 回滚    
        // LocalTransactionState.UNKNOW  未知   
    }  
    
    // 提供事务执行状态的回查方法,提供给broker回调   
    @Override   
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {  
        String orderId=msg.getKeys();     
        System.out.println("执行事务执行状态的回查,orderId:"+orderId);  
        boolean rs=Boolean.TRUE.equals(results.get(orderId));   
        System.out.println("回调:"+rs);     
        return rs?LocalTransactionState.COMMIT_MESSAGE:     
      							  LocalTransactionState.ROLLBACK_MESSAGE;  
    }   
    
    private boolean saveOrder(String orderId){    
        //如果订单取模等于0,表示成功,否则表示失败  
        boolean success=Math.abs(Objects.hash(orderId))%2==0;   
        results.put(orderId,success);    
        return success;  
    } 
}

TransactionConsumer

public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException, IOException {   
        DefaultMQPushConsumer defaultMQPushConsumer=new      
            DefaultMQPushConsumer("tx_consumer_group");   
        defaultMQPushConsumer.setNamesrvAddr("43.105.136.120:9876");       
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_O FFSET);   
        defaultMQPushConsumer.subscribe("pay_tx_topic","*");    
         
        defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently)
                  (msgs, context) -> {           
                      msgs.stream().forEach(messageExt -> {    
                          try {                 
                              String orderId=messageExt.getKeys();
                              // 拿到消息
                              String body=new String(messageExt.getBody(), 
                                                     RemotingHelper.DEFAULT_CHARSET); 
                              // 扣减库存
                              System.out.println("收到消息:"+body+",开始扣减库存:"+orderId);  
                          } catch (UnsupportedEncodingException e) {    
                              e.printStackTrace();                
                          }             
                      });   
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    
                  });     
        defaultMQPushConsumer.start();    
        System.in.read();  
    }
}

4.事务消息的三种状态?

在上面的 TransactionListenerLocal 类中,我们看见重写的两个方法都需要返回 LocalTransactionState,表示告诉broker对于这条已经存在了的事务消息如何处理:

  1. ROLLBACK_MESSAGE:回滚事务

    当executeLocalTransaction方法返回ROLLBACK_MESSAGE时,表示直接回滚事务

  2. COMMIT_MESSAGE: 提交事务

  3. UNKNOW: broker会定时的回查Producer消息状态,直到彻底成功或失败。

    当返回UNKNOW时,Broker会在一段时间之后回查checkLocalTransaction,根据 checkLocalTransaction返回状态执行事务的操作(回滚或提交)

如示例中,当返回 ROLLBACK_MESSAGE 时消费者不会收到消息,且不会调用回查函数,当返回 COMMIT_MESSAGE 时事务提交,消费者收到消息,当返回UNKNOW时,在一段时间之后调用回查函数,并根据status判断返回提交或回滚状态,返回提交状态的消息将会被消费者消费,所以此时消费者可以消费部分消息。

猜你喜欢

转载自blog.csdn.net/qq_33762302/article/details/114858933