一. 分布式事务概念
说到分布式事务,就会谈到那个经典的”账户转账”问题:2个账户,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?
一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。
但这里面有个问题:A是先update DB,后发送消息呢?还是先发送消息,后update DB?假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?
所以,这里下个结论:只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。
那这个问题怎么解决呢??为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。
具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。
具体来说,上面的2个步骤,被分解成3个步骤:
(1) 发送Prepared消息
(2) update DB
(3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。
可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?
有了上述的概念,我们详细解释一下事务消息交互的过程。
- 首先,MQ发送方向MQ服务(即RocketMQ的Broker)发送半消息。
- MQ服务端会将消息做持久化处理,并发送ACK确认消息已经发送成功。
- MQ发送方执行本地事务
- MQ发送方根据本地事务执行的结果向MQ服务提交二次确认:如果本地事务执行成功,则提交消息状态为Commit,否则为Rollback。MQ服务端收到Commit状态的消息将消息标记为可投递状态,订阅方最终会收到该条消息。如果收到的是Rollback,最终MQ服务端会删除该条半消息,订阅方不会接收到这条消息。
- 如果出现网络闪断、应用重启等情况,4阶段替提交的二次确认最终并未能到达MQ服务端,一定时间之后,MQ服务端会对此消息发起回查操作,确认发送方本地事务的执行状态。
- 发送方需要实现服务回查逻辑供MQ服务端进行回调。当发送方收到回查后,需要检查对应消息的本地事务执行的最终结果,此处也需要根据本地事务的成功或失败返回Commit或者Rollback,即再次提交消息状态的二次确认,MQ服务端仍会按照步骤4对该半消息进行操作。
注意 1-4 为事务消息的发送过程, 5-6 为事务消息的回查过程。
二. RocketMQ分布式事务Demo
MQ发送方代码:
package com.rocketmq.yy.producer.transaction;
import com.rocketmq.yy.constants.Const;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.omg.CORBA.PUBLIC_MEMBER;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("text_tx_producer_group_name");
//线程池
ExecutorService executorService = new ThreadPoolExecutor(2,5,100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
new ThreadFactory(){
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r);
thread.setName("text_tx_producer_group_name"+"check-thread"); //一般线程都要起名字,方便排错日志查询等
return thread;
}
});
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE); //RocketMQ的ip地址,放在Const类中了,本文没贴出来
producer.setExecutorService(executorService);
// 这个对象主要做两个事情,1:一步执行本地事务;2:做回查
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.start();;
Message message = new Message("test_tx_topic","TagA", "Key",
("hello rocketmq 4 tx!").getBytes(RemotingHelper.DEFAULT_CHARSET)/*实际的消息体*/);
producer.sendMessageInTransaction(message,"我是回调的参数");
Thread.sleep(Integer.MAX_VALUE);
producer.shutdown();
}
}
TransactionListener(本地事务与回查函数)
package com.rocketmq.yy.producer.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
String callArg = (String)arg;
System.out.println("callArg:" + callArg);
System.out.println("message:" + message);
//tx.begin
// 数据库落库操作
// tx.commit
return LocalTransactionState.COMMIT_MESSAGE;
}
//check回调
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("-------回调消息检查-------" + msg);
return LocalTransactionState.COMMIT_MESSAGE;
}
}
MQ接收方代码
package com.rocketmq.yy.producer.transaction;
import com.rocketmq.yy.constants.Const;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_tx_consumer_group_name");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("test_tx_topic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt me = msgs.get(0);
try{
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
String body = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("收到事务消息,topic:" + topic + ",tags:" + tags + ", keys:" + keys + ",body" + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
});
consumer.start();
System.out.println("tx consumer started...");
}
}
然后我们开启RocketMQ的四个主从节点,启动消费者
然后开启发送者:
我们现在再看消费者,就可以发现消费者接收到信息了、
如果我们将TransactionListener类中的执行本地事务方法的返回值改为
启动producer,执行完本地方法后,producer发给MQ是个unknown状态,所以MQ并不会把此条消息发给consumer,而是向producer进行轮询,producer会启动TransactionListener类中的回调方法,回调方法返回给MQ的是commit,则MQ接收到信息后就会把消息发给consumer。
下图是produce的控制台图,第一幅是produce执行本地事务输出的,第二幅是一分钟后MQ对producer进行轮询,producer执行回调函数输出的。
三 项目应用
我们先来看看业务流程图,红色框的部分就是我们要编写的业务逻辑
这里我们只填service层的业务代码,主要弄清业务逻辑和RocketMQ的基本编写
- PayServiceImpl:业务层,调用各种封装类方法完成整个逻辑
- TransactionProducer:封装类,发送消息
- TransactionListenerImpl:实现TransactionListener类,负责A账户的扣款逻辑业务,和发送消息给支付B
- CallbackService:回调给订单的业务逻辑类
- SyncProducer:分装类,同步发送消息
PayServiceImpl
package com.yy.paya.service.impl;
//jar包省略
public class PayServiceImpl implements PayService{
public static final String TX_PAY_TOPIC = "tx_pay_topic";
public static final String TX_PAY_TAGS = "pay";
@Autowired
private CustomerAccountMapper customerAccountMapper;
@Autowired
private TransactionProducer transactionProducer;
@Autowired
private CallbackService callbackService;
@Override
public String payment(String userId, String orderId, String accountId, double money) {
String paymentRet = "";
try {
// 最开始有一步 token验证操作(重复提单问题--比如老公老婆用同一账户在同一时间下单)
BigDecimal payMoney = new BigDecimal(money);
// 加锁开始(获取)每个acoount账号都有一个锁,所以不会引起并发问题
// 这里加锁是为了保证获取版本号和获取余额操作的原子性,这样可以保证余额不足的不会发消息给MQ
CustomerAccount old = customerAccountMapper.selectByPrimaryKey(accountId);
BigDecimal currentBalance = old.getCurrentBalance();
int currentVersion = old.getVersion();
// 要对大概率事件进行提前预判(小概率事件我们做放过,但是最后保障数据的一致性即可)
// 业务出发:
// 1 当前一个用户账户 只允许一个线程(一个应用端访问)
// 技术出发:
// 1 redis去重 分布式锁
// 2 数据库乐观锁去重
// 做扣款操作的时候:获得分布式锁,看一下能否获得
BigDecimal newBalance = currentBalance.subtract(payMoney);
//加锁结束(释放)
if(newBalance.doubleValue() > 0 ) { // 或者一种情况获取锁失败
// 1.组装消息
// 1.执行本地事务
String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();
Map<String, Object> params = new HashMap<>();
params.put("userId", userId);
params.put("orderId", orderId);
params.put("accountId", accountId);
params.put("money", money); //100
Message message = new Message(TX_PAY_TOPIC, TX_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());
// 可能需要用到的参数
params.put("payMoney", payMoney);
params.put("newBalance", newBalance);
params.put("currentVersion", currentVersion);
// 同步阻塞
CountDownLatch countDownLatch = new CountDownLatch(1);
params.put("currentCountDown", countDownLatch);
// 本地的事务执行+消息发送
TransactionSendResult sendResult = transactionProducer.sendMessage(message, params);
countDownLatch.await();
// 当消息发送成功,本地事务也执行成功时
if(sendResult.getSendStatus() == SendStatus.SEND_OK
&& sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) {
// 回调order通知支付成功消息
callbackService.sendOKMessage(orderId, userId);
paymentRet = "支付成功!";
} else {
paymentRet = "支付失败!";
}
} else {
paymentRet = "余额不足!";
}
} catch (Exception e) {
e.printStackTrace();
paymentRet = "支付失败!";
}
return paymentRet;
}
}
package com.yy.paya.service.producer;
//jar包省略
/**
* 一个注入的封装类
*/
@Component
public class TransactionProducer implements InitializingBean {
private TransactionMQProducer producer;
private ExecutorService executorService; //回调check检查时用
@Autowired
private TransactionListenerImpl transactionListenerImpl;
private static final String NAMESERVER = "172.20.10.7:9876";
private static final String PRODUCER_GROUP_NAME = "tx_pay_producer_group_name";
//构造函数
private TransactionProducer() {
this.producer = new TransactionMQProducer(PRODUCER_GROUP_NAME);
this.executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName(PRODUCER_GROUP_NAME + "-check-thread");
return thread;
}
});
this.producer.setExecutorService(executorService);
this.producer.setNamesrvAddr(NAMESERVER);
}
//继承了InitializingBean接口实现的方法,当我们的类TransactionProducer初始化完毕后再将transactionListenerImpl注入进来
@Override
public void afterPropertiesSet() throws Exception {
this.producer.setTransactionListener(transactionListenerImpl);
start(); //启动
}
private void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown() {
this.producer.shutdown();
}
//真正发消息的方法
public TransactionSendResult sendMessage(Message message, Object argument) {
TransactionSendResult sendResult = null;
try {
sendResult = this.producer.sendMessageInTransaction(message, argument);
} catch (Exception e) {
e.printStackTrace();
}
return sendResult;
}
}
TransactionListenerImpl :
package com.yy.paya.service.producer;
//jar包省略
@Component
public class TransactionListenerImpl implements TransactionListener {
@Autowired
private CustomerAccountMapper customerAccountMapper;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.err.println("执行本地事务单元------------");
CountDownLatch currentCountDown = null;
try {
Map<String, Object> params = (Map<String, Object>) arg;
String userId = (String)params.get("userId");
String accountId = (String)params.get("accountId");
String orderId = (String)params.get("orderId");
BigDecimal payMoney = (BigDecimal)params.get("payMoney"); // 当前的支付款
BigDecimal newBalance = (BigDecimal)params.get("newBalance"); // 前置扣款成功的余额
int currentVersion = (int)params.get("currentVersion");
currentCountDown = (CountDownLatch)params.get("currentCountDown");
//updateBalance 传递当前的支付款 数据库操作:
Date currentTime = new Date();
int count = this.customerAccountMapper.updateBalance(accountId, newBalance, currentVersion, currentTime);
if(count == 1) {
currentCountDown.countDown();
return LocalTransactionState.COMMIT_MESSAGE;
} else {
currentCountDown.countDown();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
e.printStackTrace();
currentCountDown.countDown();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// TODO Auto-generated method stub
return null;
}
}
CallbackService :
package com.yy.paya.service.producer;
//jar包省略
/**
* 发给order系统,更改订单状态(已经付款)
*/
@Service
public class CallbackService {
public static final String CALLBACK_PAY_TOPIC = "callback_pay_topic";
public static final String CALLBACK_PAY_TAGS = "callback_pay";
public static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
@Autowired
private SyncProducer syncProducer;
public void sendOKMessage(String orderId, String userId) {
Map<String, Object> params = new HashMap<>();
params.put("userId", userId);
params.put("orderId", orderId);
params.put("status", "2"); //ok
String keys = UUID.randomUUID().toString() + "$" + System.currentTimeMillis();
Message message = new Message(CALLBACK_PAY_TOPIC, CALLBACK_PAY_TAGS, keys, FastJsonConvertUtil.convertObjectToJSON(params).getBytes());
SendResult ret = syncProducer.sendMessage(message);
}
}
SyncProducer:
package com.yy.paya.service.producer;
//jar包省略
@Component
public class SyncProducer {
private DefaultMQProducer producer;
private static final String NAMESERVER = "192.168.11.121:9876;192.168.11.122:9876;192.168.11.123:9876;192.168.11.124:9876";
private static final String PRODUCER_GROUP_NAME = "callback_pay_producer_group_name";
private SyncProducer() {
this.producer = new DefaultMQProducer(PRODUCER_GROUP_NAME);
this.producer.setNamesrvAddr(NAMESERVER);
this.producer.setRetryTimesWhenSendFailed(3); //失败后可以重发3次
start();
}
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public SendResult sendMessage(Message message) {
SendResult sendResult = null;
try {
sendResult = this.producer.send(message);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
public void shutdown() {
this.producer.shutdown();
}
}