消息队列避免分布式事物
@Transactional
public void testDistributedTransaction(long goodsid) {
//本地操作
int count = goodsDao.reduceNumber(goodsid);
if (count <= 0) {
throw new RpcException(BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getState(),
BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getMsg());
}
//本地操作
count = orderDao.insertOrder(1000, goodsid, "普通买卖");
if (count != 1) {
throw new RpcException(BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getState(),
BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getMsg());
}
//远程操作通过消息队列
//这里将userService的addScore操作通过消息中心去完成,避开分布式事务。
try {
mqProducer.sendBizMessage(goodsid);
} catch (Exception e) {
//发送消息失败也会回滚事物
throw new RpcException(BizExceptionEnum.INNER_ERROR.getState(),
BizExceptionEnum.INNER_ERROR.getMsg());
}
}
具体的方法如下
//产品Goods的一个Service userService是远程的服务
@Transactional
@Override
public void testDistributedTransaction(long goodsid) {
//这部分的事务可以由本地Spring事务管理到 出错了可以回滚
int count = goodsDao.reduceNumber(goodsid);
if (count <= 0) {
throw new RpcException(BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getState(),
BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getMsg());
}
count = orderDao.insertOrder(1000, goodsid, "普通买卖");
if (count != 1) {
throw new RpcException(BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getState(),
BizExceptionEnum.DB_UPDATE_RESULT_ERROR.getMsg());
}
//这里将userService的addScore操作通过消息中心去完成,避开分布式事务。
try {
mqProducer.sendBizMessage(goodsid);
} catch (Exception e) {
//发送消息失败也会回滚事物
throw new RpcException(BizExceptionEnum.INNER_ERROR.getState(),
BizExceptionEnum.INNER_ERROR.getMsg());
}
}