一、项目结构:
1、 父工程
pom.xml:(父工程只有一个pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lucifer</groupId>
<artifactId>rocketmq-transaction</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>order-service</module>
<module>storage-service</module>
<module>base-framework-mysql-support</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<!-- Two BOMs are imported here: Spring Cloud (version taken from ${spring-cloud.version}) and Spring Boot 2.2.5.RELEASE. -->
<!-- NOTE(review): the Greenwich release train is documented against Spring Boot 2.1.x, while Hoxton is the train published for Boot 2.2.x; confirm that pairing Greenwich.SR1 with Boot 2.2.5.RELEASE is intended. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、base-framework-mysql-support:数据库相关的配置
2.1 MybatisPlusConfig :mybatis-plus的相关配置
package com.lucifer.config;
import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * MyBatis-Plus configuration: enables mapper scanning for {@code com.lucifer.mapper}
 * and registers the SQL performance plugin.
 *
 * @author lucifer
 * @date 2020/4/14 21:54
 */
@Configuration
@MapperScan("com.lucifer.mapper")
public class MybatisPlusConfig {

    /**
     * Registers the MyBatis-Plus SQL execution-efficiency plugin.
     *
     * <p>The bean is currently active in every environment; the original author left a
     * disabled {@code @Profile({"dev", "test"})} as a hint that it could be limited to
     * the dev and test profiles.
     *
     * @return a new interceptor with default settings
     */
    @Bean
    public PerformanceInterceptor performanceInterceptor() {
        PerformanceInterceptor interceptor = new PerformanceInterceptor();
        return interceptor;
    }
}
2.2 RedissonConfig: redisson相关配置
package com.lucifer.config;
import org.apache.commons.lang3.StringUtils;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
 * Creates the {@link RedissonClient} bean from the {@code redisson.*} properties.
 *
 * <p>Exactly one of the two factory methods is expected to be active, selected by the
 * {@code redisson.single-is} / {@code redisson.sentinel-is} flags.
 */
@Configuration
public class RedissonConfig {

    /** Scheme prepended to every node address that does not already carry one. */
    private static final String REDIS_SCHEME = "redis://";

    /** TLS variant of the scheme; addresses starting with it are passed through unchanged. */
    private static final String REDIS_TLS_SCHEME = "rediss://";

    @Resource
    private RedissonProperties redissonProperties;

    /**
     * Single-node mode, active when {@code redisson.single-is=true}.
     *
     * @return a client connected to {@code redisson.single-address}
     * @throws IllegalStateException if the address is missing or blank
     */
    @Bean
    @ConditionalOnProperty(prefix = "redisson", name = "single-is", havingValue = "true")
    public RedissonClient getSingleRedisson() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(toRedisUrl(redissonProperties.getSingleAddress()));
        System.out.println("redis:==================" + serverConfig.getAddress());

        String singlePassword = redissonProperties.getSinglePassword();
        if (StringUtils.isNotBlank(singlePassword)) {
            serverConfig.setPassword(singlePassword);
        }
        return Redisson.create(config);
    }

    /**
     * Sentinel mode, active when {@code redisson.sentinel-is=true}.
     *
     * <p>{@code redisson.sentinel-addresses} is a comma-separated list of {@code host:port}
     * entries. Each entry is registered as its own sentinel address; previously the whole
     * comma-joined string was handed over as a single, scheme-less address.
     *
     * @return a client connected through the configured sentinels
     * @throws IllegalStateException if no sentinel address is configured
     */
    @Bean
    @ConditionalOnProperty(prefix = "redisson", name = "sentinel-is", havingValue = "true")
    public RedissonClient getSentinelRedisson() {
        Config config = new Config();
        SentinelServersConfig serverConfig = config.useSentinelServers()
                .addSentinelAddress(toRedisUrls(redissonProperties.getSentinelAddresses()))
                .setMasterName(redissonProperties.getSentinelMasterName());

        String sentinelPassword = redissonProperties.getSentinelPassword();
        if (StringUtils.isNotBlank(sentinelPassword)) {
            serverConfig.setPassword(sentinelPassword);
        }
        return Redisson.create(config);
    }

    /** Splits a comma-separated address list and normalises every entry to a Redis URL. */
    private static String[] toRedisUrls(String commaSeparated) {
        String[] parts = StringUtils.split(commaSeparated, ',');
        if (parts == null || parts.length == 0) {
            throw new IllegalStateException("redisson.sentinel-addresses must list at least one host:port");
        }
        String[] urls = new String[parts.length];
        for (int i = 0; i < parts.length; i++) {
            urls[i] = toRedisUrl(parts[i]);
        }
        return urls;
    }

    /** Trims one address and prepends {@code redis://} unless a Redis scheme is already present. */
    private static String toRedisUrl(String address) {
        String trimmed = StringUtils.trimToEmpty(address);
        if (trimmed.isEmpty()) {
            throw new IllegalStateException("Redis address must not be blank");
        }
        if (trimmed.startsWith(REDIS_SCHEME) || trimmed.startsWith(REDIS_TLS_SCHEME)) {
            return trimmed;
        }
        return REDIS_SCHEME + trimmed;
    }
}
2.3 RedissonProperties:读取application.yml的自定义配置
package com.lucifer.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* Holds the custom {@code redisson.*} settings bound from the application configuration;
* RedissonConfig reads them to build the RedissonClient.
*
* @author lucifer
* @date 2020/4/21 14:09
*/
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonProperties {
// Address of the single Redis node; RedissonConfig prepends "redis://" to it.
private String singleAddress;
// Password for the single node; RedissonConfig applies it only when it is not blank.
private String singlePassword;
// Master name handed to the sentinel configuration.
private String sentinelMasterName;
// Sentinel address value handed to addSentinelAddress by RedissonConfig.
private String sentinelAddresses;
// Password for sentinel mode; RedissonConfig applies it only when it is not blank.
private String sentinelPassword;
}
2.4 pom.xml: jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-transaction</artifactId>
<groupId>com.lucifer</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>base-framework-mysql-support</artifactId>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<!-- Connector/J 8.x is required: both services configure driver-class-name com.mysql.cj.jdbc.Driver, a class that the 5.1.x line does not contain. -->
<version>8.0.19</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
3、order-service 订单微服务
3.1 OrderController:控制层 用于测试
package com.lucifer.controller;
import com.lucifer.pojo.Order;
import com.lucifer.service.OrderService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.UUID;
/**
 * Test-only REST entry point that places a fixed demo order.
 *
 * @author lucifer
 * @date 2020/4/14 19:32
 */
