ZooKeeper实战之ZkClient客户端实现分布式锁

声明:此博客为学习笔记,学习自极客学院ZooKeeper相关视频;本文内容是本人照着视频里的前辈所讲知识敲了
           一遍的记录,个别地方按照本人理解稍作修改。非常感谢众多大牛们的知识分享。

分布式锁:多线程锁,是用于在高并发多线程时保证共享资源的访问的;而分布式锁则用于:在分布式环境下,保
                   护跨进程、跨主机、跨网络的共享资源,实现互斥访问,保证一致性

 

相关概念:

分布式锁(相关节点)架构图:

说明:当zookeeper客户端需要抢夺某个资源时,会先在/locker节点下创建临时有序节点,通过监听比自己序号次小的节
           点的删除事件,来进行一些逻辑,判断自己的节点是否为/locker所有子节点中序号最小的,如果是那么就使用资
           源,使用完之后,再释放资源删除对应节点(具体流程见流程图)。

核心流程图:

 

软硬件环境:Windows10、IntelliJ IDEA、SpringBoot、ZkClient

准备工作:在pom.xml中引入相关依赖

<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

 

ZooKeeper分布式锁实现示例:

相关类总体说明:

DistributedLock:分布式锁基本功能接口。

BaseDistributedLock:分布式锁基本功能接口的实现辅助类,在该类中定义了一些有助于实现DistributedLock接口
                                       的方法。

SimpleDistributedLockImpl:ZooKeeper客户端单线程分布式锁的具体实现。

DistributedLockImpl:ZooKeeper客户端多线程分布式锁的具体实现。

MyZkClient:自定义的ZkClient的子类,重写了watchForData方法,以便于观察被监视节点的信息。

注:可以不继承ZkClient类重写watchForData方法,而在自己的实现逻辑中观察被监控节点的信息。如果不关心被监
        控节点的信息,那么可以什么也不干,不需要此类也是可以的。

DistributedLockTest:测试类。

提示:根据实际业务不同,可能实现的多线程分布式锁不一样,按照自己的业务修改或编写DistributedLockImpl
           
类即可

各类细节:

DistributedLock:

import java.util.concurrent.TimeUnit;

/**
 * 分布式锁 --- 接口
 *
 * @author JustryDeng
 * @date 2018/12/6 16:12
 */
public interface DistributedLock {
   
   /*
    * 获取锁,如果没有得到就等待
    */
   void acquire() throws Exception;

   /*
    * 获取锁,直到超时
    */
    boolean acquire(long time, TimeUnit unit) throws Exception;

   /*
    * 释放锁
    */
    void release() throws Exception;

}

BaseDistributedLock:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 分布式锁的 基本实现
 * 
 * 主要有两个方法:  
 *     releaseLock 释放锁
 *     attemptLock 尝试获取锁
 *
 * 声明:
 *     假设有节点node_c   /node_a/node_b/node_c
 *     那么,node_c节点的名字为node_c; node_c节点的路径为/node_a/node_b/node_c
 *
 * @author JustryDeng
 * @date 2018/12/6 16:13
 */
public class BaseDistributedLock {

    /** ZkClient客户端 */
    private final MyZkClient client;

    /** /locker节点路径 */
    private final String lockerNodePath;

    /**
     *  当前客户端在/locker节点下的子节点 路径
     *  注:创建节点后,此路径并不是创建节点后生成的路径,因为是有序节点,所以会略有不同
     *
     *  如:当前节点路径为为/aspire/abc,那么创建【临时有序】节点后,实际上路径为  /aspire/abc0000000001
     */
    private final String currentNodePath;

    /**
     *  当前客户端在/locker节点下的子节点 的节点名
     *  注:创建节点后,此名字并不是创建节点后生成的名字,因为是有序节点,所以会略有不同
     *
     *  如:当前节点名字为abc,那么创建【临时有序】节点后,实际上名字为   abc0000000001
     */
    @SuppressWarnings("all")
    private final String currentNodeName;

    /** 网络闪断时的 重试次数 */
    private static final Integer MAX_RETRY_COUNT = 10;

    /**
     * 构造器
     */
    public BaseDistributedLock(MyZkClient client, String lockerNodePath, String currentNodeName) {
        this.client = client;
        this.lockerNodePath = lockerNodePath;
        this.currentNodePath = lockerNodePath.concat("/").concat(currentNodeName);
        this.currentNodeName = currentNodeName;
    }

