curator简介
Curator是Netflix公司开源的一个ZooKeeper客户端。与ZooKeeper提供的原生客户端相比,Curator的抽象层次更高,提供了各种应用场景的实现封装;其fluent风格的API让代码更加优雅,也减少了ZooKeeper客户端的开发工作量。
/**
 * Demonstrates the two ways of creating a Curator client session: the
 * {@code CuratorFrameworkFactory.newClient} static factory and the fluent
 * {@code CuratorFrameworkFactory.builder()}.
 */
public class CuratorCreateSessionDemo {
    /** Comma-separated {@code host:port} list of the ZooKeeper servers to connect to. */
    private static final String CONNECT_STRING = "192.168.74.131:2181,192.168.74.132:2181,192.168.74.133:2181";

    /**
     * Starts one client per creation style, prints {@code success}, then closes both clients.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources guarantees both clients are closed on exit, even if
        // start() throws; previously neither client was ever closed.
        try (
            // Style 1: static factory. Per the Curator API the two 5000 values are the
            // session and connection timeouts in ms; ExponentialBackoffRetry(1000, 3)
            // is an exponential back-off retry policy (base sleep 1000 ms, 3 retries).
            CuratorFramework factoryClient = CuratorFrameworkFactory.newClient(
                    CONNECT_STRING, 5000, 5000, new ExponentialBackoffRetry(1000, 3));
            // Style 2: fluent builder.
            CuratorFramework builderClient = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_STRING)
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build()
        ) {
            factoryClient.start();
            builderClient.start();
            // NOTE(review): nothing here waits for the connection to be established,
            // so "success" only reports that both clients were started.
            System.out.println("success");
        }
    }
}
Curator 创建、删除、查询、更新、异步、事务操作的示例代码(事务是Curator独有的):
/**
 * Walk-through of the basic Curator operations: create, delete, read, update,
 * asynchronous calls and transactions.
 *
 * <p>Only obtaining the client and printing a message are live code; every
 * operation is kept as a commented-out sample meant to be enabled one at a time.
 */
public class CuratorOperatorDemo {
    /**
     * Obtains the client from {@code CuratorClientUtils} and prints a confirmation message.
     *
     * @param args unused
     * @throws Exception declared so the commented-out samples can be enabled without further changes
     */
    public static void main(String[] args) throws Exception {
        final CuratorFramework client = CuratorClientUtils.getInstance();
        System.out.println("连接成功....");

        // All samples below use the fluent API on `client`.

        // --- Create a node (parents can be created recursively) ---
        // try {
        //     String result = client.create()
        //             .creatingParentsIfNeeded()
        //             .withMode(CreateMode.PERSISTENT)
        //             .forPath("/c/cc/ccc", "123".getBytes());
        // } catch (Exception e) {
        //     // TODO: handle exception
        // }

        // --- Delete a node (children can be deleted recursively) ---
        // try {
        //     client.delete().deletingChildrenIfNeeded().forPath("/c");
        // } catch (Exception e) {
        //     // TODO: handle exception
        // }

        // --- Read ---
        // try {
        //     Stat stat = new Stat();
        //     byte[] bytes = client.getData().storingStatIn(stat).forPath("/zkclient");
        //     System.out.println(new String(bytes) + "-->stat:" + stat);
        // } catch (Exception e) {
        // }

        // --- Update ---
        // try {
        //     Stat stat = client.setData().forPath("/zookeeper", "666".getBytes());
        //     byte[] bytes = client.getData().storingStatIn(stat).forPath("/zkclient");
        //     System.out.println(new String(bytes));
        //     System.out.println("stat:" + stat);
        // } catch (Exception e) {
        //     // TODO: handle exception
        // }

        // --- Asynchronous operation: creating the node and setting its value run in the background ---
        // ExecutorService service = Executors.newFixedThreadPool(1);
        // final CountDownLatch countDownLatch = new CountDownLatch(1);
        // try {
        //     System.out.println(1);
        //     client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        //             .inBackground(new BackgroundCallback() {
        //                 public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
        //                         throws Exception {
        //                     System.out.println(Thread.currentThread().getName() + "->resultCode:"
        //                             + curatorEvent.getResultCode() + "->" + curatorEvent.getType());
        //                     countDownLatch.countDown();
        //                 }
        //             }, service).forPath("/b", "111".getBytes());
        // } catch (Exception e) {
        // }
        // countDownLatch.await();
        // service.shutdown();

        // --- Transaction ---
        // try {
        //     Collection<CuratorTransactionResult> resultCollections = client.inTransaction()
        //             .create().forPath("/trans", "111".getBytes())
        //             .and()
        //             .setData().forPath("/bzy", "111".getBytes())
        //             .and()
        //             .commit();
        //     for (CuratorTransactionResult result : resultCollections) {
        //         System.out.println(result.getForPath() + "->" + result.getType());
        //     }
        // } catch (Exception e) {
        //     e.printStackTrace();
        // }
    }
}
curator创建监听
/**
 * Demonstrates node watching with the Curator cache recipes.
 *
 * <p>Three kinds of cache can be used to watch nodes:
 * <ul>
 *   <li>PathChildrenCache - watches creation, deletion and data updates of the children of a path</li>
 *   <li>NodeCache - watches creation, update and deletion of a single node</li>
 *   <li>TreeCache - PathChildrenCache and NodeCache combined (creation, update and deletion under a path)</li>
 * </ul>
 */
public class CuratorEventDemo {
    /** Node watched by both caches. */
    private static final String PARENT_PATH = "/zkclient";
    /** Ephemeral child used to trigger the child added/updated/removed events. */
    private static final String CHILD_PATH = "/zkclient/zkclient1";