@RestController
@RequestMapping(value = "order")
public class OrderController {
    @Resource
    OrderService orderService;

    /**
     * Places one demo order by handing it to {@link OrderService#sendOrder(Order)}.
     *
     * <p>A fresh UUID is attached to the order as its transaction number and travels with
     * the message.
     *
     * @return always {@code true}; it only signals that the order was handed to the
     *     service, not that the order was eventually stored
     */
    @GetMapping("/placeOrder/commit")
    public Boolean placeOrderCommit() {
        // The UUID doubles as the transaction id carried inside the MQ message.
        String uuid = UUID.randomUUID().toString();

        Order order = new Order();
        order.setCommodityCode("product-1");
        order.setUserId("1");
        order.setCount(1);
        order.setTxNum(uuid);
        // String constructor: never build monetary values from a double literal.
        order.setMoney(new BigDecimal("12.5"));

        System.out.println("准备下单了=======》" + order);
        orderService.sendOrder(order);
        return true;
    }
}
3.2 service 接口:
package com.lucifer.service;
import com.lucifer.pojo.Order;
/**
* Order operations used by the order controller and the transaction listener.
*/
public interface OrderService {
/**
* Sends the order as a message.
*
* @param order the order to send
*/
void sendOrder(Order order);
/**
* Inserts a new order.
*
* @param order the order to insert
* @throws Exception if the insert cannot be completed
*/
void insertOrder(Order order) throws Exception;
}
3.2.1 service 实现类:
package com.lucifer.service.impl;
import com.alibaba.fastjson.JSON;
import com.lucifer.mapper.OrderMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Order-side service: publishes the transactional message and performs the local order
 * insert that {@code ProducerTransactionListener.executeLocalTransaction} invokes.
 *
 * @author lucifer
 * @date 2020/4/14 19:31
 */
