一、ZkClient的客户端
优点:
zkclienrt是一个开源的框架,是在原生的zookeeper原生的API的基础上进行的包装;
内部实现了Session 超时的重连;
Wacther 的反复的注册;
1、 ZKClient启动
zkClient在启动的时候主要做了几件事:
1、指定连接的字符串zkServers ,sessionTimeout,ZkSerializer 序列化工具;
2、创建并启动 eventThread,用于接收事件,并调度事件监听器 Listener 的执行;
3、 连接到 zookeeper 服务器,同时将 ZKClient 自身作为默认的 Watcher。
2、zookeeper的CRUD
zkClient提供了相应的代理:
新增
删除:
修改:
3、zkClient的使用代码:
ZkClientCrud.java(增删改查)
public class ZkClientCrud<T> {
ZkClient zkClient ;
final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);
public ZkClientCrud(ZkSerializer zkSerializer) {
logger.info("链接zk开始");
// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);
zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);
}
public void createEphemeral(String path,Object data){
zkClient.createEphemeral(path,data);
}
/***
* 支持创建递归方式
* @param path
* @param createParents
*/
public void createPersistent(String path,boolean createParents){
zkClient.createPersistent(path,createParents);
}
/***
* 创建节点 跟上data数据
* @param path
* @param data
*/
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
/***
* 子节点
* @param path
* @return
*/
public List<String> getChildren(String path){
return zkClient.getChildren(path);
}
public T readData(String path){
return zkClient.readData(path);
}
public void writeData(String path,Object data){
zkClient.writeData(path,data);
}
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
}
测试代码:
ZkClientCrudTest.java
public class ZkClientCrudTest {
final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);
public static void main(String[] args) {
ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());
String path="/root";
zkClientCrud.deleteRecursive(path);
zkClientCrud.createPersistent(path,"hi");
/* zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value
//zkClientCrud.createPersistent(path,"hi");
logger.info(zkClientCrud.readData(path));
//更新
zkClientCrud.writeData(path,"hello");
logger.info(zkClientCrud.readData(path));
logger.info(String.valueOf(zkClientCrud.getChildren(path)));
//子节点
List<String> list=zkClientCrud.getChildren(path);
for(String child:list){
logger.info("子节点:"+child);
}*/
User user=new User();
user.setUserid(1);
user.setUserName("悟空");
zkClientCrud.writeData(path,user);
System.out.println(zkClientCrud.readData(path).getUserName());;
}
Watcher的测试
ZkClientWatcher.java
public class ZkClientWatcher {
ZkClient zkClient;
public ZkClientWatcher() {
zkClient= new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);
}
public void createPersistent(String path,Object data){
zkClient.createPersistent(path,data);
}
public void writeData(String path,Object object){
zkClient.writeData(path,object);
}
public void delete(String path){
zkClient.delete(path);
}
public boolean exists(String path){
return zkClient.exists(path);
}
public void deleteRecursive(String path){
zkClient.deleteRecursive(path);
}
//对父节点添加监听数据变化。
public void subscribe(String path){
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath,data );
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.printf("删除的节点为:%s\r\n", dataPath );
}
});
}
//对父节点添加监听子节点变化。
public void subscribe2(String path){
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("父节点: " + parentPath+",子节点:"+currentChilds+"\r\n");
}
});
}
//客户端状态
public void subscribe3(String path) {
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if(state== Watcher.Event.KeeperState.SyncConnected){
//当我重新启动后start,监听触发
System.out.println("连接成功");
}else if(state== Watcher.Event.KeeperState.Disconnected){
System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
}else
System.out.println("其他状态"+state);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("重建session");
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
}
});
}
/* @Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
}*/
}
watcher的测试代码:
public class ZkClientWatcherTest {
public static void main(String[] args) throws InterruptedException {
ZkClientWatcher zkClientWatche=new ZkClientWatcher();
String path="/root";
zkClientWatche.deleteRecursive(path);
zkClientWatche.createPersistent(path,"hello");
zkClientWatche.subscribe(path);
zkClientWatche.subscribe2(path);
// zkClientWatche.subscribe3(path);//需要启服务
// Thread.sleep(Integer.MAX_VALUE);
zkClientWatche.createPersistent(path+"/root2","word");
TimeUnit.SECONDS.sleep(1);
zkClientWatche.writeData(path,"hi");
TimeUnit.SECONDS.sleep(1);
//zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常
zkClientWatche.deleteRecursive(path);
TimeUnit.SECONDS.sleep(1); //这个main线程就结束
}
}
4、maven依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
或者是:
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
二、Curator的客户端
1、maven的依赖
<dependency>
<groupId>org.apache.curator</groupId> 基础框架
<artifactId>curator-framework</artifactId>
<version></version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId> 功能 jar 分布式锁、队列等
<version></version>
</dependency>
<dependency> 客户端重试策略
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version></version>
</dependency>
2、创建 Curator 连接实例
方法一:
方法二:
CuratorFramework client=CuratorFrameworkFactory.newClient(ZookeeperUtil.connectString,new ExponentialBackoffRetry(1000, 3));
client.start();
注意:1、一个 Zookeeper 集群只需要构造一个 CuratorFramework 实例对象即可。
2、CuratorFramework 使用之前必须先调用 client.start();
3、framework增删改查:
4、代码
package com.tl.api.curator;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
public class CuratorCurd<T> {
//设置重置策略
RetryPolicy retryPolicy=new ExponentialBackoffRetry(2000,5);
//设置连接的字符串
String conneString="192.168.6.2:2181";
//工厂创建客户端
CuratorFramework client= CuratorFrameworkFactory.newClient(conneString,retryPolicy);
//工厂创建客户端的另一种方式
CuratorFramework client2=CuratorFrameworkFactory
.builder()
.connectString(conneString)
.connectionTimeoutMs(3000)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(3000)
.canBeReadOnly(false)
.defaultData(null)
.build();
//开启客户端
public CuratorCurd() {
client.start();
}
/**
* 在path路径下新增节点及数据
* @param path
* @param data
*/
public void create(String path, byte[] data) throws Exception {
String res = client.create().creatingParentsIfNeeded().forPath(path, data);
}
/**
* 删除path路径下的节点
* @param path
*/
public void delete(String path) throws Exception {
client.delete().forPath(path);
}
/**
* 获取path路径下的节点数据
* @param path
*/
public String getData(String path) throws Exception {
byte[] res = client.getData().forPath(path);
return new String(res);
}
/**
* 设置path路径下的节点数据
* @param path
*/
public void setData(String path,byte[] data) throws Exception {
client.setData().forPath(path,data);
}
/**
* path 路径下节点是否存在
* @param path
* stat是对znode节点的一个映射,stat=null表示节点不存在
*/
public Stat checkExists(String path) throws Exception {
Stat res = client.checkExists().forPath(path);
return res;
}
/**
* 获取子节点
* @param path
*/
public List<String> getChildren(String path) throws Exception {
List<String> pathList = client.getChildren().forPath(path);
return pathList;
}
//关闭客户端的连接
public void close() {
client.close();
}
//测试
public static void main(String[] args) {
String path="/root";
CuratorCrud curatorCrud=new CuratorCrud();
if(null!=curatorCrud.checkExists(path)) {
curatorCrud.delete(path);
System.out.println("---删除---");
}
curatorCrud.create(path+"/bbb"+"/ccc","hi".getBytes());//递归创建 可以支持赋值
List<String> list= curatorCrud.getChildren("/");//获取path下面的节点
for(String chipath:list){
System.out.println(chipath);
}
curatorCrud.setData(path,"hello".getBytes());
System.out.println(new String(curatorCrud.getData(path)));
}
}
5、三种Watcher的监听
-
Path Cache
监视一个路径下1)子结点的创建、2)删除,3)以及结点数据的更新。
产生的事件会传递给注册的PathChildrenCacheListener。 -
Node Cache
监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。 -
Tree Cache
Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
6、watcher上代码:
public class CuratorWatcherDemo {
String connectString="192.168.6.2:2181";
//设置重连策略
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,3);
//使用工厂创建客户端
CuratorFramework client=CuratorFrameworkFactory.newClient(connectString,retryPolicy);
public CuratorWatcherDemo() {
//创建客户端的连接
client.start();
}
//为path路径下所有的子节点建立监听watcher
public void setListenterForPathChild(String path) throws Exception {
PathChildrenCache pathChildrenCache=new PathChildrenCache(client, path, true);
PathChildrenCacheListener listener =new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED : "+ data.getPath() +" 数据:"+ new String (data.getData()));
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED : "+ data.getPath() +" 数据:"+ new String (data.getData()));
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED : "+ data.getPath() +" 数据:"+ new String (data.getData()));
break;
default:
break;
}
}
};
pathChildrenCache.getListenable().addListener(listener );
pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT);;
}
//为path路径建立监听watcher不包括子节点
public void setListnerForNode(String path) throws Exception {
final NodeCache nodeCache=new NodeCache(client, path, false);
ExecutorService pool =Executors.newCachedThreadPool() ;
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
if(null!=nodeCache.getCurrentData()) {
System.out.println("path : " + nodeCache.getCurrentData().getPath());
System.out.println("data : " + new String(nodeCache.getCurrentData().getData()));
System.out.println("stat : " + nodeCache.getCurrentData().getStat());
}
}
}, pool);
nodeCache.start();
}
//监控所有的父节点及子节点变化
public void setListnerForPathAndNode(String path) throws Exception {
TreeCache treeCache=new TreeCache(client, path);
ExecutorService pool=Executors.newCachedThreadPool();
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(null!=data) {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("NODE_ADDED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
default:
break;
}
}else {
System.out.println( event.getType());
}
}
}, pool);
treeCache.start();
}
public static void main(String[] args) throws Exception {
String path = "/root";
/****============监听开启========**/
CuratorWatcherDemo curatorWatcher = new CuratorWatcherDemo();
curatorWatcher.setListenterForPathChild(path);//子节点重复监听
// curatorWatcher.setListnerForNode(path);//当前节点重复监听
// curatorWatcher.setListnerForPathAndNode(path);//当前和子节点重复监听
/****============数据更新========**/
CuratorCrud curatorCrud=new CuratorCrud();
if(null!=curatorCrud.checkExists(path)) {
curatorCrud.delete(path);
}
Thread.sleep(1000);
curatorCrud.create(path,"111".getBytes());
System.out.println("------创建目录-----");
Thread.sleep(1000);
curatorCrud.setData(path,"222".getBytes());
System.out.println("------设置目录-----");
Thread.sleep(1000);
curatorCrud.create(path+"/ccc","333".getBytes());
System.out.println("------创建子目录-----");
Thread.sleep(1000);
curatorCrud.setData(path+"/ccc","444".getBytes());
System.out.println("------设置子目录-----");
Thread.sleep(1000);
}
}