在Zookeeper客户端(四)—— Curator 中,我们介绍了Curator客户端的节点操作,以及异步操作、事务操作,但是我们没有介绍在Curator如何监听节点的变化,和ZkClient类似,为了避免反复监听,Curator也是为我们提供了监听器,Cache是curator中对事件监听的包装,对事件的监听可以近似看做是本地缓存视图和远程zookeeper视图的对比过程
- NodeCache: 节点缓存用于处理节点本身的变化 ,回调接口NodeCacheListener
- PathChildrenCache: 子节点缓存用于处理节点的子节点变化,回调接口PathChildrenCacheListener
- TreeCache: NodeCache和PathChildrenCache的结合体,回调接口TreeCacheListener
NodeCache
首先我们先来看一看NodeCache,我们按照上次介绍的Curator连接Zookeeper的步骤,首先先去连接Zookeeper,然后我们新增一个监听器,如下:
public class CuratorTest {
private static final String CONNENT_ADDR = "192.168.80.130:2181";
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNENT_ADDR)
.connectionTimeoutMs(5000) //连接超时时间
.sessionTimeoutMs(5000) //会话超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
final NodeCache cache = new NodeCache(curatorFramework, "/node1", false);
cache.start(); //需要调用nodeCache的start方法能进行缓存和事件监听
cache.getListenable().addListener(new NodeCacheListener() {
//监听节点变化
@Override
public void nodeChanged() throws Exception {
System.out.println("路径为:" + cache.getCurrentData().getPath());
System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
System.out.println("状态为:" + cache.getCurrentData().getStat());
}
});
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/node1", "value".getBytes());
Thread.sleep(1000);
}
}
第一个参数就是传入创建的Curator的框架客户端,第二个参数就是监听节点的路径,第三个重载参数dataIsCompressed表示是否对数据进行压缩。最后就是必须调用NodeCache的start方法能进行缓存和事件监听。
这里我们发现,如果NodeCache监听的节点为空(也就是说传入的路径不存在)。那么如果我们后面创建了对应的节点,也是会触发事件从而回调nodeChanged方法。
另外启动节点的事件监听方法cache.start(),还可以传递参数,其默认为false,如下
void start(boolean buildInitial) //true代表缓存当前节点
唯一的一个参数buildInitial代表着是否将该节点的数据立即进行缓存。如果设置为true的话,在start启动时立即调用NodeCache的getCurrentData方法就能够得到对应节点的信息ChildData类,如果设置为false的就得不到对应的信息。
如上在测试false时,尽量在前面添加一个休眠时间,不然可能得不出期待的结果,因为可能间隔时间太短只会触发一次事件。
PathChildrenCache
在了解了Curator监听节点变化后,我们在来看一看Curator如何监听子节点的新增、修改、删除操作
public class CuratorTest {
private static final String CONNENT_ADDR = "192.168.80.130:2181";
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNENT_ADDR)
.connectionTimeoutMs(5000) //连接超时时间
.sessionTimeoutMs(5000) //会话超时时间
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
final PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/node1", true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//只有POST_INITIALIZED_EVENT模式才会启用监听功能
cache.getListenable().addListener(new PathChildrenCacheListener() {
//监听子节点的变化
@Override
public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED: //新增
System.out.println("新增子节点路径:" + event.getData().getPath());
System.out.println("新增子节点数据:" + new String(event.getData().getData()));
break;
case CHILD_UPDATED: //更新
System.out.println("更新子节点路径:" + event.getData().getPath());
System.out.println("更新子节点数据:" + new String(event.getData().getData()));
break;
case CHILD_REMOVED: //移除
System.out.println("移除子节点路径:" + event.getData().getPath());
System.out.println("移除子节点数据:" + new String(event.getData().getData()));
break;
default:
break;
}
}
});
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/node1/n1", "v1".getBytes());
Thread.sleep(2000);
}
}
这里我们稍微注意一下,就是下图中的第三个参数cacheData,和我们上述介绍的NodeCache是不一样的,这里表示是否把节点内容缓存起来。如果cacheData为true,那么接收到节点列表变更事件的同时,会将获得节点内容。
另外就是在创建完PathChildrenCacheListener的实例之后,需要将这个实例注册到PathChildrenCache缓存实例,使用缓存实例的addListener方法。 然后使用缓存实例nodeCache的start方法,启动节点的事件监听。
这里的start方法,需要传入启动的模式。可以传入三种模式:
-
NORMAL: 异步初始化cache
启动时,异步初始化cache,完成后不会发出通知。 -
BUILD_INITIAL_CACHE: 同步初始化cache
启动时,同步初始化cache,以及创建cache后,就从服务器拉取对应的数据。 -
POST_INITIALIZED_EVENT: 异步初始化cache,并触发完成事件
启动时,异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知。
另外这里PathChildrenCache监听子节点的新增、修改、删除操作,基本和ZkClient类似,也是
- 只能监听子节点,监听不到当前节点
- 不能递归监听,子节点下的子节点不能递归监控
但是这里有一点细微的区别,在ZkClient中,我们在操作子节点下的子节点,我们的监听器是不会有任何触发事件的,但是在Curator我们在操作子节点下的子节点,监听器是会有所感知的,只是其取不到子节点下的子节点的信息而已,如下:
TreeCache
上述我们知道了NodeCache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器,监听器对应的接口为NodeCacheListener。
PathChildrenCache子节点缓存用来观察ZNode的子节点、并缓存子节点的状态,如果ZNode的子节点被创建,更新或者删除,那么PathChildrenCache会更新缓存,并且触发事件给注册的监听器,监听器对应的接口为PathChildrenCacheListener。
TreeCache可以看做是上两种的合体,TreeCache观察的是当前ZNode节点的所有数据。TreeCache不光能监听子节点,也能监听节点自身。
其用法与PathChildrenCache基本类似,只是对应于节点的增加、修改、删除,TreeCache 的事件类型的方法名有所不同