@Service
public class OrderServiceImpl implements OrderService {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Resource
    private OrderMapper orderMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Resource
    private RedissonClient redissonClient;

    /**
     * Serialises the order to JSON and sends it as a transactional message.
     *
     * @param order the order to publish
     */
    @Override
    public void sendOrder(Order order) {
        String payload = JSON.toJSONString(order);
        Message<String> message = MessageBuilder.withPayload(payload).build();
        // Arguments: producer group, destination topic, message, user argument (unused).
        rocketMQTemplate.sendMessageInTransaction("producer_group_tx1", "topic_tx", message, null);
    }

    /**
     * Inserts the order and its transaction-log row under a distributed lock.
     *
     * <p>The transaction number is the idempotency key: if a log row for it already exists
     * the call returns without touching the database again.
     *
     * <p>NOTE(review): the lock is released in {@code finally}, which runs before the
     * surrounding Spring transaction commits, and the lock key is per user + commodity
     * while the stock bucket is shared by all callers; confirm this is acceptable for
     * the intended load.
     *
     * @param order the order to insert; its {@code txNum} must be set
     * @throws IllegalStateException if the lock cannot be acquired within 3 seconds or no
     *     stock entry is cached in Redis
     * @throws RuntimeException if the cached stock cannot cover the order
     * @throws Exception on interruption while waiting for the lock
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void insertOrder(Order order) throws Exception {
        RLock lock = redissonClient.getLock("placeOrder:" + order.getUserId() + order.getCommodityCode());

        // Wait up to 3s; the lease expires after 10s. Failing loudly here makes the caller
        // roll the message back instead of committing it without an order.
        if (!lock.tryLock(3, 10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire order lock for txNum " + order.getTxNum());
        }
        try {
            // Idempotency first: an already-processed transaction must not fail on stock.
            if (txLogMapper.selectById(order.getTxNum()) != null) {
                return;
            }

            RBucket<Storage> storageBucket = redissonClient.getBucket("placeOrder");
            Storage storage = storageBucket.get();
            if (storage == null) {
                throw new IllegalStateException("no stock cached in Redis under key placeOrder");
            }
            System.out.println("剩余库存:=================" + storage.getCount());

            int requested = order.getCount() == null ? 0 : order.getCount();
            if (storage.getCount() <= 0 || storage.getCount() < requested) {
                throw new RuntimeException("商品:" + order.getCommodityCode() + ",库存为空");
            }

            orderMapper.insert(order);

            // The transaction log row is what the MQ status check looks up later.
            TxLog txLog = new TxLog();
            txLog.setTxNum(order.getTxNum());
            System.out.println("order.getTxNum():" + order.getTxNum());
            txLog.setCreateTime(new Date());
            txLogMapper.insert(txLog);
        } finally {
            // The lease may have expired already; unlocking a lock we no longer hold throws.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
3.3. mapper接口:
package com.lucifer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;
/**
* MyBatis-Plus mapper for {@link TxLog}; it declares no methods of its own and relies on
* what {@link BaseMapper} provides.
*/
public interface TxLogMapper extends BaseMapper<TxLog> {
}
package com.lucifer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Order;
/**
* MyBatis-Plus mapper for {@link Order}; it declares no methods of its own and relies on
* what {@link BaseMapper} provides.
*/
public interface OrderMapper extends BaseMapper<Order> {
}
3.4. 实体类:
package com.lucifer.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
/**
* Order table entity, mapped to {@code order_tbl}.
*/
@Data
@NoArgsConstructor
@TableName("order_tbl")
public class Order {
// Primary key, declared with IdType.AUTO.
@TableId(type = IdType.AUTO)
private Integer id;
// Id of the user placing the order.
private String userId;
// Code of the commodity being ordered.
private String commodityCode;
// Ordered quantity.
private Integer count;
// Order amount.
private BigDecimal money;
// Transaction number carried with the order; exist = false marks it as not being a table column.
@TableField(exist = false)
private String txNum;
}
package com.lucifer.pojo;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* Stock table entity, mapped to {@code storage_tbl}. It is Serializable and is also the
* value type stored in the Redisson bucket used by the services.
*/
@Data
@Accessors(chain = true)
@TableName("storage_tbl")
public class Storage implements Serializable {
// Row id.
private Long id;
// Code of the commodity this stock row belongs to.
private String commodityCode;
// Remaining stock quantity.
private Long count;
}
import java.util.Date;
/**
* Transaction log table entity, mapped to {@code tx_log}. The services look rows up by
* transaction number to detect work that has already been done.
*
* @author lucifer
* @date 2020/4/15 13:04
*/
@Data
//@Builder
@NoArgsConstructor
//@Accessors(chain = true)
@TableName("tx_log")
public class TxLog {
// Transaction number; serves as the table id.
@TableId
private String txNum;
// Time the log row was created.
private Date createTime;
}
3.5、ProducerTransactionListener 是去实现 RocketMQLocalTransactionListener接口(重要)
package com.lucifer.transaction;
import com.alibaba.fastjson.JSON;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Order;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
/**
* @author lucifer
* @date 2020/4/15 0:59
* @description TODO
*/
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "producer_group_tx1")
public class ProducerTransactionListener implements RocketMQLocalTransactionListener {
@Resource
private OrderService orderService;
@Resource
private TxLogMapper txLogMapper;
@Resource
private RedissonClient redissonClient;
/**
* 事务消息发送mq成功后的回调方法
*
* @param msg
* @param arg
* @return 返回事务状态
* RocketMQLocalTransactionState.COMMIT:提交事务,提交后broker才允许消费者使用
* RocketMQLocalTransactionState.ROLLBACK:回滚事务,回滚后消息将被删除,并且不允许别消费
* RocketMQLocalTransactionState.UNKNOWN:中间状态,表示MQ需要核对,以确定状态
*/
@Transactional(rollbackFor = Exception.class)
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String str = new String((byte[]) msg.getPayload());
Order order = JSON.parseObject(str, Order.class);
//下单
orderService.insertOrder(order);
//扣减redis中库存
RBucket<Storage> storageBucket = redissonClient.getBucket("placeOrder");
Storage storage = storageBucket.get();
long count = storage.getCount() - order.getCount();
storage.setCount(count);
storageBucket.set(storage);
//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit,mq将消息的状态改为可消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务状态回查,查询是否下单成功
*
* @param msg
* @return 返回事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String str = new String((byte[]) msg.getPayload());
Order order = JSON.parseObject(str, Order.class);
//事务id
String txNo = order.getTxNum();
TxLog txLog = txLogMapper.selectById(txNo);
if (txLog != null) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
3.6. 启动类
package com.lucifer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* Spring Boot entry point of the order service, with service discovery enabled.
*
* @author lucifer
* @date 2020/4/14 19:28
*/
@EnableDiscoveryClient
@SpringBootApplication
public class OrderApplication {
// Boots the Spring application context with the command-line arguments.
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
3.7.application.yml:
server:
port: 8081
spring:
application:
name: order-service
datasource:
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.160.131:3306/order?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTC
username: root
password: 123456
cloud:
nacos:
discovery:
server-addr: 192.168.160.131:8848
# main:
# allow-bean-definition-overriding: true
logging:
level:
com.lucifer.mapper: debug
rocketmq:
producer:
group: producter_tx
name-server: 192.168.160.131:9876
redisson:
single-is: true
single-address: 192.168.160.131:6379
single-password:
sentinel-is: false
sentinel-master-name: business-master
sentinel-addresses: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
sentinel-password:
3.8. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>rocketmq-transaction</artifactId>
<groupId>com.lucifer</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>order-service</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.lucifer</groupId>
<artifactId>base-framework-mysql-support</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
<!-- nacos -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>0.2.2.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
4.storage-service 库存微服务
4.1 项目启动就会将商品 "product-1"的库存信息查询出来,放到redis当中,这里用于测试
package com.lucifer.config;
import com.lucifer.pojo.Storage;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Startup hook that copies the stock record of the demo commodity into Redis.
 */
@Slf4j
@Component
public class StorageInitApplicationRunner implements ApplicationRunner {
    @Resource
    private StorageService storageService;
    @Resource
    private RedissonClient redissonClient;

