分布式锁
有的时候,我们需要保证一个方法在同 一时间内只能被同一个线程执行。在单机模式下,可以通过sychronized、锁等方式来实现,
在分布式环境下,有以下的解决方案:
数据库锁
1.通过一个一张表的一条记录,来判断资源的占用情况
2.使用基于数据库的排它锁 (即select * from tb_User for update)
3.使用乐观锁的方式,即CAS操作(或version字段)
基于Redis的分布式锁(缓存锁)
redis提供了可以用来实现分布式锁的方法,比如redis的setnx方法等。(即redis事务机制可以实现乐观锁CAS)
基于Zookeeper的分布式锁(这种方式最可靠)
基于zookeeper临时有序节点可以实现的分布式锁。大致思想即为:每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点。
判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除即可。
redis分布式锁和Zookeeper分布式锁的区别
- redis分布式锁,其实需要自己不断去尝试获取锁,比较消耗性能;
- zk分布式锁,获取不到锁,注册个监听器即可,不需要不断主动尝试获取锁,性能开销较小。
- redis获取锁的那个客户端bug了或者挂了,那么只能等待超时时间之后才能释放锁;而zk的话,因为创建的是临时znode,只要客户端挂了,znode就没了,此时就自动释放锁。
数据库的方式实现分布式锁实现方式
info:分布式锁表
CREATE TABLE `lock_info`
( `id` bigint(20) NOT NULL,
`expiration_time` datetime NOT NULL,
`status` int(11) NOT NULL, `tag` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uk_tag` (`tag`) )
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- id:主键
- tag:锁的标示,以订单为例,可以锁订单id
- expiration_time:过期时间
- status:锁状态,0,未锁,1,已经上锁
Redis实现分布式锁的方式
RedisLock:redis实现分布式锁工具类
/**
* redis实现分布式锁机制
*/
@Component
public class RedisLock {
//日志记录器
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 加锁
* @param key
* @param value 当前事件+超时事件
* @return
*/
public boolean lock(String key,String value){
//加锁成功
if (redisTemplate.opsForValue().setIfAbsent(key,value)){
return true;
}
//假如currentValue=A先占用了锁 其他两个线程的value都是B,保证其中一个线程拿到锁
String currentValue = redisTemplate.opsForValue().get(key);
//锁过期 防止出现死锁
if (!StringUtils.isEmpty(currentValue) &&
Long.parseLong(currentValue) < System.currentTimeMillis()){
//获取上一步锁的时间
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) &&
oldValue.equals(currentValue)){
return true;
}
}
return false;
}
/**
* 解锁
* @param key
* @param value
*/
public void unlock(String key,String value){
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue) &&
currentValue.equals(value)){
redisTemplate.opsForValue().getOperations().delete(key);
}
}catch (Exception e){
logger.error("【redis分布式锁】 解锁异常,{}",e);
}
}
OrderService:消费者中的订单操作
@Service
@Transactional
public class OrderService extends ServiceImpl<OrderPojoMapper, OrderPojo> implements IOrderService {
@Autowired
OrderPojoMapper orderPojoMapper;
/** 超时时间 */
private static final int TIMEOUT = 5000;
@Autowired
RedisLock redisLock;
@Autowired
RedisTemplate redisTemplate;
//日志记录器
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME);
/**
* 秒杀中订单
* @param orderPojo
* @return
*/
@Override
public ResponseResult seckillNewOrder(OrderPojo orderPojo) {
String msg="";
String productId=orderPojo.getProductId();
long time = System.currentTimeMillis() + TIMEOUT;
//加锁
String lockKey = "seckill:"+productId;
if (!redisLock.lock(lockKey,String.valueOf(time))){
return ResponseResult.error(ConstantsUtil.seckill_fail);
}
//进行商品抢购(商品数减一)
int stockNum = 0;
Object tmp =redisTemplate.opsForValue().get("seckill_"+productId);
if(tmp!=null){
String stockNumStr =tmp.toString();
if(StringUtils.isNotBlank(stockNumStr)){
stockNum = Integer.valueOf(stockNumStr);
}
if (stockNum == 0) {
//库存不足
return ResponseResult.error(ConstantsUtil.seckill_fail2);
} else {
redisTemplate.opsForValue().set("seckill_"+productId,String.valueOf(stockNum-1));
msg="恭喜你抢到商品,剩余商品数量为"+(stockNum-1);
//创建订单
orderPojoMapper.insert(orderPojo);
}
}else {
return ResponseResult.error(ConstantsUtil.seckill_fail3);
}
//解锁
redisLock.unlock(lockKey, String.valueOf(time));
return ResponseResult.success(msg);
}
}
使用Zookeeper实现分布式锁
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock implements Watcher {
private int threadId;
private ZooKeeper zk = null;
private String selfPath;
private String waitPath;
private String LOG_PREFIX_OF_THREAD;
private static final int SESSION_TIMEOUT = 10000;
private static final String GROUP_PATH = "/locks";
private static final String SUB_PATH = "/locks/sub";
private static final String CONNECTION_STRING = "127.0.0.1:2181";
private static final int THREAD_NUM = 10;
// 确保连接zk成功;
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
// 确保所有线程运行结束;
private static final CountDownLatch threadSemaphore = new CountDownLatch(
THREAD_NUM);
public DistributedLock(int id) {
this.threadId = id;
LOG_PREFIX_OF_THREAD = "【第" + threadId + "个线程】";
}
public static void main(String[] args) {
// 用多线程模拟分布式环境
for (int i = 0; i < THREAD_NUM; i++) {
final int threadId = i + 1;
new Thread() {
@Override
public void run() {
try {
DistributedLock dc = new DistributedLock(threadId);
dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
// GROUP_PATH不存在的话,由一个线程创建即可;
synchronized (threadSemaphore) {
dc.createPath(GROUP_PATH, "该节点由线程" + threadId
+ "创建", true);
}
dc.getLock();
} catch (Exception e) {
System.out.println("【第" + threadId + "个线程】 抛出的异常:");
e.printStackTrace();
}
}
}.start();
}
try {
threadSemaphore.await();
System.out.println("所有线程运行结束!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取锁
*
* @return
*/
private void getLock() throws KeeperException, InterruptedException {
// 去创建临时节点
selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(LOG_PREFIX_OF_THREAD + "创建锁路径:" + selfPath);
if (checkMinPath()) {
getLockSuccess();
}
}
/**
* 创建节点
*
* @param path 节点path
* @param data 初始数据内容
* @return
*/
public boolean createPath(String path, String data, boolean needWatch)
throws KeeperException, InterruptedException {
if (zk.exists(path, needWatch) == null) {
System.out.println(LOG_PREFIX_OF_THREAD
+ "节点创建成功, Path: "
+ this.zk.create(path, data.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
+ ", content: " + data);
}
return true;
}
/**
* 创建ZK连接
*
* @param connectString ZK服务器地址列表
* @param sessionTimeout Session超时时间
*/
public void createConnection(String connectString, int sessionTimeout)
throws IOException, InterruptedException {
zk = new ZooKeeper(connectString, sessionTimeout, this);
connectedSemaphore.await();
}
/**
* 获取锁成功
*/
public void getLockSuccess() throws KeeperException, InterruptedException {
if (zk.exists(this.selfPath, false) == null) {
System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了...");
return;
}
System.out.println(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");
Thread.sleep(2000);
System.out.println(LOG_PREFIX_OF_THREAD + "删除本节点:" + selfPath);
zk.delete(this.selfPath, -1);
releaseConnection();
threadSemaphore.countDown();
}
/**
* 关闭ZK连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
}
}
System.out.println(LOG_PREFIX_OF_THREAD + "释放连接");
}
/**
* 检查自己是不是最小的节点
*
* @return
*/
public boolean checkMinPath() throws KeeperException, InterruptedException {
List<String> subNodes = zk.getChildren(GROUP_PATH, false);
Collections.sort(subNodes);
int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));
switch (index) {
case -1: {
System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了..." + selfPath);
return false;
}
case 0: {
System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,我果然是老大...哈哈哈" + selfPath);
return true;
}
default: {
this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);
System.out.println(LOG_PREFIX_OF_THREAD + "获取子节点中,排在我前面的。。。"
+ waitPath);
try {
zk.getData(waitPath, true, new Stat());
return false;
} catch (KeeperException e) {
if (zk.exists(waitPath, false) == null) {
System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,排在我前面的。。。"
+ waitPath + "已失踪,幸福来得太突然?");
return checkMinPath();
} else {
throw e;
}
}
}
}
}
@Override
public void process(WatchedEvent event) {
// 监听器处理事件
if (event == null) {
return;
}
Event.KeeperState keeperState = event.getState();
Event.EventType eventType = event.getType();
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
System.out.println(LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器");
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeDeleted
&& event.getPath().equals(waitPath)) {
System.out.println(LOG_PREFIX_OF_THREAD
+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?");
try {
if (checkMinPath()) {
getLockSuccess();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else if (Event.KeeperState.Disconnected == keeperState) {
System.out.println(LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接");
} else if (Event.KeeperState.AuthFailed == keeperState) {
System.out.println(LOG_PREFIX_OF_THREAD + "权限检查失败");
} else if (Event.KeeperState.Expired == keeperState) {
System.out.println(LOG_PREFIX_OF_THREAD + "会话失效");
}
}
}