1、Curator简介
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:
1、封装ZooKeeper client与ZooKeeper server之间的连接处理
2、提供了一套Fluent风格的操作API
3、提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
Curator主要从以下几个方面降低了zk使用的复杂性:
重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
2、工程中.pom文件引入curator依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
3、使用Curator客户端连接zookeeper,实现对zk节点的创建、修改、删除操作
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("192.168.1.102:2181,192.168.1.102:2182,192.168.1.102:2183")//zk服务器地址
.sessionTimeoutMs(4000)// 超市时间
.retryPolicy(new ExponentialBackoffRetry(1000, 3))// 重试策略:初试时间为1s 重试10次
.namespace("curator-demo")//命名空间
.build();
cf.start();
System.out.println(States.CONNECTED);
System.out.println(cf.getState());
//创建持久化节点
cf.create().forPath("/che","123".getBytes());
//创建持久化有序节点
cf.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/che_seq","che_seq data".getBytes());
//创建临时节点
cf.create().withMode(CreateMode.EPHEMERAL)
.forPath("/che/tmp1","tmp1data01".getBytes());
//创建临时有序节点
cf.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/che/tmp2","tmp2data".getBytes());
cf.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/che/tmp3","tmp3data".getBytes());
//测试检查某个节点是否存在
Stat stat1 = cf.checkExists().forPath("/che");
Stat stat2 = cf.checkExists().forPath("/che3");
System.out.println("'/che'是否存在: " + (stat1 != null ? true : false));
System.out.println("'/che3'是否存在: " + (stat2 != null ? true : false));
//获取某个节点的所有子节点
System.out.println(cf.getChildren().forPath("/"));
//获取某个节点数据
System.out.println(new String(cf.getData().forPath("/che")));
//设置某个节点数据
cf.setData().forPath("/che","new123".getBytes());
//创建测试节点
cf.create().orSetData().creatingParentContainersIfNeeded()
.forPath("/che/che005","che005data".getBytes());
cf.create().orSetData().creatingParentContainersIfNeeded()
.forPath("/che/che006","che006data".getBytes());
cf.create().forPath("/che/che007/che007001","che007001data".getBytes());
//删除该节点
cf.delete().forPath("/che/che005");
//级联删除子节点
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/che/che007");
}
orSetData()方法:如果节点存在则Curator将会使用给出的数据设置这个节点的值,相当于 setData() 方法
creatingParentContainersIfNeeded()方法:如果指定节点的父节点不存在,则Curator将会自动级联创建父节点
guaranteed()方法:如果服务端可能删除成功,但是client没有接收到删除成功的提示,Curator将会在后台持续尝试删除该节点
deletingChildrenIfNeeded()方法:如果待删除节点存在子节点,则Curator将会级联删除该节点的子节点
4、Curator客户端Watcher事件示例
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("192.168.1.102:2181,192.168.1.102:2182,192.168.1.102:2183")//zk服务器地址
.sessionTimeoutMs(4000)// 超市时间
.retryPolicy(new ExponentialBackoffRetry(1000, 3))// 重试策略:初试时间为1s 重试10次
.namespace("curator-demo")//命名空间
.build();
cf.start();
System.out.println(States.CONNECTED);
System.out.println(cf.getState());
// PathChildCache 监听一个节点下子节点的创建、删除、修改事件
addListenerWithNodeChildCache(cf, "/che-test01");
// NodeCache 监听一个节点的修改和创建事件
addListenerWithNodeCache(cf, "/che-test01");
// TreeCache 监听所有节点的创建、删除、修改事件
addListenerWithTreeCache(cf, "/che-test01");
System.in.read();
}
/**
* PathChildCache 监听一个节点下子节点的创建、删除、修改事件
* @param cf
* @param path
* @throws Exception
*/
public static void addListenerWithNodeCache(CuratorFramework cf,String path) throws Exception {
@SuppressWarnings("resource")
final NodeCache nodeCache = new NodeCache(cf, path, false);
NodeCacheListener cacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听到节点事件:" + " - " + nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(cacheListener);
nodeCache.start();
}
/**
* NodeCache 监听一个节点的修改和创建事件
* @param cf
* @param path
* @throws Exception
*/
public static void addListenerWithNodeChildCache(CuratorFramework cf,String path) throws Exception {
@SuppressWarnings("resource")
final PathChildrenCache pathChildrenCache = new PathChildrenCache(cf, path, true);
PathChildrenCacheListener cacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("监听到子节点事件:" + event.getType());
}
};
pathChildrenCache.getListenable().addListener(cacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
}
/**
* TreeCache 监听所有节点的创建、删除、修改事件
* @param cf
* @param path
* @throws Exception
*/
public static void addListenerWithTreeCache(CuratorFramework cf,String path) throws Exception {
@SuppressWarnings("resource")
final TreeCache treeCache = new TreeCache(cf, path);
TreeCacheListener cacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("监听到节点事件:" + event.getType() + " - " + event.getData().getPath());
}
};
treeCache.getListenable().addListener(cacheListener);
treeCache.start();
}
}