    /**
     * Watches {@code /zkclient} with a NodeCache and a PathChildrenCache, then
     * triggers a data change on the node and an add/update/remove cycle on a
     * child. Blocks on standard input at the end so events can still be printed.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // The client comes from CuratorClientUtils; who owns its lifecycle is not
        // visible from here, so it is deliberately not closed in this method.
        CuratorFramework curatorFramework = CuratorClientUtils.getInstance();

        // Make sure the watched node exists before touching it. Previously the demo
        // called setData (needs the node) and later create (needs it absent) on the
        // same path, so one of the two calls always threw.
        if (curatorFramework.checkExists().forPath(PARENT_PATH) == null) {
            curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH, "zkclient".getBytes());
        }

        // Both caches are closed automatically when the block exits.
        try (NodeCache nodeCache = new NodeCache(curatorFramework, PARENT_PATH, false);
             PathChildrenCache cache = new PathChildrenCache(curatorFramework, PARENT_PATH, true)) {

            // --- NodeCache: changes of the node itself ---
            // Register the listener before start() so no event is missed.
            nodeCache.getListenable().addListener(() -> {
                // getCurrentData() is null when the node no longer exists.
                String data = java.util.Optional.ofNullable(nodeCache.getCurrentData())
                        .map(current -> new String(current.getData()))
                        .orElse(null);
                if (data == null) {
                    System.out.println("节点已被删除");
                } else {
                    System.out.println("节点数据发生变化,变化后的结果" + ":" + data);
                }
            });
            nodeCache.start(true);
            curatorFramework.setData().forPath(PARENT_PATH, "bzy666".getBytes());

            // --- PathChildrenCache: changes of the child nodes ---
            cache.getListenable().addListener((client, event) -> {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("增加子节点");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("更新子节点");
                        break;
                    default:
                        break;
                }
            });
            // Available start modes: NORMAL / BUILD_INITIAL_CACHE / POST_INITIALIZED_EVENT
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

            // The one-second pauses give the listeners time to print between steps.
            TimeUnit.SECONDS.sleep(1);
            System.out.println("1");
            curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(CHILD_PATH, "1".getBytes());
            TimeUnit.SECONDS.sleep(1);
            System.out.println("2");
            curatorFramework.setData().forPath(CHILD_PATH, "222".getBytes());
            TimeUnit.SECONDS.sleep(1);
            System.out.println("3");
            curatorFramework.delete().forPath(CHILD_PATH);
            System.out.println("4");

            // Keep the process (and the caches) alive until a key is pressed.
            System.in.read();
        }
    }
}