    /**
     * Loads the stock of {@code product-1} through the storage service and writes it to
     * the Redis bucket {@code placeOrder}, replacing any value already stored there.
     *
     * @param applicationArguments startup arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner#run}
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        // Read from the database first, then overwrite the cached value unconditionally.
        Storage seed = storageService.getStorage("product-1");
        redissonClient.<Storage>getBucket("placeOrder").set(seed);
        log.info("商品数据初始化完成!");
    }
}
4.2 service接口
package com.lucifer.service;
import com.lucifer.pojo.Storage;
import java.util.concurrent.ExecutionException;
/**
* Stock operations of the storage service.
*/
public interface StorageService {
/**
* Deducts stock.
*
* @param commodityCode code of the commodity whose stock is deducted
* @param count quantity to deduct
* @param txNum transaction id
* @throws Exception if the deduction cannot be completed (the two narrower types in the
* signature are already covered by Exception)
*/
void deduct(String commodityCode, int count, String txNum) throws ExecutionException, InterruptedException, Exception;
/**
* Fetches the stock record of a commodity.
*
* @param commodityCode code of the commodity to look up
* @return the stock record for that commodity
*/
Storage getStorage(String commodityCode);
}
4.2.1 实现类
package com.lucifer.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.lucifer.mapper.StorageMapper;
import com.lucifer.mapper.TxLogMapper;
import com.lucifer.pojo.Storage;
import com.lucifer.pojo.TxLog;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
 * Storage-side service: deducts stock in MySQL for a consumed order message and looks up
 * stock records.
 *
 * @author lucifer
 * @date 2020/4/14 20:07
 */
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
    @Resource
    private StorageMapper storageMapper;
    @Resource
    private TxLogMapper txLogMapper;
    @Resource
    private RedissonClient redissonClient;

