一.什么是惊群效应
举一个很简单的例子,当你往一群鸽子中间扔一块食物,虽然最终只有一个鸽子抢到食物,但所有鸽子都会被惊动来争夺,没有抢到食物的鸽子只好回去继续睡觉, 等待下一块食物到来。这样,每扔一块食物,都会惊动所有的鸽子,即为惊群。对于操作系统来说,多个进程/线程在等待同一资源是,也会产生类似的效果,其结果就是每当资源可用,所有的进程/线程都来竞争资源。
二.惊群效应的危害
- 巨大服务器性能消耗
- 可能发生宕机
- 巨大的网络冲击
三.分布式锁实现方式
数据库写入log实现分布式锁
缺点:1.若当前线程挂掉容易出现死锁
2.容易出现单点故障
redis实现分布式锁
缺点 :容易出现短期死锁,当前线程挂掉,但是由于redis的过期机制,所以可能多时间内死锁
zookeeper实现分布式锁
总结:相对来说zookeeper 更适合实现分布式锁,实现相对简单,可靠性相对比较高,性能比较好。
四.分布式锁惊群效应产生的场景
描述:多个线程未获取到锁,处于阻塞等待状态,等获取到锁的线程释放锁,所有处于阻塞状态的线程去争抢资源获取锁,导致产生惊群效应。
五.惊群效应解决方案
采用医院看病取号排队的原理,采用zookeeper的临时顺序节点,先创建子节点,获取所有子节点列表,判断当前节点是否是最小节点,如果是最小节点占用锁执行后续代码,执行完毕,释放锁。如果不是最小节点观察比自己小一个节点,等待锁,比自己小一个的节点被删除,得到通知,当前线程取获取锁,有效的避免了惊群效应。
六.粗略实现分布式分布式锁
1. 依赖核心jar
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
</dependency>
2.序列化器的实现
package com.jiuye.sona.common.Lock;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;
/**
* @Author: xinjian.hu
* @Date: 2020/8/25 16:59
* @Email: [email protected]
*/
public class SonaZkSerializer implements ZkSerializer {
/**
* 字符集
*/
private static final String CHARSET = "utf-8";
/**
* 序列化
*
* @param data
* @return
* @throws ZkMarshallingError
*/
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
try {
return String.valueOf(data).getBytes(CHARSET);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
/**
* 反序列化
*
* @param bytes
* @return
* @throws ZkMarshallingError
*/
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, CHARSET);
} catch (UnsupportedEncodingException e) {
throw new ZkMarshallingError(e);
}
}
}
3.代码逻辑实现:
/**
* 该版本为zk分布式锁的升级版本,正确规避惊群效应
*
* @Author: xinjian.hu
* @Date: 2020/8/25 18:10
* @Email: [email protected]
*/
public class ZkLockSuper implements Lock {
private static final Logger log = LoggerFactory.getLogger(ZkLock.class);
/**
* 锁路径父节点
*/
private String lockPath;
/**
* zk 客户端,对zk进行操作完成锁操作
*/
private ZkClient client;
/**
* 服务器路径
*/
private static final String serverPath = "localhost:2181";
/**
* 子节点(排队票号)
*/
private ThreadLocal<String> currentPath = new ThreadLocal<String>();
/**
* 前一个节点(前一个票号)
*/
private ThreadLocal<String> beforePath = new ThreadLocal<String>();
/**
* 重入锁计数器
*/
private AtomicInteger reentryLockCount = new AtomicInteger(0);
/**
* 初始化参数
*
* @param lockPath
*/
public ZkLockSuper(String lockPath) {
if (StringUtils.isEmpty(lockPath)) {
throw new IllegalArgumentException("锁路径为空,请输入锁路径");
}
this.lockPath = lockPath;
// 创建客户端对象
client = new ZkClient(serverPath);
// 设置序列化器
client.setZkSerializer(new SonaZkSerializer());
// 创建持久节点
if (!client.exists(this.lockPath)) {
client.createPersistent(this.lockPath);
}
//
reentryLockCount.set(0);
}
/**
* 使用锁的入口
*/
@Override
public void lock() {
if (!tryLock()) {
// 等待释放锁
waitForLock();
// 递归调用自己
lock();
}
}
/**
* 尝试获取锁
*
* @return
*/
@Override
public boolean tryLock() {
// 当前节点为空则创建临时顺序节点
if (this.currentPath.get() == null || !client.exists(this.currentPath.get())) {
String node = this.client.createEphemeralSequential(lockPath + "/", "sona-locked");
currentPath.set(node);
}
// 获取当前节点下所有的子节点
List<String> children = client.getChildren(lockPath);
// 对节点进行由小到大排序
Collections.sort(children);
String firstChild = lockPath + "/" + children.get(0);
// 如果当前节点是最小的节点则获取到锁
if (firstChild.equals(currentPath.get())) {
reentryLockCount.getAndIncrement();
System.out.println("获取锁成功,重入锁计数计数器加1,当前重入次数为"+reentryLockCount.get());
log.info("获取锁成功,重入锁计数计数器加1,当前重入次数为{}", reentryLockCount.get());
return true;
} else {
// 得到字节的索引号(获取当前节点的索引序号)
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
// 获取前一个索引节点
String beforeNode = lockPath + "/" + children.get(curIndex - 1);
beforePath.set(beforeNode);
return false;
}
}
/**
* 释放锁的逻辑
*/
@Override
public void unlock() {
Integer count = reentryLockCount.get();
if (this.currentPath != null && count == 1) {
client.delete(currentPath.get());
currentPath.set(null);
reentryLockCount.getAndDecrement();
System.out.println("释放锁成功,重入锁计数计数器减1,剩余重入次数为"+reentryLockCount.get());
log.info("释放锁成功,重入锁计数计数器减1,剩余重入次数为{}", reentryLockCount.get());
}else {
reentryLockCount.getAndDecrement();
System.out.println("释放锁成功,重入锁计数计数器减1,剩余重入次数为"+reentryLockCount.get());
log.info("释放锁成功,重入锁计数计数器减1,剩余重入次数为{}", reentryLockCount.get());
}
}
/**
* 等待释放锁
*/
private void waitForLock() {
CountDownLatch countDownLatch = new CountDownLatch(1);
// zk数据节点的监听器
IZkDataListener dataListener = new IZkDataListener() {
/**
* 数据改变执行业务
* @param dataPath
* @param data
* @throws Exception
*/
@Override
public void handleDataChange(String dataPath, Object data) {
log.info("zk数据改变");
}
/**
* 数据删除的执行方法
* @param dataPath
* @throws Exception
*/
@Override
public void handleDataDeleted(String dataPath) {
countDownLatch.countDown();
log.info("zk锁被释放,释放锁的名称是{}", dataPath);
}
};
// 绑定监听节点
client.subscribeDataChanges(beforePath.get(), dataListener);
if (this.client.exists(beforePath.get())) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解除监听
client.unsubscribeDataChanges(beforePath.get(), dataListener);
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition() {
return null;
}
}