通用分布式幂等组件
之前一篇文章中提到了幂等性的一些解决办法,但是基本上是单机上的,今天我们来弄一个支持分布式的通用幂等性组件
【以下有不对的地方,一定要指出哦,共同成长。】
一、背景
分布式系统由众多微服务组成,微服务之间必然存在大量的网络调用。比如一个服务间调用异常的例子,用户提交表单之后,请求到A服务,A服务落单之后,开始调用B服务,但是在A调用B的过程中,存在很多不确定性,例如B服务执行超时了,RPC直接返回A请求超时了,然后A返回给用户一些错误提示,但实际情况是B有可能执行是成功的,只是执行时间过长而已。
用户看到错误提示之后,往往会选择在界面上重复点击,导致重复调用,如果B是个支付服务的话,用户重复点击可能导致同一个订单被扣多次钱。不仅仅是用户可能触发重复调用,定时任务、消息投递和机器重新启动都可能会出现重复执行的情况。在分布式系统里,服务调用出现各种异常的情况是很常见的,这些异常情况往往会使得系统间的状态不一致,所以需要容错补偿设计,最常见的方法就是调用方实现合理的重试策略,被调用方实现应对重试的幂等策略。
相信大家都知道,并且也都遇到过类似的问题以及有自己的一套解决方案。
基本上所有业务系统中的幂等都是各自进行处理,也不是说不能统一处理,统一处理的话需要考虑的内容会比较多。
个人认为核心的业务还是业务方自己去处理,比如订单支付,会有个支付记录表,一个订单只能被支付一次,通过支付记录表就可以达到幂等的效果。
而那些非核心的业务,也有幂等的需求。比如网络问题,多次重试。用户点击多次等场景。这种场景下可以用一个通用的幂等框架来处理,会让业务开发更加简单。
二、回顾一下:什么是幂等?
对于幂等,有一个很常见的描述是:对于相同的请求应该返回相同的结果,所以查询类接口是天然的幂等性接口。举个例子:如果有一个查询接口是查询订单的状态,状态是会随着时间发生变化的,那么在两次不同时间的查询请求中,可能返回不一样的订单状态,这个查询接口还是幂等接口吗?
幂等的定义直接决定了我们如何去设计幂等方案,如果幂等的含义是相同请求返回相同结果,那实际上只需要缓存第一次的返回结果,即可在后续重复请求时实现幂等了。但问题真的有这么简单吗?
而我更赞同这种定义:幂等指的是相同请求(identical request)执行一次或者多次所带来的副作用(side-effects)是一样的。
这就主要考虑三个方向:什么是相同的请求?哪些情况会有副作用?该如何避免副作用?
三、设计
- 相同的请求:通过分布式id生成器生成唯一id作为标识同一请求的key。
- 通用存储:通过Redis与Mysql做统一存储,Redis做资源锁组件。
- 使用简单:支持代码与注解两种形式,注入对应的类即可实现幂等,屏蔽加锁,记录判断等逻辑。
- 多级存储:采用Redis作为一级存储,优点是性能高,通过设置一定的失效时间,让 Key 自动失效。Mysql或者Mongo 作为二级存储,适用于时间长或者永久存储的场景。
- 封装Runnable与Supplier,以及自定义处理注解@IdempotentHandler标识为幂等失败的处理函数。
- 并发读写:因为多级存储,必会涉及到并发读写的场景,主要支持两种方式,顺序和并发。顺序就是先写一级存储,再写二级存储,读也是一样。这样的问题在于性能会有点损耗。并发就是多线程同时写入,同时读取,提高性能。
- 幂等执行流程:
四、实现
主要还是以那位大佬的为主,添加自己的特色。
1. 封装Runnable与Supplier
/**
* @program: idempotence
* @description: Runnable包装
* @author: Mr.Liu
* @create: 2020-10-06 21:06
**/
public interface IdempotentRunnable extends Runnable{
/**
* 包装
* @param key 幂等键
* @param ex 异常
*/
default void run(String key, IdempotentException ex){
runTask(key,ex);
}
/**
* 任务体
* @param key 幂等键
* @param ex 异常
*/
public abstract void runTask(String key, IdempotentException ex);
/**
* 实现父类的
* 不用的方法
*/
@Override
@Deprecated
default void run() {
}
}
/**
* @program: idempotence
* @description: Supplier包装
* @author: Mr.Liu
* @create: 2020-10-06 21:12
**/
public interface IdempotentSupplier<T> extends Supplier<T> {
/**
* 具体需要被调用的
* @param key 幂等键
* @param ex 异常
* @return
*/
default T get(String key, IdempotentException ex){
return runTask(key,ex);
}
/**
* 任务体
* @param key 幂等键
* @param ex 异常
* @return
*/
public abstract T runTask(String key, IdempotentException ex);
/**
* 实现父类的
* 不用的方法
* @return
*/
@Override
@Deprecated
default T get() {
return null;
}
}
2. 基于Redisson的Redis锁和MySQL的锁。
接口如下,有两个实现类DistributedLockMysql.class和DistributedLockRedis.class,以Redis为首要锁中间件,MySQL为备用锁组件。但是我MySQL的还没有去实现,主要是使用Redis。
/**
* @program: idempotence
* @description: 分布式锁接口,锁的释放时间一定要考虑好,不然业务处理时间太长了,导致锁被释放了,然后又调用了unLock()方法,就会出现错误
* @author: Mr.Liu
* @create: 2020-10-04 19:42
**/
public interface DistributedLock {
/**
* 加锁
* @param key 锁key
* @param waitTime 尝试加锁,等待时间(ms)
* @param leaseTime 上锁后的失效时间(ms)
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
* @param <T>
* @return
*/
<T> T lock(String key, int waitTime, int leaseTime, IdempotentSupplier<T> success, IdempotentSupplier<T> fail);
/**
* 加锁,加锁失败立即返回
* @param key 锁key
* @param leaseTime 上锁后的失效时间(ms)
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
* @param <T>
* @return
*/
<T> T lock(String key, int leaseTime, IdempotentSupplier<T> success, IdempotentSupplier<T> fail);
/**
* 加锁,加锁失败立即返回
* @param key 锁key
* @param leaseTime 上锁后的失效时间(ms)
* @param timeUnit 时间单位
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
* @param <T>
* @return
*/
<T> T lock(String key, int leaseTime, TimeUnit timeUnit, IdempotentSupplier<T> success, IdempotentSupplier<T> fail);
/**
* 加锁
* @param key 锁key
* @param waitTime 尝试加锁,等待时间(ms)
* @param leaseTime 上锁后的失效时间(ms)
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
*/
void lock(String key, int waitTime, int leaseTime, IdempotentRunnable success, IdempotentRunnable fail);
/**
* 加锁,加锁失败立即返回
* @param key 锁key
* @param leaseTime 上锁后的失效时间(ms)
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
*/
void lock(String key, int leaseTime, IdempotentRunnable success, IdempotentRunnable fail);
/**
* 加锁,加锁失败立即返回
* @param key 锁key
* @param leaseTime 上锁后的失效时间
* @param timeUnit 时间单位
* @param success 锁成功执行的逻辑
* @param fail 锁失败执行的逻辑
*/
void lock(String key, int leaseTime, TimeUnit timeUnit, IdempotentRunnable success, IdempotentRunnable fail);
}
3. 代码与注解使用事例
@RestController
@RequestMapping("/")
public class Test {
@Autowired
private DistributedLock distributedLock;
@RequestMapping(value = "test",method = RequestMethod.GET)
public ResposeData test(@RequestParam("key") String key){
//锁的释放时间一定要考虑好,不然业务处理时间太长了,导致锁被释放了,然后又调用了unLock()方法,就会出现错误
distributedLock.lock(key,300,(k,ex)->{
System.out.println("成功:"+k);
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
},(k,ex)->{
System.out.println("失败"+k);
});
return ResposeData.success("欧克");
}
/**
* 这个key应该用一个分布式id生成器来生成,不能由用户自己提供
* @param key
* @return
*/
@RequestMapping(value = "an",method = RequestMethod.GET)
@Idempotent(lockName = "test",spelKey = "#key", idempotentHandler = "idempotentHandler",readWriteType = ReadWriteTypeEnum.ORDER)
public ResposeData an(@RequestParam("key") String key){
System.out.println("我已经进来了");
return ResposeData.success("success");
}
/**
* 固定参数格式
* @param request 请求
* @param e 异常,需要判断是否为null
*/
@IdempotentHandler
public ResposeData idempotentHandler(IdempotentRequest request, IdempotentException e){
System.out.print(request.getKey() + ": idempotentHandler已经执行过了,");
if (e != null){
System.out.println(e.toString());
}else {
System.out.println();
}
return ResposeData.success("error");
}
}
4. 切面编程
@Around(value = "@annotation(idempotent)")
public Object around(ProceedingJoinPoint joinPoint, Idempotent idempotent) throws Throwable{
Object[] args = joinPoint.getArgs();
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
String key = "";
if (StringUtils.hasText(idempotent.spelKey())){
// 这个key应该用一个分布式id生成器来生成,不能由用户自己提供
key = parseKey(idempotent.spelKey(), method, args);
}else {
key = ContextHolder.getRequestContext().get("globalIdempotentId");
}
String userInputKey = idempotent.lockName();
if (!StringUtils.hasText(userInputKey)){
userInputKey = method.getName();
}
String idempotentKey = userInputKey + ":" + key;
IdempotentRequest request = IdempotentRequest.builder().key(idempotentKey)
.firstLevelExpireTime(idempotent.firstLevelExpireTime())
.secondLevelExpireTime(idempotent.secondLevelExpireTime())
.timeUnit(idempotent.timeUnit())
.lockExpireTime(idempotent.lockExpireTime())
.readWriteType(idempotent.readWriteType())
.build();
if (key == null){
return tis(joinPoint, idempotent, method, request,new IdempotentException("未获取到key"));
}
try {
return distributedIdempotent.execute(request,(k,ex)->{
try {
return joinPoint.proceed();
}catch (Throwable e){
log.error("幂等执行异常");
//throw new IdempotentException(e);
return tis(joinPoint, idempotent, method, request,new IdempotentException(e));
}
},(k,ex)->{
//throw new IdempotentException("重复请求");
log.error("重复请求,执行幂等处理");
return tis(joinPoint, idempotent, method, request,ex);
});
}catch (IdempotentException ex){
return handleIdempotentException(joinPoint, idempotent, ex);
}
}
5. 注解失败逻辑处理
处理逻辑函数限制了参数类型为IdempotentRequest request, IdempotentException e。
/**
* 执行幂等处理函数
* @param joinPoint 切点
* @param idempotent 主解
* @param method 方法
* @param request 请求
* @return
*/
private Object tis(ProceedingJoinPoint joinPoint, Idempotent idempotent, Method method, IdempotentRequest request, IdempotentException ex){
/**获取当前被切的那个类**/
Class targetClass = ReflectionUtils.getDeclaringType(joinPoint.getTarget().getClass(), method.getName(), method.getParameterTypes());
// 获取该类(不包括父类)的public,private, protected, default (package)方法
Method[] methods = targetClass.getDeclaredMethods();
for (Method m : methods){
if (m.getName().equals(idempotent.idempotentHandler())){
// 执行幂等处理
Class<?>[] classes = m.getParameterTypes();
// 参数类型限制为两种:IdempotentRequest request, IdempotentException e
List s = Arrays.asList(classes);
if (s.size() == 2 && s.contains(IdempotentRequest.class) && s.contains(IdempotentException.class)){
log.debug("执行幂等处理函数");
try {
return m.invoke(joinPoint.getTarget(),request,ex);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IdempotentException(e);
}
}
}
}
log.error("未获取到key,并且处理函数格式错误[{},{}]",IdempotentRequest.class, IdempotentException.class);
throw new IdempotentException("未获取到key,并且处理函数格式错误");
}