分布式锁之redis
实现方式之redisson
引入redission:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.4</version>
</dependency>
RedissonClient中已经封装好了锁的工具类RLock,我这里直接拿来使用:
package com.morris.distribute.lock.redis.redisson;
import com.morris.distribute.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedissonClient redissonClient;
/**
* 分布式锁之redis(redisson实现)
*
* @param id
*/
public void updateStatus(int id) {
log.info("updateStatus begin, {}", id);
String key = "updateStatus" + id;
RLock lock = redissonClient.getLock(key);
lock.lock(); // 加锁
try {
Integer status = jdbcTemplate.queryForObject("select status from t_order where id=?", new Object[]{
id}, Integer.class);
if (Order.ORDER_STATUS_NOT_PAY == status) {
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int update = jdbcTemplate.update("update t_order set status=? where id=? and status=?", new Object[]{
2, id, Order.ORDER_STATUS_NOT_PAY});
if (update > 0) {
log.info("updateStatus success, {}", id);
} else {
log.info("updateStatus failed, {}", id);
}
} else {
log.info("updateStatus status already updated, ignore this request, {}", id);
}
log.info("updateStatus end, {}", id);
} finally {
lock.unlock(); // 释放锁
}
}
}
运行结果如如下:
2020-09-16 14:43:20,778 INFO [main] (Version.java:41) - Redisson 3.12.4
2020-09-16 14:43:21,298 INFO [redisson-netty-2-16] (ConnectionPool.java:167) - 1 connections initialized for 10.0.4.211/10.0.4.211:6379
2020-09-16 14:43:21,300 INFO [redisson-netty-2-19] (ConnectionPool.java:167) - 24 connections initialized for 10.0.4.211/10.0.4.211:6379
2020-09-16 14:43:21,371 INFO [t2] (OrderService.java:29) - updateStatus begin, 1
2020-09-16 14:43:21,371 INFO [t1] (OrderService.java:29) - updateStatus begin, 1
2020-09-16 14:43:21,371 INFO [t3] (OrderService.java:29) - updateStatus begin, 1
2020-09-16 14:43:24,610 INFO [t3] (OrderService.java:51) - updateStatus success, 1
2020-09-16 14:43:24,610 INFO [t3] (OrderService.java:58) - updateStatus end, 1
2020-09-16 14:43:24,620 INFO [t1] (OrderService.java:56) - updateStatus status already updated, ignore this request, 1
2020-09-16 14:43:24,620 INFO [t1] (OrderService.java:58) - updateStatus end, 1
2020-09-16 14:43:24,630 INFO [t2] (OrderService.java:56) - updateStatus status already updated, ignore this request, 1
2020-09-16 14:43:24,630 INFO [t2] (OrderService.java:58) - updateStatus end, 1
实现方式之jedis
下面通过jedis来手动实现redis分布式锁,更深入的理解redis实现分布锁的原理。
引入jedis:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
关键命令:SET key value NX PX miliseconds
。
怎么样才算加锁成功?谁调用set命令并带上nx选项,key不存在设置成功,否则失败,谁设置key成功,谁就获得锁。
客户端挂了怎么办?可以为key设置一个超时时间,如果客户端加锁后就挂了,那么这个key到时间就会被删除,不会造成死锁。
假如在超时时间内,业务还没处理完,key快要过期了怎么办?启动一个线程为key来延时。
具体实现如下:
package com.morris.distribute.lock.redis.my;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@Slf4j
public class RedisLock {
@Autowired
private JedisPool jedisPool;
public void lock(String key, String value) {
for (; ;) {
// 自旋获取锁
if (tryLock(key, value)) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(100); // 这里暂时休眠100ms后再次获取锁,后续可以向AQS一样使用等待队列实现
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 尝试加锁
*
* @param key
* @param value
* @return
*/
private boolean tryLock(String key, String value) {
SetParams setParams = SetParams.setParams().nx().px(4_000); // 默认超时时间为4s
Jedis jedis = jedisPool.getResource();
String result = jedis.set(key, value, setParams);
if ("OK".equals(result)) {
Thread thread = new Thread(() -> {
while (true) {
try {
TimeUnit.SECONDS.sleep(1); // 守护线程1s检测一下超时时间
} catch (InterruptedException e) {
e.printStackTrace();
}
Long ttl = jedis.pttl(key);
if (ttl < 2_000) {
// 当超时时间小于1/2时,增加超时时间到原来的4s
jedis.expire(key, 4_000);
log.info("add expire time for key : {}", key);
}
}
}, "expire1");
thread.setDaemon(true);
thread.start();
return true;
}
return false;
}
public void unlock(String key, String value) {
Jedis jedis = jedisPool.getResource();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(key), Collections.singletonList(value));
}
}
使用方式如下:
package com.morris.distribute.lock.redis.my;
import com.morris.distribute.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RedisLock redisLock;
/**
* 分布式锁之redis(jedis实现)
*
* @param id
*/
public void updateStatus(int id) {
log.info("updateStatus begin, {}", id);
String key = "updateStatus" + id;
String value = UUID.randomUUID().toString();
redisLock.lock(key, value);
try {
Integer status = jdbcTemplate.queryForObject("select status from t_order where id=?", new Object[]{
id}, Integer.class);
if (Order.ORDER_STATUS_NOT_PAY == status) {
try {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int update = jdbcTemplate.update("update t_order set status=? where id=? and status=?", new Object[]{
2, id, Order.ORDER_STATUS_NOT_PAY});
if (update > 0) {
log.info("updateStatus success, {}", id);
} else {
log.info("updateStatus failed, {}", id);
}
} else {
log.info("updateStatus status already updated, ignore this request, {}", id);
}
log.info("updateStatus end, {}", id);
} finally {
redisLock.unlock(key, value); // 释放锁
}
}
}
运行结果如下:
2020-09-16 16:19:01,453 INFO [t2] (OrderService.java:28) - updateStatus begin, 1
2020-09-16 16:19:01,453 INFO [t1] (OrderService.java:28) - updateStatus begin, 1
2020-09-16 16:19:01,453 INFO [t3] (OrderService.java:28) - updateStatus begin, 1
2020-09-16 16:19:03,565 INFO [expire1] (RedisLock.java:53) - add expire time for key : updateStatus1
2020-09-16 16:19:04,748 INFO [t1] (OrderService.java:49) - updateStatus success, 1
2020-09-16 16:19:04,749 INFO [t1] (OrderService.java:56) - updateStatus end, 1
2020-09-16 16:19:04,801 INFO [t2] (OrderService.java:54) - updateStatus status already updated, ignore this request, 1
2020-09-16 16:19:04,801 INFO [t2] (OrderService.java:56) - updateStatus end, 1
2020-09-16 16:19:04,902 INFO [t3] (OrderService.java:54) - updateStatus status already updated, ignore this request, 1
2020-09-16 16:19:04,902 INFO [t3] (OrderService.java:56) - updateStatus end, 1
为什么不能分两步,先set key value
再expire key millseconds
?因为这两个操作不是原子性操作,如果一个客户端加锁后就挂了,那么这个key就一直不会删除,造成死锁。
为什么要启动一个守护线程来为key延时?守护线程会随创建它的线程的关闭而自动销毁,无需手动关闭,延时是为了让业务逻辑执行完成,避免key过期让其他线程抢到锁。
为什么释放锁的时候不是直接发送del key
命令?释放锁的时候需要校验value值,避免进程P1加的锁被其他进程释放,所以value值的设置也是有讲究的,这个值只有进程P1知道,这样释放的时候只有他才能删除这个key。
总结
优点:基于redis实现的分布式锁就会拥有redis的特点,那就是速度快。
缺点:实现逻辑复杂,redis本身是一个AP模型,只能保证网络分区和可用性,并不能保证强一致性,而分布式锁这个逻辑是一个CP模型,必须保证一致性,所以redis这种实现方式在一定概率上会出现多个客户端获取到锁,例如redis中的master节点设置key成功并返回给客户端,此时还没来得及同步给slave就挂了,然后slave被选举为新的master节点,其他客户端来获取锁就会成功,这样多个客户端就同时获取到锁了。