「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」
分布式锁的简介
锁 是一种用来解决多个执行线程 访问共享资源 错误或数据不一致问题的工具。
锁的本质:同一时间只允许一个用户操作共享数据。
为什么需要分布式锁
一般情况下,我们使用分布式锁主要有两个场景:
- 避免不同节点重复相同的工作:比如用户执行了某个操作需要输入验证码不同节点没有相互通信会发送多条验证码;
- 避免破坏数据的正确性:如果两个节点在同一条数据上同时进行操作,可能会造成数据错误或不一致的情况出现;
Java 中实现分布式锁的常见方式
-
基于 MySQL 中的锁:
MySQL
本身有自带的悲观锁for update
关键字,也可以自己实现悲观/乐观锁来达到目的; -
基于 Zookeeper 有序节点:
Zookeeper
允许临时创建有序的子节点,这样客户端获取节点列表时,就能够根据当前子节点列表中的序号来判断是否能够获得锁; -
基于 Redis 的单线程:由于
Redis
是单线程,所以命令会以串行的方式执行,并且本身提供了像SETNX(set if not exists)
这样的指令,并且还扩展了SET
命令;
每个方案都有各自的优缺点,例如
MySQL
虽然直观理解容易,但是实现起来却需要额外考虑 锁超时、加事务 等问题,并且性能局限于数据库;这边我们以Redis
为主进行分析。
分布式锁应该具备的特性
- 原子性
- 互斥性
- 独占性:自己的锁只能自己解开
- 可重入性
- 超时与续期
分布式锁特性场景解析
-
超时场景说明:
假如有两个服务 A 和 B,其中服务 A 在 获取锁之后 由于不可抗力因素宕机了(例如:机房停电),因为锁被服务 A 持有,就会导致 B 服务就永远无法获取到锁了,这样显然是不合理的,所以我们需要额外设置一个超时时间,来保证避免这种情况的发生。
-
独占性场景的说明
延续上面的场景,我们在考虑这种场景,如果在加锁和释放锁之间的逻辑比较复杂,执行时间较长,以至于超出了锁的超时限制,也会出现问题。这时候线程 A 持有锁过期了,而临界区的逻辑还没有执行完,因为锁过期了,所以
Redis
会自动将这个锁对应key
给删除掉;这个时候线程 B 就可以获得这个分布式锁,当线程 B 刚获取到自己的锁,原本超时的 A 执行到释放自己锁的代码,A 的锁其实已经过期了现在的key
是 B 的锁,A 现在就会把 B 的锁给释放掉,其实 B 才刚刚获取到锁还没有执行自己的逻辑,所以锁应该有独占性,自己的锁应该只能自己解开。实现方式:将锁的
value
值设置为一个随机串,释放锁时先匹配随机串是否一致,然后再删除key
。 -
续期场景说明
为了避免线程没有处理完自己业务就过期的问题,加锁时,先设置一个过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源处理业务逻辑还没有完成,那么就自动对锁进行 续期操作,重新设置过期时间。
Redisson
使用的就是这种机制,它将其成为看门狗。 -
原子性方面
匹配
value
和删除key
在Redis
中并不是一个原子性的操作,也没有类似保证原子性的指令,所以可能需要使用像Lua
这样的脚本来处理了,因为Lua
脚本可以 保证多个指令的原子性执行。 -
互斥性方面
一个
key
被一个实例获取之后,其他的实例就不能再次获取了。 -
可重入性方面
同一个线程方法 A 调用方法 B ,A B 都需要获得锁,A 获得锁之后,执行自己逻辑调用 B ,如果不是重入锁的话,就会发生死锁。
在
Java
编程中synchronized
和ReentrantLock
都是可重入锁
使用 Redis 实现分布式锁
Redis 相关命令解析
SETEX
:
语法:SETEX KEY_NAME TIMEOUT VALUE
版本:redis 版本 >= 2.0.0
作用:setex 命令为指定的 key 设置值及其过期时间。如果 key 已经存在, SETEX 命令将会替换旧的值。
复制代码
SETNX
: (SET if Not eXists
)
语法:SETNX KEY_NAME VALUE
版本:redis 版本 >= 2.0.0
作用:Setnx 命令在指定的 key 不存在时,为 key 设置指定的值。
返回值:设置成功,返回 1 ;设置失败,返回 0 。
复制代码
我们注意到 setnx
是可以满足我们没有值时候设置成功,有值的时候设置失败的需求的,但是了解完上面章节中介绍的分布式锁应该考虑的问题这边应该需要设置一个超时时间,所以在 Redis 2.6.12
之后扩展了 set
命令:
SET
:
# 一条命令保证原子性执行 在 setnx 的基础上 设置超时时间
set key value [EX seconds|PX milliseconds] [NX|XX]
- [EX seconds] 设置过期时间单位为 秒
- [PX milliseconds] 设置过期时间单位为 毫秒
- [NX] key 不存在时设置value, 成功返回OK,失败返回 (nil)
- [XX] key 存在时设置value, 成功返回OK,失败返回 (nil)
set key value EX 过期时间 NX
127.0.0.1:6379> SET lock 1 EX 10 NX
OK
复制代码
自定义 Redis 分布式锁
上文中提到 SETNX
是原子性操作,但是没有办法同时完成 EXPIRE
操作,不能保证 SETNX
和 EXPIRE
的原子性。这个可以使用 SET
命令来实现并且保证原子性。
package com.aha.train.test.lock.distributed;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 使用 redis 自定义分布式锁
*
* @author WT
* @date 2021/10/25
*/
@Slf4j
@Service
public class MyRedisLock {
// 释放锁 执行的 LUA 脚本
public static final String RELEASE_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
//默认的 lock key 的过期时间 10s
private static final int DEFAULT_LOCK_EXPIRE_TIME_MILLIS = 100 * 1000;
//节点客户端
private RedisTemplate<Object,Object> redisTemplate;
public MyRedisLock(RedisTemplate<Object,Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 尝试加锁
*
* @param acquireTimeout 尝试加锁等待时间
* @return 是否加锁成功
* @throws InterruptedException 线程中断异常
*/
public String acquire(Integer acquireTimeout, String lockKey) throws InterruptedException {
if (acquireTimeout <= 0) {
throw new IllegalArgumentException("加锁的超时时间必须大于0");
}
try {
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
// 随机生成一个 value
String value = UUID.randomUUID().toString();
while (System.currentTimeMillis() < end) {
// 加锁并设置超时时间 redis 底层使用的应该是 SET 命令 setNX 是没有办法保证 set 和 expire 同时的原子性的
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, value , DEFAULT_LOCK_EXPIRE_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (result != null && result) {
log.info("加锁成功:{}",value);
return value;
}
// 延时 100ms 继续尝试加锁,直到到达加锁的超时时间
Thread.sleep(100);
log.info("获取锁失败,再次尝试获取锁");
}
} catch (Exception e) {
log.error("acquire lock due to error", e);
}
return null;
}
/**
* 主动释放锁
* @param lockValue 主动释放锁的 value
* @return 是否成功释放锁
*/
public boolean release(String lockKey, String lockValue) {
// 这边得使用 long 类型 来接收 LUA 脚本执行的数据 因为 redis 中的 int 类型对应 java 中的 long 类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LUA_SCRIPT, Long.class);
/*
* 第一个参数是 执行的脚本内容,第二个参数是 lua 脚本中 key 的集合,第三个参数是 lua 脚本中 args 的集合
* result = jedis.eval(script, Collections.singletonList(lockKey),Collections.singletonList(identify)); 如果使用 jedis 客户端是这种形式的执行脚本
*/
Long result = redisTemplate.execute(redisScript, Arrays.asList(lockKey, lockValue));
return result != null && result > 0L;
}
}
复制代码
测试自定义分布式锁
package com.aha.train.test.lock.distributed;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.*;
@RestController
@Slf4j
public class TestLock {
@Autowired
private MyRedisLock myRedisLock;
@GetMapping("/acquire")
public void acquire () throws InterruptedException {
String lockValue = myRedisLock.acquire(200, "TEST_LOCK");
Thread.sleep(1000);
boolean release = myRedisLock.release("TEST_LOCK", lockValue);
log.info("申请锁成功:{},解锁:{}",lockValue,release);
}
@GetMapping("/multiple/thread")
public void testMultipleThread () {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(100, 200, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(() -> {
try {
String key = "aha_key";
String value = myRedisLock.acquire(20000, key);
if (value != null) {
Thread.sleep(10);
log.info("执行业务逻辑");
if (myRedisLock.release(key, value))
log.info("解锁成功");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
复制代码
Redisson 实现分布式锁
导入 Redisson
的依赖
<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.1</version>
</dependency>
复制代码
编写配置文件
server:
port: 8002
spring:
# redis 缓存
redis:
# 使用 redisson 配置
redisson:
# 新版本的 redisson 配置文件没有办法使用这种形式了 可以使用下面这种形式
# config: classpath:config/redisson-single.yaml
config: |
# 单机模式
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1000
# 密码
password: Aha@3166
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 客户端名称
clientName: null
# 节点地址
address: redis://10.8.18.115:30379
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 4
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
# threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
# nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"
复制代码
测试分布式锁
package com.aha.distributedlock.redisson;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 测试使用 redisson 创建分布式锁
*
* @author WT
* date 2021/11/6
*/
@Slf4j
@RestController
@RequestMapping("/redisson")
public class TestMyRedisLock {
private final RedissonClient redissonClient;
public TestMyRedisLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
@GetMapping("/acquire")
public void acquire () {
// RLock 可重入锁
// 获取锁对象(可以为"可重入锁"、"公平锁",如果redis是集群模式,还可以使用"红锁")
// 公平锁
// RLock redissonLock = redissonClient.getFairLock("TEST_REDISSON_LOCK");
// 非公平锁
RLock redissonLock = redissonClient.getLock("TEST_REDISSON_LOCK");
try {
// 尝试加锁,最多等待 100 秒,上锁之后 10 秒之后会自动解锁
// 我们会发现这边在 redis 中设置的值是一个 hash 类型 key 为上面指定的 key
boolean res = redissonLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
log.info("申请锁成功");
log.info("执行对应的业务逻辑..");
redissonLock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@GetMapping("/multiple/thread")
public void testMultipleThread () {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(100, 200, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < 100; i++) {
threadPoolExecutor.execute(() -> {
RLock ahaLock = redissonClient.getLock("aha_key");
try {
boolean res = ahaLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
Thread.sleep(10);
log.info("执行业务逻辑");
ahaLock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
复制代码
思考:Redis 实现的分布式锁当主从切换的时候依旧安全吗