Curator 监听

ZooKeeper 原生支持通过注册 Watcher 来进行事件监听,但是其使用并不是特別方便, 需要开发人员自己反复注册 Watcher,比较繁琐。

Curator 引入了 Cache 来实现对 ZooKeeper服务端事件的监听。Cache 是Curator中对事件监听的包装,其对事件的监听 其实可以近似看作是 一个本地缓存视图和远程ZooKeeper视图的对比过程 。同时 Curator 能够自动为开发人员处理 反复注册监听 ,从而大大简化了原生 API 开发的繁琐过程 。

Cache 分为两类监听类型:

  • NodeCache 节点监听 。监听节点的新增、修改操作
  • PathChildrenCache 子节点监听。监听子节点的新增、修改、删除操作

1、依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.4.2</version>
</dependency>

2、NodeCache 节点监听

监听节点的新增、修改操作。

2.1、构造函数:

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){
	this.client = client;
	this.path = path;
	this.dataIsCompressed = dataIsCompressed;
	ensurePath = client.newNamespaceAwareEnsurePath(path).excludingLast();
}

参数说明:

  • dataIsCompressed : 数据是否压缩。与 PathChildrenCache 类的第3个参数不同。

2.2、start(boolean buildInitial) 方法:

NodeCache.start(buildInitial); 默认是false。如果设置为true,表示 NodeCache 在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的数据内容,并保存在 Cache 中。 所以一般使用时设置为 true 。

2.3、示例代码

package com.aop8.curator.watcher;

/**
 * NodeCache
 *<pre> 
 *	节点监听 。监听节点的新增、修改操作。
 *</pre>
 */
public class CuratorWatcher1 {
	
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 重试策略:初次间隔时间为1s ,重试10次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 通过工厂创建连接
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString("127.0.0.1:2181")
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		
		//3 建立连接
		cf.start();
		
		//4 建立一个cache缓存
		// NodeCache: 第3个参数:是否压缩。与PathChildrenCache类的第3个参数不同
		final NodeCache cache = new NodeCache(cf, "/super", false);
		cache.start(true);
		
		cache.getListenable().addListener(new NodeCacheListener() {
			/**
			 * 重写nodeChanged() 方法
			 */
			@Override
			public void nodeChanged() throws Exception {
				System.out.println("路径为:" + cache.getCurrentData().getPath());
				System.out.println("数据为:" + new String(cache.getCurrentData().getData()));
				System.out.println("状态为:" + cache.getCurrentData().getStat());
				System.out.println("---------------------------------------");
			}
		});
		
		Thread.sleep(1000);
		cf.create().forPath("/super", "123".getBytes());
		
		Thread.sleep(1000);
		cf.setData().forPath("/super", "456".getBytes());
		
		Thread.sleep(1000);
		cf.delete().forPath("/super");
		
		Thread.sleep(Integer.MAX_VALUE);
	}
}

代码分析:

NodeCache cache = new NodeCache(cf, "/super", false); 单独实例化一个节点缓存,并不是在 cf (CuratorFramework) 上创建的监听。

运行结果:

路径为:/super
数据为:123
状态为:647,647,1557581986843,1557581986843,0,0,0,0,3,0,647

---------------------------------------
路径为:/super
数据为:456
状态为:647,648,1557581986843,1557581987980,1,0,0,0,3,0,647

---------------------------------------

运行结果分析:

运行结果是 节点创建、节点的修改。

对于节点的删除,没有打印。

3、PathChildrenCache 子节点监听

监听子节点的新增、修改、删除操作。

3.1、构造函数

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData){
	....
}

参数说明:

  • cacheData :用于配置是否把节点内容缓存起来。如果配置为 true,那么客户 端在接收到节点列表变更的同时,也能够获取到节点的数据内容; 如果配置为 false,则无法获取到节点的数据内容。

3.2、PathChildrenCache .start(StartMode mode)

  • BUILD_INITIAL_CACHE // 同步初始化客户端的 cache,及创建 cache 后,就从服务器端拉入对应的数据
  • NORMAL // 异步初始化 cache
  • POST_INITIALIZED_EVENT // 异步初始化,初始化完成触发事件。如果要启用监听,必须使用此参数。

3.3、 示例代码

package com.aop8.curator.watcher;

import com.aop8.CommonsUtils;


/**
 * PathChildrenCache
 * <pre>
 * 子节点监听。监听子节点的新增、修改、删除操作。
 * </pre>
 */
public class CuratorWatcher2 {	
	
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 重试策略:初次间隔时间为1s,重试10次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 通过工厂创建连接
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString("127.0.0.1:2181")
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		
		//3 建立连接
		cf.start();
		
		//4 建立一个PathChildrenCache缓存,
		//PathChildrenCache: 第3个参数为是否接受节点数据内容。正常情况下都为true。与 NodeCache 类的第3个参数不同。
		
		PathChildrenCache cache = new PathChildrenCache(cf, "/super", true);
		
		//5 在初始化的时候就进行缓存监听
		cache.start(StartMode.POST_INITIALIZED_EVENT);
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			/**
			 * 重写 childEvent() 方法
			 */
			@Override
			public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()) {
				case CHILD_ADDED:
					System.out.println("CHILD_ADDED :" + event.getData().getPath());
					System.out.println("add data : " + new String(event.getData().getData()));
					break;
				case CHILD_UPDATED:
					System.out.println("CHILD_UPDATED :" + event.getData().getPath());
					System.out.println("update data : " + new String(event.getData().getData()));
					break;
				case CHILD_REMOVED:
					System.out.println("CHILD_REMOVED :" + event.getData().getPath());
					System.out.println("remove data : " + new String(event.getData().getData()));
					break;
				default:
					break;
				}
			}
		});

		//创建本身节点不发生变化
		cf.create().forPath("/super", "init".getBytes());
		
		//添加子节点
		Thread.sleep(1000);
		cf.create().forPath("/super/c1", "c1内容".getBytes());
		Thread.sleep(1000);
		cf.create().forPath("/super/c2", "c2内容".getBytes());
		
		//修改子节点
		Thread.sleep(1000);
		cf.setData().forPath("/super/c1", "c1更新内容".getBytes());
		
		//删除子节点
		Thread.sleep(1000);
		cf.delete().forPath("/super/c2");		
		
		//删除本身节点
		Thread.sleep(2000);
		cf.delete().deletingChildrenIfNeeded().forPath("/super");
		
		//Thread.sleep(Integer.MAX_VALUE);		

	}
}

代码说明:

PathChildrenCache cache = new PathChildrenCache(cf, "/super", true); 启动一个独立的 子节点缓存。独立的 是指 与cf (CuratorFramework)是分开的, 不是在 cf 建立的监听。

第3个参数前面讲过,只有把节点内容缓存起来, 然后监听到数据的变化 。 否则无法监听到数据的变化。

运行结果:

CHILD_ADDED :/super/c1
add data : c1内容
CHILD_ADDED :/super/c2
add data : c2内容
CHILD_UPDATED :/super/c1
update data : c1更新内容
CHILD_REMOVED :/super/c2
remove data : c2内容
CHILD_REMOVED :/super/c1
remove data : c1更新内容

猜你喜欢

转载自blog.csdn.net/xiaojin21cen/article/details/90116224