- 本地方法发送消息至borker
- borker 执行TransactionListener的方法。执行本地事务executeLocalTransaction
- 如果返回unkow状态,borker会回调checkLocalTransaction检查事务的结果,返回提交还是回滚
- borker设置消息对外可见,等待消费或主动推送
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
事务的流程如下图简述
实例代码
/**
* 基础事务消息
*
* @return TransactionMQProducer
* @throws MQClientException 抛出魔法
*/
@Bean
@Lazy
public TransactionMQProducer defaultTransactionMQProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(TsitMQGroup.PROVIDER_INDUSTRY_GROUP);
producer.setNamesrvAddr(ipcProps.getRocketMQServer());
producer.start();
log.info("init ipc defaultTransactionMQProducer");
return producer;
}
发送事务消息
// 自定义线程池
transactionMQProducer.setExecutorService(ThreadFactory.getInstance());
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
List<TCmFp> list = fp.stream().peek(it -> {
it.setId(idUtils.nextIdStr());
it.setOrganId(user.getOrganId());
}).collect(Collectors.toList());
boolean flag = cmFpMapper.saveBatch(list) != 0;
// 需要borker回查就返回unkow
return flag ? LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 此方法 borker并不会回查,因为在本地事务就返回了回滚还是提交
// 当本地事务执行时返回unkow,borker回调用该该方法检查事务结果
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionMQProducer.sendMessageInTransaction(
new Message(TsitMQTopic.INDUSTRY_TOPIC, TsitMQTag.FP_CALLBACK, JSONUtil.toJsonStr(dto).getBytes(StandardCharsets.UTF_8)),
null);