package zk.lock;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZkDistributedClient {
// 超时时间
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts = "127.0.0.1:2181";
private String groupNode = "locks1";
//private String subNode = "sub";
private ZooKeeper zk;
// 当前client创建的子节点
private String thisPath;
// 当前client等待的子节点
private String waitPath;
private CountDownLatch latch = new CountDownLatch(1);
//返回结果
public String result =null;
/**
* 连接zookeeper
* subNode 临时节点头
* data 数据
*/
public String connectZookeeper(String subNode,String data) throws Exception {
final String fdata = data;
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 连接建立时, 打开latch, 唤醒wait在该latch上的线程
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
// 发生了waitPath的删除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
// 确认thisPath是否真的是列表中的最小节点
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if (index == 0) {
// 确实是最小节点
String t = doSomething(fdata);
System.out.println("watch:::"+t);
result = t;
} else {
// 说明waitPath是由于出现异常而挂掉的
// 更新waitPath
waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
// 重新注册监听, 并判断此时waitPath是否已删除
if (zk.exists(waitPath, true) == null) {
String tt = doSomething(fdata);
System.out.println("watch===watch:::"+tt);
result = tt;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待连接建立
latch.await();
// 创建子节点
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小会, 让结果更清晰一些
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
// 列表中只有一个子节点, 那肯定就是thisPath, 说明client获得锁
if (childrenNodes.size() == 1) {
return doSomething(data);
} else {
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
System.out.println(thisNode +"--------------");
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
// never happened
} else if (index == 0) {
// inddx == 0, 说明thisNode在列表中最小, 当前client获得锁
return doSomething(data);
} else {
// 获得排名比thisPath前1位的节点
this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
// 在waitPath上注册监听器, 当waitPath被删除时, zookeeper会回调监听器的process方法
zk.getData(waitPath, true, new Stat());
return null;
}
}
return null;
}
private String doSomething(String data) throws Exception {
try {
System.out.println("gain lock: " + thisPath+"----"+data);
byte[] by= zk.getData("/locknum", null, null);
String num = new String(by);
System.out.println(num+" :::::当前值--------------------------");
Integer newNum = Integer.parseInt(num) -1;
zk.setData("/locknum", newNum.toString().getBytes(), -1);
System.out.println(newNum+" :::::最终值--------------------------");
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 将thisPath删除, 监听thisPath的client将获得通知
// 相当于释放锁
zk.delete(this.thisPath, -1);
}
return data;
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 5; i++) {
new Thread() {
public void run() {
try {
ZkDistributedClient dl = new ZkDistributedClient();
String s = dl.connectZookeeper("gao","test");
System.out.println("return-=========="+s);
System.out.println("return========================"+dl.result);
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
Thread.sleep(Long.MAX_VALUE);
}
}