    /**
     * 释放锁
     *
     * @author JustryDeng
     * @date 2018/12/6 17:33
     */
    protected void releaseLock(String nodePath) {
        deleteNode(nodePath);
    }


    /**
     * 尝试获取锁
     *
     * @param time
     *            最大等待时长
     * @param unit
     *            最大等待时长的 时间单位
     *
     * return 成功获取到锁,那么返回 当前客户端创建节点后得到的节点路径
     *        没有获取到锁,那么返回null
     * @date 2018/12/6 17:33
     */
    protected String attemptLock(long time, TimeUnit unit) throws Exception {

        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) && (time != -1) ? unit.toMillis(time) : null;
        String finalCurrentNodePath = null;
        boolean gotTheLock = false;

        boolean isDone = false;
        int retryCount = 0;
        // 首次进入,都会进一次下列代码块儿;但当网络出现闪断时,会进行循环重试
        while (!isDone) {
            isDone = true;
            try {
                try {
                    // 创建临时有序子节点
                    finalCurrentNodePath = createEphemeralSequentialNode(currentNodePath, null);
                } catch (ZkNoNodeException e) {
                    // 如果有父节点不存在,那么先创建父节点,父节点路径即为:lockerNodePath
                    client.createPersistent(lockerNodePath, true);
                    // 再次创建临时有序子节点
                    createEphemeralSequentialNode(currentNodePath, null);
                } catch (ZkNodeExistsException e) {
                    // 由于网络闪断,导致多次进行此步骤的话,那么忽略
                }
                gotTheLock = waitToLock(startMillis, millisToWait, finalCurrentNodePath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
        if (gotTheLock) {
            return finalCurrentNodePath;
        }
        return null;
    }

    /**
     * 创建临时有序节点
     */
    private String createEphemeralSequentialNode(final String path, final Object data) {
        return client.createEphemeralSequential(path, data);
    }

    /**
     * 删除节点
     */
    private void deleteNode(String nodePath) {
        client.delete(nodePath);
    }

    /**
     * 等待获取锁
     *
     * @param startMillis
     *            等待开始时间
     * @param millisToWait
     *            最大等待时长
     * @param finalCurrentNodePath
     *            当前客户端对应的 在zookeeper上创建的节点 路径
     * @return  是否成功获取到锁
     * @throws  Exception
     *
     * @date 2018/12/6 18:14
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String finalCurrentNodePath) throws Exception {
        boolean gotTheLock = false;
        boolean doDelete = false;
        try {
            while (!gotTheLock) {
                // 获取到/locker节点下的 按照 节点名 排序后的所有子节点
                List<String> children = getSortedChildren();
                // 获取当前客户端对应的节点的 节点名称
                String sequenceNodeName = finalCurrentNodePath.substring(lockerNodePath.length() + 1);
                // 获取当前客户端对应的节点 所在集合中的位置
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) { // 如果集合中不存在该节点,那么抛出异常
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }
                // 当  当前客户端对应的节点  排在集合开头时,表示该此客户端获得锁
                boolean shouldGetTheLock = ourIndex == 0;
                // 当前客户端 应该监视的节点的名字
                String nodeNameToWatch = shouldGetTheLock ? null : children.get(ourIndex - 1);
                if (shouldGetTheLock) {
                    gotTheLock = true;
                } else {
                    // 组装当前客户端 应该监视的节点的路径
                    String previousSequencePath = lockerNodePath.concat("/").concat(nodeNameToWatch);
                    // 倒计时锁
                    final CountDownLatch latch = new CountDownLatch(1);
                    // 创建监听器
                    final IZkDataListener previousListener = new IZkDataListener() {
                        public void handleDataDeleted(String dataPath) {
                            latch.countDown();
                        }
                        public void handleDataChange(String dataPath, Object data) {
                            // ignore
                        }
                    };

                    try {
                        // 如果节点不存在会出现异常
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if (millisToWait != null) {// 如果设置了等待时间,那么最多只等这么长时间
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) { // 如果等待已经超时,那么需要删除当前客户端对应的临时有序节点
                                doDelete = true;
                                break;
                            }
                            // CountDownLatch#await
                            latch.await(millisToWait, TimeUnit.MICROSECONDS);
                        } else { // 如果没有设置等待时间,那么一直等待,知道获取到锁
                            // CountDownLatch#await
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        //ignore
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }
                }
            }
        } catch (Exception e) {
            // 发生异常需要删除节点
            doDelete = true;
            throw e;
        } finally {
            // 如果需要删除节点
            if (doDelete) {
                deleteNode(finalCurrentNodePath);
            }
        }
        return gotTheLock;
    }


    /**
     * 按照子节点名字,升序排序
     *
     * @date 2018/12/6 18:14
     */
    private List<String> getSortedChildren() {
        try {
            List<String> children = client.getChildren(lockerNodePath);
            children.sort(Comparator.comparing(String::valueOf));
            return children;
        } catch (ZkNoNodeException e) {
            client.createPersistent(lockerNodePath, true);
            return getSortedChildren();
        }
    }

}

SimpleDistributedLockImpl:

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * zookeeper客户端下单线程线程  时的分布式锁的实现 (一个zookeeper客户端下 线程数 == 1的情况 ps:此单线程是指:不考虑守护线程)
 *
 * @author JustryDeng
 * @date 2018/12/7 9:37
 */
public class SimpleDistributedLockImpl extends BaseDistributedLock implements DistributedLock {

    /** /locker节点下所有子节点的 名称前缀(注:因为是有序的,实际上创建后的名字可能为lock-0000000001) */
    private static final String LOCK_NODE_NAME_PREFIX = "lock-";

    /** /locker节点路径 */
    private final String lockerNodePath;

    /** 当前客户端在/locker节点下创建子节点后,得到的(最终的)节点路径 */
    private String finalCurrentNodePath;

    /**
     * 构造器
     *
     * @param client
     *            zkclient客户端
     * @param lockerNodePath
     *            /locker节点路径
     */
    public SimpleDistributedLockImpl(MyZkClient client, String lockerNodePath) {
        super(client, lockerNodePath, LOCK_NODE_NAME_PREFIX);
        this.lockerNodePath = lockerNodePath;
    }


    /**
     * 获取锁的公共方法
     *
     * 注:当 time != -1 && unit != null时,才会最多只等待到指定时长,否者会一直等待下去
     * @param time
     *            等待时长
     * @param unit
     *            等待时长的单位
     * @return 是否获取到了锁
     * @throws Exception
     */
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        finalCurrentNodePath = attemptLock(time, unit);
        return finalCurrentNodePath != null;
    }

    /**
     * 一直等待---直到获取锁
     */
    public void acquire() throws Exception {
        if (!internalLock(-1, null)) {
            throw new IOException("连接丢失!在路径:'" + lockerNodePath + "'下不能获取锁!");
        }
    }

    /**
     * 最多等待指定时长---获取锁
     *
     * @return 是否获取到了锁
     * @throws Exception
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

    /**
     * 释放锁
     *
     * @throws Exception
     */
    public void release() throws Exception {
        releaseLock(finalCurrentNodePath);
    }

}

DistributedLockImpl:

import org.I0Itec.zkclient.ZkClient;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * zookeeper客户端下多线程线程  时的分布式锁的实现 (一个zookeeper客户端下 线程数 >= 1的情况)
 *
 * @author JustryDeng
 * @date 2018/12/7 9:37
 */
public class DistributedLockImpl extends BaseDistributedLock implements DistributedLock {

