package org.gjp;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class ZkCreateNode {
/**
* 检查节点是否存在
*
* @param client
* @param path
* @return
*/
public Stat checkNode(CuratorFramework client, String path) {
ExistsBuilder eb = client.checkExists();
Stat stat = null;
try {
stat = eb.forPath(path);
if (stat != null) {
System.out.println(stat);
System.out.println("version:" + stat.getVersion());
} else {
System.out.println(" null ......");
}
} catch (Exception e) {
e.printStackTrace();
}
return stat;
}
/**
* 创建节点
*
* @param client
* @param path
* @return
*/
public String createNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 设置具有序列化的临时节点
* @param client
* @param path
* @param value
* @return
*/
public String createModeEphSeqNode(CuratorFramework client, final String path, final String value) {
String result = "";
try {
result = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, value.getBytes());
System.out.println("res:" + result);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 删除节点
*
* @param client
* @param path
* @return
*/
public boolean deleteNode(CuratorFramework client, final String path) {
boolean result = true;
try {
client.delete().forPath(path);
} catch (Exception e) {
result = false;
e.printStackTrace();
}
return result;
}
/**
* 修改节点值
* @param client
* @param path
* @param val
* @return
*/
public boolean changeNodeData(CuratorFramework client, final String path,final String val){
boolean result =true;
try {
client.setData().forPath(path, val.getBytes());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
}
public void getNodeChild(CuratorFramework client, final String path) throws Exception{
List<String> pathChild = client.getChildren().forPath(path);
for(String item:pathChild){
System.out.println("child:"+item);
}
}
/**
* 分布式锁
*
* @param client
* @param path
* @param val
*/
public boolean disLock(CuratorFramework client, final String path, final String val) {
boolean result = false;
// 首先查看节点是否存在
Stat stat = checkNode(client, path);
if (null == stat) {
// 创建节点
String res = createNode(client, path, val);
if (null != res && "".equals(res.trim())) {
// 创建节点成功
InterProcessMutex lock = new InterProcessMutex(client, path);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
//业务逻辑内容。。。。。。。
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
result = true;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return result;
}
//节点监听说明 /////////////////////////////////////////////////////////////
//1.path Cache 连接 路径 是否获取数据
//能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听
public void setListenterThreeOne(CuratorFramework client,final String path) throws Exception{
PathChildrenCache childrenCache = new PathChildrenCache(client, path, true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("开始进行事件分析:-----"+event.getType());
ChildData data = event.getData();
StringBuffer sb = new StringBuffer(32);
switch (event.getType()) {
case CHILD_ADDED:
sb.append("CHILD_ADDED : "+ data.getPath());
break;
case CHILD_REMOVED:
sb.append("CHILD_REMOVED : "+ data.getPath());
break;
case CHILD_UPDATED:
sb.append("CHILD_UPDATED : "+ data.getPath());
break;
default:
break;
}
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
System.out.println("Register zk watcher successfully!");
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
}
//2.Node Cache 监控本节点的变化情况 连接 目录 是否压缩
//监听本节点的变化 节点可以进行修改操作 删除节点后会再次创建(空节点)
public void setListenterThreeTwo(CuratorFramework client,final String path) throws Exception{
//设置节点的cache
final NodeCache nodeCache = new NodeCache(client, path, false);
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
System.out.println("the test node is change and result is :");
System.out.println("path : "+nodeCache.getCurrentData().getPath());
System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));
System.out.println("stat : "+nodeCache.getCurrentData().getStat());
}
});
nodeCache.start();
}
//3.Tree Cache
// 监控 指定节点和节点下的所有的节点的变化--无限监听 可以进行本节点的删除(不在创建)
public void setListenterThreeThree(CuratorFramework client,final String path) throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
//设置节点的cache
TreeCache treeCache = new TreeCache(client, path);
//设置监听器和处理过程
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
StringBuffer sb = new StringBuffer(32);
switch (event.getType()) {
case NODE_ADDED:
sb.append("NODE_ADDED : "+ data.getPath());
break;
case NODE_REMOVED:
sb.append("NODE_REMOVED : "+ data.getPath());
break;
case NODE_UPDATED:
sb.append("NODE_UPDATED : "+ data.getPath());
break;
default:
break;
}
if(null !=data){
sb.append(" ;data=");
sb.append(new String(data.getData()));
}
sb.append(" end");
System.out.println(sb.toString());
}
});
//开始监听
treeCache.start();
}
public static void main(String[] args) {
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new RetryNTimes(10, 5000));
client.start();
String parentPath = "/zkpath2";
String path = "/zkpath2/product13";
ZkCreateNode zk = new ZkCreateNode();
try {
//zk.setListenterThreeOne(client, path);
//zk.setListenterThreeTwo(client, path);
zk.setListenterThreeThree(client,path);
//zk.checkNode(client, path);
//zk.createNode(client, path, "val2");
// zk.createNode(client, "/zkpath2/product6", "val6");
//zk.changeNodeData(client, path, "bocetest13");
//zk.changeNodeData(client, path, "bocetest13");
//System.out.println("del="+zk.deleteNode(client,path));
//zk.getNodeChild(client, parentPath);
zk.createModeEphSeqNode(client, "/seq/test", "sql1");
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(client !=null){
client.close();
}
}
}