一、分布式锁
- 多个进程在使用该资源的时候,会先去获得锁,当"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
二、分布式锁案例分析
三、原生 Zookeeper 实现分布式锁案例
3.1、原生 Zookeeper 实现分布式锁代码
-
分布式锁代码
package com.xz.case2; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; public class DistributedLock { // zk服务列表 private final String connectString = "192.168.136.26:2181"; // 超时时间 private final int sessionTimeout = 200000; private final ZooKeeper zk; // 根节点路径 private String rootNode = "locks"; // 根节点下的子节点路径前缀 private String subNode = "seq-"; // zk连接 private CountDownLatch connectLatch = new CountDownLatch(1); // zk节点等待 private CountDownLatch waitLatch = new CountDownLatch(1); // 当前客户端等待的子节点路径 private String waitPath; // 当前客户端创建的子节点路径 private String currentMode; public DistributedLock() throws IOException, InterruptedException, KeeperException { // 获取连接 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { // connectLatch 如果连接上zk 可以释放 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){ connectLatch.countDown(); } // waitLatch 需要释放 if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){ waitLatch.countDown(); } } }); // 等待zk正常连接后,往下走程序 connectLatch.await(); // 判断根节点/locks是否存在 Stat stat = zk.exists("/"+rootNode, false); if (stat == null) { // 创建根节点 zk.create("/"+rootNode, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } // 对zk加锁 public void zklock() { // 创建对应的临时带序号节点 try { currentMode = zk.create("/"+rootNode +"/"+ subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // wait一小会, 让结果更清晰一些 Thread.sleep(10); /** * 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点 */ // 获取根节点下所有子节点 List<String> children = zk.getChildren("/" +rootNode, false); if (children.size() == 1) { //如果children 只有一个值,那就直接获取锁; return; } else { // 如果有多个节点,需要判断,谁最小 // 对根节点下的所有临时顺序节点进行从小到大排序 Collections.sort(children); // 获取节点名称 seq-00000000 String thisNode = currentMode.substring(("/" +rootNode+ "/").length()); // 通过seq-00000000获取该节点在children集合的位置 int index = children.indexOf(thisNode); // 判断 if (index == -1) { System.out.println("数据异常"); } else if (index == 0) { // 就一个节点,可以获取锁了 return; } else { // 需要监听 他前一个节点变化 waitPath = "/" + rootNode + "/" + children.get(index - 1); zk.getData(waitPath,true,new Stat()); //进入等待锁状态 waitLatch.await(); return; } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } // 解锁 public void unZkLock() { // 删除节点 try { zk.delete(this.currentMode,-1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
3.2、分布式锁测试代码
-
分布式锁测试代码
package com.xz.case2; import org.apache.zookeeper.KeeperException; import java.io.IOException; public class DistributedLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { final DistributedLock lock1 = new DistributedLock(); final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { try { lock1.zklock(); System.out.println("线程1 启动,获取到锁"); Thread.sleep(5 * 1000); lock1.unZkLock(); System.out.println("线程1 释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zklock(); System.out.println("线程2 启动,获取到锁"); Thread.sleep(5 * 1000); lock2.unZkLock(); System.out.println("线程2 释放锁"); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
3.3、分布式锁测试
-
linux服务器启动zk服务端和客户端,创建locks根节点即3个临时子节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /locks/seq- "zhangsan" Created /locks/seq-0000000000 [zk: localhost:2181(CONNECTED) 2] create -e -s /locks/seq- "lisi" Created /locks/seq-0000000001 [zk: localhost:2181(CONNECTED) 3] create -e -s /locks/seq- "wangwu" Created /locks/seq-0000000002 [zk: localhost:2181(CONNECTED) 4]
-
启动测试类代码,观察控制台变化