SpringBoot集成RocketMQ实现事务消息 结合实际业务出发 多业务多场景分析 附带示例代码和解决方案
前言
之前看了几篇博客写的RocketMQ事务消息使用场景分析,个人感觉分析的业务不太贴切,这里会对多个业务场景进行事务问题分析,本文不过多的探讨RocketMQ事务消息的实现原理,网上有很多资料,这里只对真实的业务场景重点分析,集成代码包会使用rocketmq-spring-boot-starter 2.2.3。
前期准备
-
如果还没有安装RocketMQ服务端可以参考下面两篇博客
- 使用Docker部署RocketMQ
https://blog.csdn.net/weixin_44606481/article/details/129758920 - 使用docker-compose部署RocketMQ
https://blog.csdn.net/weixin_44606481/article/details/129780540
- 使用Docker部署RocketMQ
-
需要查看Spring Boot集成RocketMQ全部种类消息可以参考下面一篇博客
- Spring Boot集成RocketMQ全部种类消息代码实现
https://blog.csdn.net/weixin_44606481/article/details/129804267
- Spring Boot集成RocketMQ全部种类消息代码实现
SpringBoot集成RocketMQ实现事务消息示例代码
POM
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--要使用RocketMQ5.x的自定义时间延时消息必须要使用2.2.3及以上的版本-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.2.5</version>
</dependency>
</dependencies>
添加application.yml配置信息
server:
port: 8888
rocketmq:
# name-server服务地址多个用;隔开 例如127.0.0.1:9876;127.0.0.1:9877
name-server: 192.168.10.220:9876
producer: # 生产者配置
group: group1 # 生产者分组
send-message-timeout: 3000 # 消费者发送消息超时时间单位毫秒
生产者
@Slf4j
@Component
public class SimpleTransationMessageProducer {
@Autowired
RocketMQTemplate rocketMQTemplate;
/** 发送事务消息 */
public void send() {
String msg = "发送事务消息"+RandomUtil.randomInt(100);
// 发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
// 第三个参数作为业务参数传递,后续业务场景分析-以及代码实现中会重点使用到这个参数
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("simple-transation-topic-01", MessageBuilder.withPayload(msg).build(), null);
String transactionId = result.getTransactionId();
String status = result.getSendStatus().name();
log.info("发送消息成功 transactionId={} status={} ",transactionId,status);
}
}
生产者事务消息监听器
发送事务消息必须要有该监听器不然发送时会抛出异常。
@Slf4j
@RocketMQTransactionListener
public class SimpleTransactionMsgListener implements RocketMQLocalTransactionListener {
// 事务消息共有三种状态,提交状态、回滚状态、中间状态:
// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
/** 执行本地事务(在发送消息成功时执行) */
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//TODO 开启本地事务(实际就是我们的jdbc操作)
//TODO 执行业务代码(例如插入订单数据库表等)
//TODO 提交或回滚本地事务
//模拟一个处理结果
int index=2;
/**
* 模拟返回事务状态
*/
switch (index){
case 1:
//处理业务
String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
log.info("本地事务回滚,回滚消息,"+jsonStr);
//返回ROLLBACK状态的消息会被丢弃
return RocketMQLocalTransactionState.ROLLBACK;
case 2:
//返回UNKNOW状态的消息会等待Broker进行事务状态回查
log.info("需要等待Broker进行事务状态回查");
return RocketMQLocalTransactionState.UNKNOWN;
default:
log.info("事务提交,消息正常处理");
//返回COMMIT状态的消息会立即被消费者消费到
return RocketMQLocalTransactionState.COMMIT;
}
}
/**
* 检查本地事务的状态
* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
* 第一次消息回查最快
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
return RocketMQLocalTransactionState.COMMIT;
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "simple-transation-group-01", topic = "simple-transation-topic-01")
public class SimpleTransationMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("收到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), body);
// 业务执行完毕没有抛出异常则由代理类自动提交ACK
}
}
前置说明
在这里我们会使用SpringBoot项目作为基础来衍生各个功能进行说明,在项目中我们使用最多的事务处理方式有两种自动事务@Transactional和手动事务,对于不同的事务方式,发送Rocket事务消息的逻辑也有不同,这里会根据用户签到获取积分业务展开。
事务消息处理流程图
业务场景分析-以及问题解决方案代码实现
一、用户签到时发送MQ消息增加用户积分
场景1(在事务内部发送增加用户积分消息)
在事务内部发送普通消息,这种方式也是我们用的最多的,调用的service方法会加上@Transactional注解整个方法中执行的操作都在一个事务中,我见过大部分的业务都是这样发送的MQ消息,如果不出意外情况这样是没有什么问题,但是出问题了那就是致命的。
问题分析
- 增加积分消息发送成功之后如果事务commit失败,创建签到记录会回滚,但是消息已经成功发出,积分系统会将消息消费。
- 出现原因:数据库服务出现异常,网络问题导致提交超时事物自动回滚、应用服务断电宕机等…
场景2(在事务提交之后发送增加用户积分消息)
问题分析
- 事物提交成功,但是在发送消息时出现异常,没有给用户添加上对应积分。
- 出现原因:RocketMQ服务端出现异常、网络问题导致发送超时、应用服务断电宕机等…
问题解决方案
上面两种场景无非都是要么事物没有提交成功,要么事物提交成功消息没有发送出去,如果我们可以解决这两种问题那系统就可以闭环了,针对这两种场景可以直接使用RocketMQ的事物消息实现。
代码实现方案1
消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SignInMsgDTO {
/** 用户编号 */
private String customerNo;
/** 签到日期 */
private String signInDate;
/** 积分 */
private Long point;
}
业务逻辑代码+生产者
这里会使用到函数对象Function作为事务消息的第三个参数传递到全局事务消息监听器中的executeLocalTransaction(Message message, Object o)方法的Object o,在executeLocalTransaction方法中将Object o转换成Function函数调用apply方法传入事务ID,作为业务的回查标记,执行业务本地事务也会在function.apply中执行。
@Slf4j
@Service
public class SignInService {
@Autowired
private SignInCmdExe signInCmdExe;
@Autowired
RocketMQTemplate rocketMQTemplate;
/** 签到 */
public boolean doSignIn(String customerNo){
// 查询当天签到积分 这里模拟当天积分是100
long point=100;
// 构建签到增加积分
SignInMsgDTO signInMsgDTO = new SignInMsgDTO(customerNo,"20230401",point);
// 构建消息体
// 这里有两种方法可以选择
// 1:自己生成事务ID写入消息体中这种方式要注意生成事务ID重复问题
// 2:交由RocketMQ服务端自动生成我们在executeLocalTransaction方法中直接获取即可
// String transactionId = UUID.randomUUID().toString();
// Message<SignInMsgDTO> message = MessageBuilder.withPayload(signInMsgDTO).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build();
Message<SignInMsgDTO> message = MessageBuilder.withPayload(signInMsgDTO).build();
// 构建事务消息第三个参数,这里以Function函数为例 可以解决事务消息id回传问题,也可以自定义一个事务id写入消息体中
Function function = new Function<String,Boolean>(){
@Override
public Boolean apply(String transactionId) {
boolean execute = signInCmdExe.execute(customerNo, transactionId);
return execute;
}
} ;
// 发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("sign-in-transation-topic-01", message,function);
// 事务消息提交状态
String localTransactionState = result.getLocalTransactionState().name();
// 如果消息被回滚代表业务执行异常本地事务提交失败
if (LocalTransactionState.ROLLBACK_MESSAGE.name().equals(localTransactionState)) {
log.info("发送事务消息失败 transactionId={} localTransactionState={} ", result.getTransactionId(), localTransactionState);
return false;
}
log.info("发送事务消息成功 transactionId={} localTransactionState={} ", result.getTransactionId(), localTransactionState);
return true;
}
}
SignInCmdExe
@Slf4j
@Service
public class SignInCmdExe {
/**
* 执行签到逻辑
* 这里使用Spring自动事务管理也可以使用手动事务
*/
@Transactional(rollbackFor = Exception.class)
public boolean execute(String customerNo,String transactionId){
// 判断用户今天是否已经签到 已经签到抛出错误信息交由上一步处理
// status 0:未签到 1:已签到
int status = 0;
if(status == 1){
log.info("用户已签到,不能重复签到 customerNo={}",customerNo);
return false;
}
// 用户还未签到,记录签到信息,以传入的transactionId作为主键或者单独使用一个字段记录该事务id需要添加唯一索引用于消息回查
// 记录这个信息是要保证原则性,这里有一个并发问题,可以通过分布式锁或者数据库用户编号加当天日期设置唯一索引,这里不展开说明。
log.info("用户签到成功 customerNo={} 签到记录transactionId={}",customerNo,transactionId);
return true;
}
}
全局事务消息监听
@Slf4j
@RocketMQTransactionListener
public class SignInTransactionMsgListener implements RocketMQLocalTransactionListener {
// 事务消息共有三种状态,提交状态、回滚状态、中间状态:
// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
/** 执行本地事务(在发送消息成功时执行)*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
// 1、获取事务ID
String transactionId = null;
try{
transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("执行本地事务 transactionId={}",transactionId);
// 2、判断传入函数对象是否为空,如果为空代表没有要执行的业务直接抛弃消息
if (o == null) {
//返回ROLLBACK状态的消息会被丢弃
log.info("事务消息回滚,没有需要处理的业务 transactionId={}",transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
// 将Object o转换成Function对象
Function<String, Boolean> function = (Function<String, Boolean>) o;
// 执行业务 事务也会在function.apply中执行
Boolean apply = function.apply(transactionId);
if (apply) {
log.info("事务提交,消息正常处理 transactionId={}",transactionId);
//返回COMMIT状态的消息会立即被消费者消费到
return RocketMQLocalTransactionState.COMMIT;
}
log.info("事务消息回滚,业务本地事务执行失败回滚事务消息 transactionId={}",transactionId);
//返回ROLLBACK状态的消息会被丢弃
}catch (Exception e){
log.info("出现异常 返回ROLLBACK transactionId={}",transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
/**
* 检查本地事务的状态
* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。
* 第一次消息回查最快
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = message.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
// 以事务ID为查询条件,查询本地事务执行情况
if (isSuccess(transactionId)) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
// 模拟查询本地事务状态
private boolean isSuccess(String transactionId) {
// 查询数据库 select from 签到记录表
return true;
}
}
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "sign-in-transation-group-01", topic = "sign-in-transation-topic-01")
public class SignInTransationMessageConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("收到用户签到消息: msgId={} topic={} queueId={} body={}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getQueueId(), body);
// 业务执行完毕没有抛出异常则由代理类自动提交ACK
}
}