所谓幂等性,指的是系统A对系统B的接口进行多次调用所产生的结果和调用一次所产生的结果是一致的。举个栗子,比如我们正在开发一个网上下单系统,当用户使用我们的系统下单时,我们就需要对该订单进行发货处理,我们的网上下单系统会调用发货系统进行发货,假如下单系统调用发货系统时网络延迟造成前端没响应,用户又点了几次,这样就会造成重复下单的问题,我就试过,在购买iPhone6S的时候,因为刚刚开卖,系统估计被挤爆了,结果,下了3条订单扣了我3次款,还好,苹果商城后面可以做“后悔”操作,可以改为只购买一台,不然就亏大了……
由此可见,在分布式环境中,操作互斥性问题和幂等性问题非常普遍,在大的电商系统,如京东、淘宝等电商系统,当然有很完善的解决方案(他们也是踩坑过来的),但是很多人接触不到,所以缺少经验,那么,如何解决一般情况下的幂等性问题呢?通常都会想到Zookepper,但是,还有另一种简单的方案可以参考,这里做一个简单的系统,然后一步步改造来说明一下如何解决。
这个简单的系统涉及两个微服务:
- 微服务A: 下单系统
- 微服务B: 发货系统
首先创建两个简单的Spring boot微服务,很简单,自行创建即可。
物流系统很简单,就一个接口:
curl http://localhost:9002/logistic/send/1
package cn.linjk.logisticsservice;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("logistic")
public class ControllerLogisitcs {
// 物流发送调度接口 -1 - 物流调度失败 0 - 物流调度成功
@GetMapping("send/{orderId}")
public String logisticSend(@PathVariable("orderId") Integer orderId) {
try {
log.info("订单" + orderId + "正在进行发货处理......");
Thread.currentThread().sleep(10000); // 休眠10秒,模拟物流调度的接口延时
}
catch (InterruptedException e) {
e.printStackTrace();
log.error("订单" + orderId + "发货失败!");
return "-1";
}
log.info("订单" + orderId + "发货成功!");
return "0";
}
}
接下来就是我们的下单系统:
实体类:
package cn.linjk.orderservice;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "t_order")
public class DomainOrder {
@Id
@Column(name = "id")
private Integer id;
@Column(name = "product_name")
private String productName;
@Column(name = "status")
private Integer status; // -1-删除, 0-已完成, 1-系统正在处理订单, 2-系统已发货
}
控制器如下:
package cn.linjk.orderservice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import javax.transaction.Transactional;
import java.util.Random;
@RestController
@RequestMapping("/order")
public class ControllerOrder {
@Autowired private RestTemplate restTemplate;
@Autowired private IDaoOrder iDaoOrder;
// 下单接口
@GetMapping("take/{product-name}")
@Transactional
public String takeOrder(@PathVariable("product-name") String productName) {
// 1. 系统创建订单
Integer orderId = new Random().nextInt(1000);
DomainOrder domainOrder = DomainOrder.builder()
.id(orderId)
.productName(productName)
.status(1)
.build();
iDaoOrder.save(domainOrder);
// 2. 调用发货接口
String sendResult = restTemplate.getForEntity("http://localhost:9002/logistic/send/" + orderId,
String.class).getBody();
if (sendResult.equals("0")) {
// 3. 更新订单状态
domainOrder.setStatus(2);
iDaoOrder.save(domainOrder);
return "下单发货成功!";
}
return "下单发货失败!";
}
// 查询订单
@GetMapping("query/{orderId}")
public String query(@PathVariable("orderId") Integer orderId) {
return iDaoOrder.findById(orderId).toString();
}
}
注意,因为用到RestTemplate,所以,需要注入bean:
package cn.linjk.orderservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
public class OrderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class, args);
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public DataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl("jdbc:mysql://xxxx:xxx/tcc-test?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=round&allowMultiQueries=true");
druidDataSource.setUsername("xxxx");
druidDataSource.setPassword("xxxx");
druidDataSource.setMaxActive(2);
druidDataSource.setMaxWait(3000); // 等待3秒
return druidDataSource;
}
}
测试如下:
数据库结果如下(265数据被我删了,这条重新请求生成的):
注意到,在接口takeOrder中用了事务,这里有一个问题,在这个事务中,调用了物流系统的接口,那里有个10秒的延时,这样,数据库连接就会一直占用着,我们可以把数据库连接最大设为2个试一下(看上面对druiddatasource的bean的配置),2个请求下单,然后第3个请求去查询,这时就会查询失败了:
很明显,无论数据库连接数设置最大多少,都会出现这个问题,当再多一个连接进来时就无法请求到数据库连接资源了,相信这种逻辑的写法,大多数人都会这样写,因为,按照逻辑下来,没毛病啊。这时,对takeorder方法进行改造一下,以免长时间占用原本的数据库连接(经过1个多小时的试验,使用JPA的方法,即使使用编程式事务也还是占用数据库连接,后来改用mybatis就可以了!!具体缺少哪里的配置我还不明白,项目改造如下):
移除jpa依赖,添加mybatis依赖:
以及mybatis的生成插件:
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.2</version>
<configuration>
<configurationFile>${basedir}/src/main/resources/generator/generatorConfig.xml</configurationFile>
<overwrite>true</overwrite>
<verbose>true</verbose>
</configuration>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper</artifactId>
<version>3.3.9</version>
</dependency>
</dependencies>
</plugin>
还要在pom.xml加属性:
<targetJavaProject>${basedir}/src/main/java</targetJavaProject>
<targetResourcesProject>${basedir}/src/main/resources</targetResourcesProject>
然后增加mybatis的生成配置generatorConfig.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE generatorConfiguration
PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
"http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
<generatorConfiguration>
<properties resource="generator/config.properties"/>
<context id="Mysql" targetRuntime="MyBatis3Simple" defaultModelType="flat">
<property name="beginningDelimiter" value="`"/>
<property name="endingDelimiter" value="`"/>
<plugin type="${mapper.plugin}">
<property name="mappers" value="${mapper.Mapper}"/>
</plugin>
<jdbcConnection driverClass="${jdbc.driverClass}"
connectionURL="${jdbc.url}"
userId="${jdbc.user}"
password="${jdbc.password}">
</jdbcConnection>
<javaModelGenerator targetPackage="${package.name}.domain.${module.name}.entity"
targetProject="${targetJavaProject}"/>
<sqlMapGenerator targetPackage="${package.name}.persistence.${module.name}"
targetProject="${targetResourcesProject}"/>
<javaClientGenerator targetPackage="${package.name}.persistence.${module.name}"
targetProject="${targetJavaProject}" type="XMLMAPPER"/>
<table tableName="${table.name}" domainObjectName="${domain.object.name}">
<generatedKey column="id" sqlStatement="Mysql" identity="true"/>
</table>
</context>
</generatorConfiguration>
以及config.properties:
jdbc.driverClass=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://xx.xx.xx.xx:xxx/tcc-test?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=round&allowMultiQueries=true
jdbc.user=xxx
jdbc.password=xxxxxx
#c3p0
jdbc.maxPoolSize=50
jdbc.minPoolSize=10
jdbc.maxStatements=100
jdbc.testConnection=true
mapper.plugin=tk.mybatis.mapper.generator.MapperPlugin
mapper.Mapper=tk.mybatis.mapper.common.Mapper
package.name=cn.linjk.orderservice
module.name=order
table.name=t_order
domain.object.name=Order
项目目录如下:
然后,点击maven插件生成数据库对象和持久层类:
生成如下:
接着,用编程式事务改造原来的下单方法,这里把它抽取到一个service类中:
package cn.linjk.orderservice;
import cn.linjk.orderservice.domain.order.entity.Order;
import cn.linjk.orderservice.persistence.order.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
@Service
public class ServiceOrder {
@Autowired private RestTemplate restTemplate;
@Autowired private OrderMapper iDaoOrder;
@Autowired private TransactionTemplate transactionTemplate;
public String takeOrder(String productName) {
Integer orderId = new Random().nextInt(1000);
Order domainOrder = Order.builder()
.id(orderId).productName(productName).status(1)
.build();
// 1. 系统创建订单
transactionTemplate.execute(new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus transactionStatus) {
iDaoOrder.insert(domainOrder);
return null;
}
});
// 2. 调用发货接口, 注意,此时,不占用数据库连接资源了!!!
String sendResult = restTemplate.getForEntity("http://localhost:9002/logistic/send/" + orderId,
String.class).getBody();
transactionTemplate.execute(new TransactionCallback<Object>() {
@Override
public String doInTransaction(TransactionStatus transactionStatus) {
if (sendResult.equals("0")) {
// 3. 更新订单状态
domainOrder.setStatus(2);
iDaoOrder.updateByPrimaryKeySelective(domainOrder);
}
return null;
}
});
if (sendResult.equals("0")) {
return "下单发货成功!";
}
// 发货失败
return "系统已接收订单,正在处理中......";
}
}
可以看到,在设计数据库操作时,就在编程式事务中处理,长时间的接口调用就在外部处理,这样就不会占用数据库连接了。
启动类改造如下:
package cn.linjk.orderservice;
import com.alibaba.druid.pool.DruidDataSource;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.client.RestTemplate;
import javax.sql.DataSource;
@SpringBootApplication
@EnableTransactionManagement
@MapperScan("cn.linjk.orderservice")
public class OrderServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrderServiceApplication.class, args);
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public DataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl("jdbc:mysql://xx.xx.xx.xx:xxx/tcc-test?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=round&allowMultiQueries=true");
druidDataSource.setUsername("xx");
druidDataSource.setPassword("xxxxx");
druidDataSource.setMaxActive(2);
druidDataSource.setMinIdle(2);
druidDataSource.setMaxWait(3000); // 等待3秒
druidDataSource.setTestWhileIdle(true);
druidDataSource.setTestOnBorrow(false);
druidDataSource.setTestOnReturn(false);
druidDataSource.setTimeBetweenEvictionRunsMillis(1000); // 1秒检测关闭空闲连接
druidDataSource.setPoolPreparedStatements(true);
druidDataSource.setValidationQuery("SELECT 1 FROM DUAL");
druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");
druidDataSource.setInitialSize(2);
return druidDataSource;
}
}
再来发送下单请求试下:
可以发现,下单请求接口还没返回,但是查询接口照样可以查询的!!注意,在datasource那里我配置了最大连接数是2个的,使用了编程式事务后,第三个查询请求进来时,可以复用空闲的数据库连接,这样就提高了并发数量了!
好,问题解决,到幂等性等问题了。
// 下单接口
@GetMapping("take/{product-name}")
public String takeOrder(@PathVariable("product-name") String productName) {
// 模拟用户的5次请求:
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
serviceOrder.takeOrder(productName);
}
});
thread.start();
}
return "下单完成";
}
这里模拟用户的前端的5次连续点击,看看实际会下了多少条订单:
这就不对了,我原本只想买1台手机,结果,因为网络响应慢或前端没处理好,结果买了5台……
现在就进行改造如下(其实这样就用到了乐观锁,悲观锁类似 select * for update):
package cn.linjk.orderservice;
import cn.linjk.orderservice.domain.order.entity.Order;
import cn.linjk.orderservice.persistence.order.OrderMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
@Service
public class ServiceOrder {
@Autowired private RestTemplate restTemplate;
@Autowired private OrderMapper iDaoOrder;
@Autowired private TransactionTemplate transactionTemplate;
public String takeOrder(String productName) {
Integer orderId = new Random().nextInt(1000);
// 1. 系统创建订单
Boolean noOrder = false;
synchronized (this) {
noOrder = transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus transactionStatus) {
// 不存在同样产品,这里不严谨,还需要加下单人和时间判断,这里简单点,就不加这些了
if (iDaoOrder.select(Order.builder().productName(productName).build()).size() == 0) {
Order domainOrder = Order.builder()
.id(orderId).productName(productName).status(1)
.build();
iDaoOrder.insert(domainOrder);
return true;
}
return false;
}
});
}
// 2. 调用发货接口, 注意,此时,不占用数据库连接资源了!!!
if (noOrder) {
String sendResult = restTemplate.getForEntity("http://localhost:9002/logistic/send/" + orderId,
String.class).getBody();
transactionTemplate.execute(new TransactionCallback<Object>() {
@Override
public String doInTransaction(TransactionStatus transactionStatus) {
if (sendResult.equals("0")) {
// 3. 更新订单状态
Order domainOrder = Order.builder()
.id(orderId).productName(productName).status(2)
.build();
iDaoOrder.updateByPrimaryKeySelective(domainOrder);
}
return null;
}
});
}
return "同一时间内已存在该订单";
}
}
5个线程请求,这样发货记录只有一条:
扩展:
在重要的表,如订单表等,可以加一个字段version,初始化为0,用作版本控制,比如上面5个线程,第一个进来把version为0的设为1,后面的线程再进行类似操作就无效了,这样也达到类似效果:
还有其他方案的,只要达到同一目的即可。