在我们实际的生产环境中,zookpeeper是一个很关键的组件,因为zk是用来注册发现服务的服务,zk集群的稳定性与可靠性直接决定了其他服务的稳定。
ZkClient是一个开源的ZooKeeper客户端,是在原生的ZooKeeper API接口之上进行包装,是一个更易使用的ZooKeeper客户端。ZkClient在内部实现了Session超时重连、Watcher反复注册等功能,使得ZooKeeper客户端的繁琐细节对开发人员透明。
实际项目中,我们就使用ZkClient来对zk进行相关操作。为了方便测试,特意在本地开启了一个zk的server并使用zkclient来进行操作。
1.安装Zookeeper并启动服务
首先从网上下载到zookeeper的压缩包,然后解压。
cd到解压的目录以后,bin目录里有zkServer.sh,然后执行./zkServer.sh start
,zk服务就已经在本机启动。
如果出现如下页面,则表明此时zk服务已正常启动。
2.用ZkClient监控子节点的变化情况
b按惯例,直接上代码。
public static void subchild() throws InterruptedException {
ZkClient zkClient = new ZkClient("localhost:2181", 5000);
String path = "/mipush";
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + "'s child changed, currentChilds: " + currentChilds);
}
});
if(zkClient.exists(path)) {
zkClient.deleteRecursive(path);
// zkClient.delete(path);
System.out.println("delete " + path + " successful");
}
zkClient.createPersistent(path);
Thread.sleep(1000);
System.out.println("now child path is: " + zkClient.getChildren(path));
zkClient.createPersistent(path + "/p1");
Thread.sleep(1000);
System.out.println("now child path is: " + zkClient.getChildren(path));
zkClient.createPersistent(path + "/p2", "abc");
Thread.sleep(1000);
System.out.println("now child path is: " + zkClient.getChildren(path));
}
将上部分代码成功运行起来以后,结果如下:
Node /mipush deleted.
Node /mipush changed, new data: null
/mipush's child changed, currentChilds: []
now child path is: []
/mipush's child changed, currentChilds: [p1]
now child path is: [p1]
/mipush's child changed, currentChilds: [p1, p2]
now child path is: [p1, p2]
3.用ZkClient监控节点数据变化情况
public static void changedata() throws InterruptedException {
ZkClient zkClient = new ZkClient("localhost:2181", 5000);
String path = "/mipush";
if(zkClient.exists(path)) {
zkClient.deleteRecursive(path);
// zkClient.delete(path);
System.out.println("delete " + path + " successful");
}
zkClient.createPersistent(path, "abc");
zkClient.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("Node " + dataPath + " changed, new data: " + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("Node " + dataPath + " deleted.");
}
});
// 读取节点数据
System.out.println("now data is: " + zkClient.readData(path).toString());
zkClient.writeData(path, "def");
Thread.sleep(1000);
System.out.println("Node exists: " + zkClient.exists(path));
zkClient.delete(path);
System.out.println("Node exists: " + zkClient.exists(path));
}
运行结果如下:
delete /mipush successful
now data is: abc
Node /mipush changed, new data: def
Node exists: true
Node exists: false