    /** /locker节点下所有子节点的 名称前缀(注:因为是有序的,实际上创建后的名字可能为lock-0000000001) */
    private static final String LOCK_NODE_NAME_PREFIX = "lock-";

    /** ZooKeeper客户端对应的使用锁的信息 */
    private final ConcurrentMap<ZkClient, NodeInfo> zooKeeperClientInfo = new ConcurrentHashMap<>(8);

    /** zookeeper客户端 */
    private final ZkClient zkClient;

    /** /locker节点路径 */
    private final String lockerNodePath;

    /**
     * 成员内部类 --- 用于封装 每个线程的节点数据
     * 
     * 注:这里将/locker节点下的每一个子节点看作是一个lock
     * 
     */
    private static class NodeInfo {

        /** 对应的节点路径 */
        final String nodePath;

        /** 该客户端内,使用该锁资源的线程数 计数器 */
        final AtomicInteger lockCount = new AtomicInteger(1);

        /** NodeInfo类的构造器 */
        private NodeInfo(String nodePath) {
            this.nodePath = nodePath;
        }
    }

    /**
     * DistributedLockMutex类的构造器
     *
     * @param client
     *            ZkClient客户端
     * @param lockerNodePath
     *            /locker节点路径
     */
    public DistributedLockImpl(MyZkClient client, String lockerNodePath) {
        super(client, lockerNodePath, LOCK_NODE_NAME_PREFIX);
        this.zkClient = client;
        this.lockerNodePath = lockerNodePath;
    }


