zkclient
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency>
public class ZkClientApiOperatorDemo {
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
private static ZkClient getInstance(){
return new ZkClient(CONNECTSTRING,10000);
}
public static void main(String[] args) throws InterruptedException {
ZkClient zkClient=getInstance();
//zkclient 提供递归创建父节点的功能
/* zkClient.createPersistent("/zkclient/zkclient1/zkclient1-1/zkclient1-1-1",true);
System.out.println("success");*/
//递归删除节点
// zkClient.deleteRecursive("/zkclient");
//获取子节点
List<String> list=zkClient.getChildren("/node");
System.out.println(list);
//watcher
//提供订阅事件
zkClient.subscribeDataChanges("/node", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("节点名称:"+s+"->节点修改后的值"+o);
}
@Override
public void handleDataDeleted(String s) throws Exception {
}
});
zkClient.writeData("/node","node");
TimeUnit.SECONDS.sleep(2);
zkClient.subscribeChildChanges("/node", new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {
}
});
}
}
curator
Curator本身是Netflix公司开源的zookeeper客户端;
curator提供了各种应用场景的实现封装
curator-framework 提供了fluent风格api
curator-replice 提供了实现封装
curator连接的重试策略
ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间
导入jar
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.11.0</version> </dependency>
public static void main(String[] args) {
//创建会话的两种方式 normal
CuratorFramework curatorFramework= CuratorFrameworkFactory.
newClient(CONNECTSTRING,5000,5000,
new ExponentialBackoffRetry(1000,3));
curatorFramework.start(); //start方法启动连接
//fluent风格 链式操作
CuratorFramework curatorFramework1=CuratorFrameworkFactory.builder()
.connectString(CONNECTSTRING)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("/curator")//命名空间节点 后期该链接的操作都在/curator节点下
.build();
curatorFramework1.start();
System.out.println("success");
}
通过curator客户端操作API
public static void main(String[] args) throws InterruptedException {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
System.out.println("连接成功.........");
//fluent风格
//curatorFramework的创建、删除、get、set 方法返回的都是builder 对象
/**
* 创建节点
*/
/* try {
String result=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
forPath("/curator/curator1/curator11","123".getBytes());
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 删除节点
*/
/*try {
//默认情况下,version为-1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/node11");
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 查询
*/
/*Stat stat=new Stat();
try {
byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/curator");//storingStatIn获取节点的状态信息
System.out.println(new String(bytes)+"-->stat:"+stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 更新
*/
/* try {
Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 异步操作
*/
/*ExecutorService service= Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch=new CountDownLatch(1);
try {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
+curatorEvent.getType());
countDownLatch.countDown();
}
},service).forPath("/mic","123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.await();
service.shutdown();*/
/**
* 事务操作(curator独有的)
*/
try {
Collection<CuratorTransactionResult> resultCollections=curatorFramework.inTransaction().create().forPath("/trans","111".getBytes())
.and()//通过and将create和setData 绑定到一个事务中
.setData().forPath("/curator","111".getBytes())
.and().commit();
for (CuratorTransactionResult result:resultCollections){
System.out.println(result.getForPath()+"->"+result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
curator提供了三种注册到节点的监听器watcher
1pathcache 2 nodecache 3 treecache
/**
* 三种watcher来做节点的监听
* pathcache 监视一个路径下子节点的创建、删除、节点数据更新
* NodeCache 监视一个节点的创建、更新、删除
* TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
* 缓存路径下的所有子节点的数据
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
/**
* 节点变化NodeCache false 对缓存的结果是否压缩
*/
// NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
// cache.start(true);
//
// cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
// ":"+new String(cache.getCurrentData().getData())));
//
// curatorFramework.setData().forPath("/curator","菲菲".getBytes());
/**
* PatchChildrenCache
*/
PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("增加子节点");
break;
case CHILD_REMOVED:
System.out.println("删除子节点");
break;
case CHILD_UPDATED:
System.out.println("更新子节点");
break;
default:break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("1");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("2");
curatorFramework.setData().forPath("/event/event1","222".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("3");
curatorFramework.delete().forPath("/event/event1");
System.out.println("4");
System.in.read();
}