Zookeeper
zookeeper应用场景
- 配置中心
- 命名服务
- Master选举
- 集群管理
- 分布式队列
- 分布式锁
zookeeper数据模型
层次命名空间
从官网上可以看到zookeeper的数据模型图如下:
- 类似Linux文件系统,以(/)为 根
- 节点既可以存储数据,也可以可以存储子节点,所以可以认为zookeeper节点既是文件夹,其本身也是一个文件
- 节点的路径 总是表示为规范的,绝对的,以(/)分割的路径
znode
- 名称唯一,命名规范
- 节点类型:持久节点、顺序节点、临时节点、临时顺序节点
- 节点命名规则:
null不能作为节点
zookeeper为zookeeper保留节点
不能够使用相对路径,具体可以查看官网
zookeeper的具体使用
分布式锁
分布式锁是控制分布式系统同步访问共享资源的一种方式。
在分布式系统中,如果有两个系统需要对同一个资源进行操作的时候,就需要通过一些互斥的手段来防止其他系统的干扰,以保证一致性,此时就需要使用到分布式锁了。
常见的分布式锁实现有redis分布式锁,zookeeper分布式锁,下面介绍下zookeeper分布式锁的实现。
1、原理
使用zookeeper临时节点+watch机制实现。
由于zookeeper节点不可重名,所以对于同一个节点(/lock/createOrder),如果有多个系统同时想要创建时,只会有一个系统成功,成功的系统即为获取到锁,可以进行下一步的业务操作,没有获取到锁的系统,可以对该临时节点设置监听,以便再临时节点删除后,可以获取锁资源。
下图为第一种分布式锁的实现原理图:
具体代码如下:
package com.xiaohuihui.zookeeper;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.io.UnsupportedEncodingException;
/**
* @Desription: 小灰灰的zk序列化
* @Author: yangchenhui
*/
public class XiaohuihuiZkSerializer implements ZkSerializer {
String charset = "UTF-8";
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
try {
return String.valueOf(data).getBytes(charset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, charset);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
throw new ZkMarshallingError(e);
}
}
}
package com.xiaohuihui.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @Desription: zk分布式锁
* @Author: yangchenhui
*/
public class ZkDistributeLock implements Lock {
/**
* zk节点
*/
private String lockPath;
/**
* zk客户端
*/
private ZkClient zkClient;
private final String connectString = "localhost:2181";
private final int sessionTimeout = 2000;
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZkDistributeLock(String lockPath) {
this.lockPath = lockPath;
zkClient = new ZkClient(connectString, sessionTimeout);
zkClient.setZkSerializer(new XiaohuihuiZkSerializer());
}
public ZkDistributeLock(String lockPath, ZkClient zkClient) {
this.lockPath = lockPath;
this.zkClient = zkClient;
}
@Override
public void lock() {
boolean hasLock = tryLock();
if (!hasLock) {
// 没有获取到锁,等待获取锁
waitForLock();
// 再次尝试获取锁
lock();
}
}
/**
* 监听zk节点变化,等待获取锁
*/
private void waitForLock() {
CountDownLatch downLatch = new CountDownLatch(1);
IZkDataListener zkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("=======> 有人修改了zk节点数据");
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
downLatch.countDown();
System.out.println("=======> zk节点被删除,可以重新获取锁");
}
};
zkClient.subscribeDataChanges(lockPath, zkDataListener);
// 如果当前zk节点存在,阻塞线程
if (zkClient.exists(lockPath)) {
try {
downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
zkClient.unsubscribeDataChanges(lockPath, zkDataListener);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
// 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现
Integer count = this.reentrantCount.get();
if (count != null && count > 0) {
// 表示当前线程已经获取到锁
this.reentrantCount.set(++count);
return true;
}
// 当前线程没有获取到锁,尝试创建zk节点,创建成功即为获取到锁
try {
zkClient.createEphemeral(lockPath);
this.reentrantCount.set(1);
return true;
} catch (ZkNodeExistsException e) {
return false;
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
// 判断当前线程是否持有锁
Integer count = this.reentrantCount.get();
if (null != count && count > 1) {
// 当前线程有超过1处获取锁
this.reentrantCount.set(--count);
return;
} else if (null != count && count.equals(1)) {
// 只有一处获取过锁
this.reentrantCount.set(null);
}
// 删除zk节点
zkClient.delete(lockPath);
}
@Override
public Condition newCondition() {
return null;
}
}
package com.xiaohuihui.zookeeper;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @Desription: zk分布式锁测试
* @Author: yangchenhui
*/
public class ZkDistributeLockTest {
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " ==============> 准备好");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZkDistributeLock lock = new ZkDistributeLock("/xiaohuihui zkDistributeLock");
// ZkDistributeLock2 lock = new ZkDistributeLock2("/xiaohuihui zkDistributeLock");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
} finally {
lock.unlock();
}
}
}).start();
}
}
}
创建锁的时候需要考虑到可重入性,否则在同一个线程中,如果有多个地方需要获取到锁,获取不到的地方会产生阻塞,所以需要实现可重入性,使用ThreadLocal可以实现。
运行效果如下:
这种情况下,只要节点删除,所有设置监听的客户端都会收到通知,产生惊群效应,极大的浪费系统资源,需要进行改进,实现思路如下图:
这样只会最小号收到通知,具体实现如下:
package com.xiaohuihui.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @Desription: 使用zk临时顺序节点实现分布式锁,避免惊群效应
* @Author: yangchenhui
*/
public class ZkDistributeLock2 implements Lock {
/**
* zk节点
*/
private String lockPath;
/**
* zk客户端
*/
private ZkClient zkClient;
private final String connectString = "localhost:2181";
private final int sessionTimeout = 2000;
/**
* 重入锁计数
*/
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
/**
* 当前节点路径
*/
private ThreadLocal<String> currentPath = new ThreadLocal<>();
/**
* 上个节点路径
*/
private ThreadLocal<String> beforePath = new ThreadLocal<>();
public ZkDistributeLock2(String lockPath) {
this.lockPath = lockPath;
zkClient = new ZkClient(connectString, sessionTimeout);
zkClient.setZkSerializer(new XiaohuihuiZkSerializer());
if (!this.zkClient.exists(lockPath)) {
try {
this.zkClient.createPersistent(lockPath);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public void lock() {
// 获取锁
boolean hasLock = tryLock();
if (!hasLock) {
// 没有获取到锁,阻塞监听
waitLock();
// 再次尝试获取锁
lock();
}
}
/**
* 阻塞监听,等待获取zk锁
*/
private void waitLock() {
CountDownLatch downLatch = new CountDownLatch(1);
IZkDataListener zkDataListener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
downLatch.countDown();
System.out.println("=======> zk节点被删除,可以重新获取锁");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("=======> 有人修改了zk节点数据");
}
};
// 此时只需要监听上一个zk节点,不会引起惊群效应
zkClient.subscribeDataChanges(this.beforePath.get(), zkDataListener);
// 如果前一个zk节点存在,阻塞线程
if (zkClient.exists(this.beforePath.get())) {
try {
downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
zkClient.unsubscribeDataChanges(this.beforePath.get(), zkDataListener);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
// 保证锁的可重入性(在一个业务的执行过程中,如果有多个地方需要获取到锁,需要实现锁的可重入性),通过本地线程变量实现
Integer count = this.reentrantCount.get();
if (count != null && count > 0) {
// 表示当前线程已经获取到锁
this.reentrantCount.set(++count);
return true;
}
// 如果当前节点不存在,创建临时顺序节点
if (this.currentPath.get() == null) {
this.currentPath.set(zkClient.createEphemeralSequential(lockPath + "/", "xiaohuihui testlock2"));
}
// 获取所有子节点,判断当前节点是不是最小的节点
List<String> children = zkClient.getChildren(lockPath);
Collections.sort(children);
if (this.currentPath.get().equals(lockPath + "/" + children.get(0))) {
// 当前节点即为最小节点
this.reentrantCount.set(1);
return true;
} else {
// 获取到当前节点的前一个节点,设置beforePath
int currentPathIndex = children.indexOf(this.currentPath.get().substring(lockPath.length() + 1));
this.beforePath.set(lockPath + "/" + children.get(currentPathIndex - 1));
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
// 判断当前线程是否持有锁
Integer count = this.reentrantCount.get();
if (count != null) {
// 当前线程已经在多处获取锁
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
// 删除当前zk节点
zkClient.delete(this.currentPath.get());
}
@Override
public Condition newCondition() {
return null;
}
}
运行效果如下,已经避免惊群效应:
PS:未完待续