    /**
     * 获取锁的公共方法
     *
     * 注:当 time != -1 && unit != null时,才会最多只等待到指定时长,否者会一直等待下去
     * @param time
     *            等待时长
     * @param unit
     *            等待时长的单位
     * @return 是否获取到了锁(如果之前已经获取了锁,那么也会返回true)
     * @throws Exception
     */
    private synchronized boolean internalLock(long time, TimeUnit unit) throws Exception {
        NodeInfo nodeInfo = zooKeeperClientInfo.get(zkClient);
        if (nodeInfo != null) { // 如果此线程已经获取了锁
            System.out.println("zookeeper" + zkClient + "已经获得了该锁了!");
            nodeInfo.lockCount.incrementAndGet();
            return true;
        }
        // 如果此线程之前未获取锁
        String nodePath = attemptLock(time, unit);
        if (nodePath != null) {
            NodeInfo newNodeInfo = new NodeInfo(nodePath);
            zooKeeperClientInfo.put(zkClient, newNodeInfo);
            return true;
        }
        return false;
    }


    /**
     * 一直等待---直到获取锁
     */
    public void acquire() throws Exception {
        if (!internalLock(-1, null)) {
            throw new IOException("连接丢失!在路径:'" + lockerNodePath + "'下不能获取锁!");
        }
    }

    /**
     * 最多等待指定时长---获取锁
     *
     * @return 是否获取到了锁
     * @throws Exception
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }


    /**
     * 释放锁
     */
    public void release(){
        NodeInfo nodeInfo = zooKeeperClientInfo.get(zkClient);
        if (nodeInfo == null) {
            throw new IllegalMonitorStateException("你不是锁: " + lockerNodePath + "的拥有者,无法执行此操作!");
        }
        int newLockCount = nodeInfo.lockCount.decrementAndGet();
        if (newLockCount > 0) { // 当还有其他线程在使用锁时,那么还不能释放
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("锁计数器已经为负数: " + lockerNodePath);
        }
        try {
            // 只有当计数器为0时,才能正常释放锁
            releaseLock(nodeInfo.nodePath);
        } finally {
            zooKeeperClientInfo.remove(zkClient);
        }
    }

}

MyZkClient:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

/**
 * 继承ZkClient 重写watchForData方法
 *
 * 注:如果对被监听的节点信息不感兴趣,那么可以不重写
 *
 * 注:watchForData 的参数final String path为被监听的节点的路径
 *    可以通过重写watchForData方法,来获取一些该节点的信息;
 *    我们当然也可以使用其他方式来获取该节点的信息。
 *
 * @author JustryDeng
 * @date 2018/12/7 11:52
 */
public class MyZkClient extends ZkClient {

    public MyZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer) {
        super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
    }

    @Override
    public void watchForData(final String path) {
        retryUntilConnected(() -> {
            System.out.println("进入重写的watchForData方法了!要被监听的节点path是:" + path);
            Stat stat = new Stat();
            _connection.readData(path, stat, true);
            return null;
        });
    }

}

DistributedLockTest:

import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

/**
 * 测试类
 *
 * @author JustryDeng
 * @date 2018/12/7 11:19
 */
public class DistributedLockTest {

    /**
     * 程序入口
     */
    public static void main(String[] args) {
        // zookeeper单线程测试
        simpleDistributedLockImplTest();
        // zookeeper多线程测试
        // distributedLockImplTest();
    }