    /**
     * Deducts {@code count} from the stock of a commodity, at most once per transaction id.
     *
     * <p>A transaction-log row keyed by {@code txNum} is written together with the stock
     * update; if one already exists the call returns without changing anything.
     *
     * <p>NOTE(review): the lock is released in {@code finally}, which runs before the
     * surrounding Spring transaction commits; confirm that window is acceptable.
     *
     * @param commodityCode code of the commodity whose stock is deducted
     * @param count quantity to deduct
     * @param txNum transaction id used as the idempotency key
     * @throws IllegalStateException if the lock cannot be acquired within 100 seconds, so
     *     that the caller fails instead of acknowledging the message without a deduction
     * @throws RuntimeException if no stock row exists for the commodity
     * @throws Exception on interruption while waiting for the lock
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void deduct(String commodityCode, int count, String txNum) throws Exception {
        RLock lock = redissonClient.getLock("placeOrder:" + commodityCode);

        // Wait up to 100s; the lease expires after 10s.
        if (!lock.tryLock(100, 10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire stock lock for commodity " + commodityCode);
        }
        try {
            log.info("扣减库存,商品编码:{},数量:{}", commodityCode, count);

            // Already processed: nothing to do.
            if (txLogMapper.selectById(txNum) != null) {
                return;
            }

            QueryWrapper<Storage> wrapper = new QueryWrapper<>();
            wrapper.setEntity(new Storage().setCommodityCode(commodityCode));
            Storage storage = storageMapper.selectOne(wrapper);
            if (storage == null) {
                throw new RuntimeException("商品" + commodityCode + ",不存在");
            }

            // Deduct the stock held in MySQL.
            storage.setCount(storage.getCount() - count);
            log.info("剩余库存数量:{}", storage.getCount());
            storageMapper.updateById(storage);

            // Record the transaction so that a redelivered message is ignored.
            TxLog tLog = new TxLog();
            tLog.setTxNum(txNum);
            tLog.setCreateTime(new Date());
            txLogMapper.insert(tLog);
        } finally {
            // The lease may have expired already; unlocking a lock we no longer hold throws.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Looks up the stock record of a commodity by its code.
     *
     * @param commodityCode code of the commodity to look up
     * @return the stock record, or {@code null} if no row matches
     */
    @Override
    public Storage getStorage(String commodityCode) {
        QueryWrapper<Storage> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("commodity_code", commodityCode);
        Storage storage = storageMapper.selectOne(queryWrapper);
        if (storage == null) {
            // Logging must not turn a missing row into a NullPointerException.
            log.warn("no stock row found for commodity {}", commodityCode);
            return null;
        }
        log.info("剩余库存数:{}", storage.getCount());
        return storage;
    }
}
4.3 mapper接口
package com.lucifer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.Storage;
/**
* MyBatis-Plus mapper for {@link Storage}; it declares no methods of its own and relies on
* what {@link BaseMapper} provides.
*/
public interface StorageMapper extends BaseMapper<Storage> {
}
package com.lucifer.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.lucifer.pojo.TxLog;
/**
* MyBatis-Plus mapper for {@link TxLog}; it declares no methods of its own and relies on
* what {@link BaseMapper} provides.
*/
public interface TxLogMapper extends BaseMapper<TxLog> {
}
4.4 实体类
Order、Storage、TxLog 三个实体类同order-service中;
4.5 ConsumerTransactionListener 实现RocketMQListener接口,消费消息
package com.lucifer.transaction;
import com.alibaba.fastjson.JSON;
import com.lucifer.pojo.Order;
import com.lucifer.service.StorageService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * Consumes order messages from topic {@code topic_tx} (consumer group
 * {@code consumer_group_tx2}) and deducts the corresponding stock.
 *
 * @author lucifer
 * @date 2020/4/15 0:59
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "consumer_group_tx2", topic = "topic_tx")
class ConsumerTransactionListener implements RocketMQListener<String> {
    @Resource
    private StorageService storageService;

