利用ZooKeeper特性实现分布式锁
在单机应用中,常常会遇到多进程/线程竞争系统资源的情况,为了保持多进程/线程情况下数据的一致性,在计算机中引入了锁的概念,从而 保证在多进程/线程情况下,一个系统资源在某个CPU事件片内只有一个进程/线程能够访问。但是在分布式系统的情况下,数据不再是某一台机器进行维护,而是多个系统,多个机器进行维护,根据单机中的锁的概念,人们又衍生出分布式锁。
ZooKeeper是开源的分布式应用协调服务,是Google的Chubby的开源实现,其分布式一致性协议是以Paxos算法为基础改造而来,来保证分布式系统中的数据一致性。其特点有很多 ,包括zNode,临时节点,有序节点,事件监听等。
根据Zookeeper的有序临时节点和事件监听机制我们可以设计一个简易版的分布式锁。
核心思想是,Zookeeper的临时节点会在客户端断开链接后自动删除,删除会触发监听事件;而Zookeeper的有序节点可以保证整个分布式系统获取锁的顺序,我们约定最小节点为当前的锁的获得者。在尝试获取锁的阶段先创建一个有序临时节点,然后将所有有序临时节点获取到并排序,获取最小的一个,如果和当前创建的节点为同一个节点,则表示获取到了锁,否则,根据当前节点去拿到比当前节点小一级的节点,然后为其注册监听事件 ,一直等待到其被删除释放为止。
代码:
public class DistributeLock implements Lock, Watcher {
private ZooKeeper zk = null; //zookeeper客户端实例
private String ROOT_LOCK = "/locks"; //分布式锁根目录
private String WAIT_LOCK; //当前进程需要等待的锁节点
private String CURRENT_LOCK; //当前的锁
private CountDownLatch countDownLatch;
public DistributeLock(){
try {
zk = new ZooKeeper("master:2181",
4000, this);
//判断根节点是否存在
Stat stat = zk.exists(ROOT_LOCK, false);
if(stat == null){
//如果不存在,就创建
zk.create(ROOT_LOCK,
"0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
if(this.tryLock()){
System.out.println(Thread.currentThread().getName() + "->" + CURRENT_LOCK + "->获得锁成功");
return;
}
waitForLock(WAIT_LOCK); //没有获得锁就等待获取锁
}
private boolean waitForLock(String prev){
try {
Stat stat = zk.exists(prev, true); //这里设置为True,表示对prev节点进行监听
//因为临时节点在客户端断开连接后会被自动删除,这时就会触发监听事件
if(stat != null){
System.out.println(Thread.currentThread().getName() + "->等待锁"+prev+"->释放");
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "->获得锁"+CURRENT_LOCK+"->成功");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
/**
* 尝试获得锁时,不管发生任何事,首先去创建一个临时的有序节点
* 并将其设置为当前获得的锁
* @return
*/
@Override
public boolean tryLock() {
try {
CURRENT_LOCK = zk.create(ROOT_LOCK+"/",
"0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->尝试竞争锁");
List<String> childrens = zk.getChildren(ROOT_LOCK, false);
SortedSet<String> sortedSet = new TreeSet<>(); //对获取到的所有子节点进行排序
for(String child : childrens){
sortedSet.add(ROOT_LOCK+"/"+child);
}
String first = sortedSet.first(); //得到最小的节点
SortedSet<String> lessThanMe = sortedSet.headSet(CURRENT_LOCK);
//最小的节点和我创建的节点进行比较
if(CURRENT_LOCK.equals(first)){
//表示我创建的节点是最小的节点
return true;
}else{
//否则等待那个比当前节点更小的节点释放锁
if(!lessThanMe.isEmpty()){
WAIT_LOCK = lessThanMe.last();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() +"-> 释放锁");
try {
zk.delete(CURRENT_LOCK,-1);
CURRENT_LOCK = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
@Override
public void process(WatchedEvent watchedEvent) {
//处理监听事件
if(watchedEvent.getType().equals(Event.EventType.NodeDeleted)){
if(this.countDownLatch != null){
this.countDownLatch.countDown();
}
}
}
public static void main(String[] args) throws IOException {
CountDownLatch cdLatch = new CountDownLatch(10);
for(int i=0; i<10; i++){
new Thread(() -> {
try {
cdLatch.await();
DistributeLock distributeLock = new DistributeLock();
distributeLock.lock(); //获取锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程"+i).start();
cdLatch.countDown();
}
System.in.read();
}
}
测试结果