    /**
     * 测试SimpleDistributedLockImpl实现分布式锁
     * <p>
     * 测试原理: 让一个zookeeper客户端率先获取到锁,然后给足够的时间让其他zookeeper客户端
     *          试着去获取锁(会发现:不论给其他时间多长时间,它们都不能获取到锁),只有等到获
     *          取到锁的zookeeper客户端释放锁之后,其他zookeeper客户端才能获取到锁
     *
     * @date 2018/12/7 11:20
     */
    private static void simpleDistributedLockImplTest() {
        // 分别创建两个客户端,并获取两个SimpleDistributedLockImpl实例
        final MyZkClient zkClientOne = new MyZkClient("10.8.109.32:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockImpl sdliOne = new SimpleDistributedLockImpl(zkClientOne, "/locker");

        final MyZkClient zkClientTwo = new MyZkClient("10.8.109.32:2181", 5000, 5000, new BytesPushThroughSerializer());
        final SimpleDistributedLockImpl sdliTwo = new SimpleDistributedLockImpl(zkClientTwo, "/locker");

        try {
            sdliOne.acquire();
            System.out.println("zkClientOne 获取到锁了!");
            Thread threadOne = new Thread(() -> {
                try {
                    sdliTwo.acquire();
                    System.out.println("zkClientTwo 获取到锁了!");
                    sdliTwo.release();
                    System.out.println("zkClientTwo 释放锁了!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            threadOne.start();
            Thread.sleep(10000);
            sdliOne.release();
            System.out.println("zkClientOne 释放锁了!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 测试DistributedLockImpl实现分布式锁
     *
     * 注:有几个zookeeper客户端,/locker节点下就应该有几个子节点;子节点个数应只与zookeeper客户端个
     *     数有关,而与客户端的线程数无关
     * <p>
     * 说明:分布式锁,保证的是:同一时刻,最多只能有一个zookeeper客户端获取到锁;但是对于同一个客户端
     *                     内部的众多线程而言,同一时刻同一个zookeeper客户端里,是可以有多个线程
     *                     同时获取到分布式锁(资源)的.
     * <p>
     * 测试原理: 让一个zookeeper客户端的多个线程率先获取到锁,然后让其中的一个线程释放锁,其他线程仍然
     *          持有锁,此时该zookeeper客户端仍然持有着锁;然后给足够的时间让其他zookeeper客户端试着
     *          去获取锁(会发现:不论给其他时间多长时间,它们都不能获取到锁),只有等到获取到锁的zookeeper
     *          客户端的所有持有锁的线程全部释放完锁之后,这个zookeeper客户端才会真正的释放锁,此时其他
     *          zookeeper客户端才能获取到锁
     *
     * @date 2018/12/7 11:20
     */
    private static void distributedLockImplTest() {
        // 分别创建两个客户端,并获取两个SimpleDistributedLockImpl实例
        final MyZkClient zkClientOne = new MyZkClient("10.8.109.32:2181", 5000, 5000, new BytesPushThroughSerializer());
        final DistributedLockImpl dliOne = new DistributedLockImpl(zkClientOne, "/locker");

        final MyZkClient zkClientTwo = new MyZkClient("10.8.109.32:2181", 5000, 5000, new BytesPushThroughSerializer());
        final DistributedLockImpl dliTwo = new DistributedLockImpl(zkClientTwo, "/locker");
        try {

            Thread threadOne = new Thread(() -> {
                try {
                    System.out.println("zkClientTwo-threadOne 尝试获取锁!");
                    dliTwo.acquire();
                    System.out.println("zkClientTwo-threadOne 获取到锁了!");
                    Thread.sleep(2000);
                    dliTwo.release();
                    System.out.println("zkClientTwo-threadOne 释放锁了!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            Thread threadTwo = new Thread(() -> {
                try {
                    System.out.println("zkClientTwo-threadTwo 尝试获取锁!");
                    dliTwo.acquire();
                    System.out.println("zkClientTwo-threadTwo 获取到锁了!");
                    Thread.sleep(5000);
                    dliTwo.release();
                    System.out.println("zkClientTwo-threadTwo 释放锁了!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            threadOne.start();
            threadTwo.start();
            // 阻塞一秒,保证另一个zookeeper客户端已经获取到锁了,这个zookeeper才去尝试获取锁
            Thread.sleep(2000);
            System.out.println("zkClientOne 尝试获取锁!");
            dliOne.acquire();
            System.out.println("zkClientOne 获取到锁了!");
            dliOne.release();
            System.out.println("zkClientOne 释放锁了!");
            // 为了观察控制台输出,这里sleep 20秒
            Thread.sleep(7000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 

测试一下:

前提条件:启动对应的ZooKeeper服务器,开放端口(或关闭防火墙)

zookeeper客户端单线程测试:

使DistributedLockTest类的main方法执行单线程测试:

控制台输出:

zookeeper客户端多线程测试:

使DistributedLockTest类的main方法执行单线程测试:

控制台输出:

由此可见,分布式锁成功!

 

所有zookeeper示例内容有(代码链接见本人末):

微笑如有不当之处,欢迎指正

微笑zookeeper示例代码托管链接
              
https://github.com/JustryDeng/CommonRepository
微笑参考书籍
             《ZooKeeper-分布式过程协同技术详解》
                        Flavio Junqueira Benjamin Reed 著, 谢超 周贵卿 译

微笑学习视频(推荐观看学习)
             极客学院ZooKeeper相关视频

微笑本文已经被收录进《程序员成长笔记(三)》,笔者JustryDeng

猜你喜欢

转载自blog.csdn.net/justry_deng/article/details/84875228