    /**
     * Parses the message as an order and deducts its quantity from stock.
     *
     * @param message the order as a JSON string
     * @throws RuntimeException if the deduction fails; the original exception is attached
     *     as the cause so the failure reason is not lost
     */
    @Override
    public void onMessage(String message) {
        log.info("开始消费消息:{}", message);

        Order order = JSON.parseObject(message, Order.class);
        if (order == null) {
            // Nothing to deduct; leave a trace instead of dropping the message silently.
            log.warn("message did not contain an order, skipping: {}", message);
            return;
        }

        try {
            storageService.deduct(order.getCommodityCode(), order.getCount(), order.getTxNum());
        } catch (Exception e) {
            throw new RuntimeException("扣减库存失败", e);
        }
    }
}
4.6启动类
package com.lucifer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
/**
* Spring Boot entry point of the storage (stock) service, with service discovery and
* configuration-properties support enabled.
*
* @author lucifer
* @date 2020/4/14 20:23
*/
@EnableConfigurationProperties
@EnableDiscoveryClient
@SpringBootApplication
public class StorageApplication {
// Boots the Spring application context with the command-line arguments.
public static void main(String[] args) {
SpringApplication.run(StorageApplication.class, args);
}
}
4.7application.yml:
server:
port: 8082
spring:
application:
name: storage-service
datasource:
druid:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.160.131:3306/storage?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=UTC
username: root
password: 123456
cloud:
nacos:
discovery:
server-addr: 192.168.160.131:8848
# main:
# allow-bean-definition-overriding: true
logging:
level:
com.lucifer.mapper: debug
rocketmq:
producer:
group: consumer_tx
name-server: 192.168.160.131:9876
redisson:
single-is: true
single-address: 192.168.160.131:6379
single-password:
sentinel-is: false
sentinel-master-name: business-master
sentinel-addresses: 127.0.0.1:26379,127.0.0.1:26380,127.0.0.1:26381
sentinel-password:
4.8 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>rocketmq-transaction</artifactId>
<groupId>com.lucifer</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>storage-service</artifactId>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.lucifer</groupId>
<artifactId>base-framework-mysql-support</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
<!-- nacos -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>0.2.2.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>fastjson</artifactId>
<groupId>com.alibaba</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
二、测试
(1)场景一:正常情况:使用JMeter测试:用20个线程去模拟20个用户同时去购买某个商品:
目前库存只有10个:
查看控制台: (当10个订单下单成功后,第11个订单。。。。)
准备下单了=======》Order(id=null, userId=1, commodityCode=product-1, count=1, money=12.5, txNum=7a0702f8-8814-4d6a-a492-04ccd1bda31b)
剩余库存:=================0
java.lang.RuntimeException: 商品:product-1,库存为空
查看数据库:只有10个订单下单成功,并且库存为0,并没有成为负数
(2)场景2:模拟异常的发生,在order-service中:
删除redis数据、清空数据库刚产生的数据,将库存数调回10,重启库存微服务;
模拟20个用户去访问,order-service 微服务控制台:
剩余库存:=================6
2020-04-21 19:20:59.502 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById : ==> Preparing: SELECT tx_num,create_time FROM tx_log WHERE tx_num=?
2020-04-21 19:20:59.509 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById : ==> Parameters: 4dadbd02-735f-43f3-b10e-5863c833406c(String)
2020-04-21 19:20:59.520 DEBUG 22056 --- [nio-8081-exec-1] c.lucifer.mapper.TxLogMapper.selectById : <== Total: 0
Time:17 ms - ID:com.lucifer.mapper.TxLogMapper.selectById
Execute SQL:SELECT tx_num,create_time FROM tx_log WHERE tx_num='4dadbd02-735f-43f3-b10e-5863c833406c'
java.lang.RuntimeException: 人为异常
当订单下了4单,剩余库存为6时,人为异常抛出,此时数据库:
(3)场景3:模拟异常的发生,在storage-service中:
库存微服务 控制台:一直打印,去扣减MySQL的库存,不过因为此处人为制造异常,库存只要为5,就会去抛异常
java.lang.RuntimeException: 扣减库存失败
at com.lucifer.transaction.ConsumerTransactionListener.onMessage(ConsumerTransactionListener.java:37) ~[classes/:na]
at com.lucifer.transaction.ConsumerTransactionListener.onMessage(ConsumerTransactionListener.java:18) ~[classes/:na]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_192]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_192]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
2020-04-21 19:31:31.243 DEBUG 15160 --- [essageThread_12] c.lucifer.mapper.TxLogMapper.selectById : ==> Parameters: f60a6c44-292b-47b7-8bfd-6aedaeb26a8e(String)
java.lang.RuntimeException: 人为异常
此时,order数据库中有10个订单,而库存数据库中库存为6,有6单是扣减失败的,所以此时库存微服务会不停地去尝试扣减库存(RocketMQ并发消费默认最多重试16次)。一般情况下,不会让RocketMQ重试那么多次,重试几次就差不多了,如果还是无法扣减,只能人工扣减了。
此时将IDEA中的人为异常注释掉,重新编译,会发现,库存微服务扣减成功了,然后数据库中订单数为10,库存数扣减为0 了。