文章目录
ZooKeeper 原生支持通过注册 Watcher 来进行事件监听,但是其使用并不是特別方便, 需要开发人员自己反复注册 Watcher,比较繁琐。
Curator 引入了 Cache 来实现对 ZooKeeper服务端事件的监听。Cache 是Curator中对事件监听的包装,其对事件的监听 其实可以近似看作是 一个本地缓存视图和远程ZooKeeper视图的对比过程 。同时 Curator 能够自动为开发人员处理 反复注册监听 ,从而大大简化了原生 API 开发的繁琐过程 。
Cache 分为两类监听类型:
- NodeCache 节点监听 。监听节点的新增、修改操作
- PathChildrenCache 子节点监听。监听子节点的新增、修改、删除操作
1、依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
2、NodeCache 节点监听
监听节点的新增、修改操作。
2.1、构造函数:
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){
this.client = client;
this.path = path;
this.dataIsCompressed = dataIsCompressed;
ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
}
参数说明:
- dataIsCompressed : 数据是否压缩。与 PathChildrenCache 类的第3个参数不同。
2.2、start(boolean buildInitial) 方法:
NodeCache.start(buildInitial);
默认是false。如果设置为true,表示 NodeCache 在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的数据内容,并保存在 Cache 中。 所以一般使用时设置为 true 。
2.3、示例代码
package com.aop8.curator.watcher;
/**
* NodeCache
*<pre>
* 节点监听 。监听节点的新增、修改操作。
*</pre>
*/
public class CuratorWatcher1 {
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重试策略:初次间隔时间为1s ,重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
//3 建立连接
cf.start();
//4 建立一个cache缓存
// NodeCache: 第3个参数:是否压缩。与PathChildrenCache类的第3个参数不同
final NodeCache cache = new NodeCache(cf, "/super", false);
cache.start(true);
cache.getListenable().addListener(new NodeCacheListener() {
/**
* 重写nodeChanged() 方法
*/
@Override
public void nodeChanged() throws Exception {
System.out.println("路径为:" + cache.getCurrentData().getPath());
System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
System.out.println("状态为:" + cache.getCurrentData().getStat());
System.out.println("---------------------------------------");
}
});
Thread.sleep(1000);
cf.create().forPath("/super", "123".getBytes());
Thread.sleep(1000);
cf.setData().forPath("/super", "456".getBytes());
Thread.sleep(1000);
cf.delete().forPath("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}
代码分析:
NodeCache cache = new NodeCache(cf, "/super", false);
单独实例化一个节点缓存,并不是在 cf (CuratorFramework) 上创建的监听。
运行结果:
路径为:/super
数据为:123
状态为:647,647,1557581986843,1557581986843,0,0,0,0,3,0,647
---------------------------------------
路径为:/super
数据为:456
状态为:647,648,1557581986843,1557581987980,1,0,0,0,3,0,647
---------------------------------------
运行结果分析:
运行结果是 节点创建、节点的修改。
对于节点的删除,没有打印。
3、PathChildrenCache 子节点监听
监听子节点的新增、修改、删除操作。
3.1、构造函数
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){
....
}
参数说明:
- cacheData :用于配置是否把节点内容缓存起来。如果配置为 true,那么客户 端在接收到节点列表变更的同时,也能够获取到节点的数据内容; 如果配置为 false,则无法获取到节点的数据内容。
3.2、PathChildrenCache .start(StartMode mode)
- BUILD_INITIAL_CACHE // 同步初始化客户端的 cache,及创建 cache 后,就从服务器端拉入对应的数据
- NORMAL // 异步初始化 cache
- POST_INITIALIZED_EVENT // 异步初始化,初始化完成触发事件。如果要启用监听,必须使用此参数。
3.3、 示例代码
package com.aop8.curator.watcher;
import com.aop8.CommonsUtils;
/**
* PathChildrenCache
* <pre>
* 子节点监听。监听子节点的新增、修改、删除操作。
* </pre>
*/
public class CuratorWatcher2 {
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms
public static void main(String[] args) throws Exception {
//1 重试策略:初次间隔时间为1s,重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
//2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
.build();
//3 建立连接
cf.start();
//4 建立一个PathChildrenCache缓存,
//PathChildrenCache: 第3个参数为是否接受节点数据内容。正常情况下都为true。与 NodeCache 类的第3个参数不同。
PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
//5 在初始化的时候就进行缓存监听
cache.start(StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener(new PathChildrenCacheListener() {
/**
* 重写 childEvent() 方法
*/
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED :" + event.getData().getPath());
System.out.println("add data : " + new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED :" + event.getData().getPath());
System.out.println("update data : " + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED :" + event.getData().getPath());
System.out.println("remove data : " + new String(event.getData().getData()));
break;
default:
break;
}
}
});
//创建本身节点不发生变化
cf.create().forPath("/super", "init".getBytes());
//添加子节点
Thread.sleep(1000);
cf.create().forPath("/super/c1", "c1内容".getBytes());
Thread.sleep(1000);
cf.create().forPath("/super/c2", "c2内容".getBytes());
//修改子节点
Thread.sleep(1000);
cf.setData().forPath("/super/c1", "c1更新内容".getBytes());
//删除子节点
Thread.sleep(1000);
cf.delete().forPath("/super/c2");
//删除本身节点
Thread.sleep(2000);
cf.delete().deletingChildrenIfNeeded().forPath("/super");
//Thread.sleep(Integer.MAX_VALUE);
}
}
代码说明:
PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
启动一个独立的 子节点缓存。独立的 是指 与cf (CuratorFramework)是分开的, 不是在 cf 建立的监听。
第3个参数前面讲过,只有把节点内容缓存起来, 然后监听到数据的变化 。 否则无法监听到数据的变化。
运行结果:
CHILD_ADDED :/super/c1
add data : c1内容
CHILD_ADDED :/super/c2
add data : c2内容
CHILD_UPDATED :/super/c1
update data : c1更新内容
CHILD_REMOVED :/super/c2
remove data : c2内容
CHILD_REMOVED :/super/c1
remove data : c1更新内容