Java操作Zookeeper
本文是上一篇文章的后续,详情点击该链接~
创建Zookeeper结点
首先我们导入依赖
<!-- zookeeper依赖 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version> </dependency>
public class Test implements Watcher {
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper
// 192.168.147.128指的是Linux地址,后面的2181指的是zookeeper端口
//那个150000指的是给zookeeper链接的一个超时时间。单位是毫秒,也就是15秒
//第三个参数指的就是事件通知处理器,选中的那个类需要实现Watcher接口
("192.168.147.128:2181",150000,new Test());
//创建一个zNode
String path = zooKeeper.create("/alvin","huang".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println(path);
}
//事件通知回调方法
@Override
public void process(WatchedEvent event) {
//获取链接事件
if(event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
}
}
假如我们 ip 和 端口号都是正确的,依然链接不上,报出以下错误怎么办? 很简单,我们可以输入Linux命令关闭防火墙
systemctl status firewalld
service firewalld status
查看防火墙状态
service stop firewalld
service firewalld stop
关闭防火墙
就可以了~
获取Znode中的数据
获取指定节点的数据
byte[] data= zooKeeper.getData("/alvin0000000003/bob",new Test2(),new Stat());
System.out.println(data);
获取所有子结点数据
public class Test2 implements Watcher {
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper
// 192.168.147.128指的是Linux地址,后面的2181指的是zookeeper端口
//那个150000指的是给zookeeper链接的一个超时时间。单位是毫秒,也就是15秒
//第三个参数指的就是事件通知处理器,选中的那个类需要实现Watcher接口
("192.168.147.128:2181",150000,new Test2());
//获取所有子节点中的数据
List<String> list=zooKeeper.getChildren("/alvin",new Test2());
byte[] data = new byte[1024];
for(String path : list){
data = zooKeeper.getData("/alvin/" + path,new Test2(),null);
}
System.out.println(data);
}
//事件通知回调方法
@Override
public void process(WatchedEvent event) {
//获取链接事件
if(event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
}
}
设置Znode中的值
Stat stat = zooKeeper.setData("/alvin","huang".getBytes(),-1);
System.out.println(stat);
删除Znode
public class Test2 implements Watcher {
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//创建链接zookeeper对象
ZooKeeper zooKeeper = new ZooKeeper
// 192.168.147.128指的是Linux地址,后面的2181指的是zookeeper端口
//那个150000指的是给zookeeper链接的一个超时时间。单位是毫秒,也就是15秒
//第三个参数指的就是事件通知处理器,选中的那个类需要实现Watcher接口
("192.168.147.128:2181",150000,new Test2());
//删除Znode
zooKeeper.delete("/alvin/test",-1);
}
//事件通知回调方法
@Override
public void process(WatchedEvent event) {
//获取链接事件
if(event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
}
}
}
综合
public class TestZookeeper {
public static void main(String[] args) throws Exception {
delete();
}
/**
* 删除节点.
* 删除节点前,需要先查询节点的状态(cversion)。通过getData来查询这个版本。
* 设计是为了保证删除的节点是你想删除的那个。
*/
public static void delete() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("192.168.147.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
Stat stat = new Stat();
System.out.println(stat);
zooKeeper.getData("/alvin/sequence0000000001", false, stat);
System.out.println(stat.getCversion());
zooKeeper.delete("/alvin/sequence0000000001", stat.getCversion());
}
/**
* 查询节点中存储的数据。 相当于根据key获取value
*/
public static void get() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("192.168.147.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
// 获取数据
byte[] datas = zooKeeper.getData("/alvin", false, null);
System.out.println("节点/alvin中存储的数据是:" + new String(datas));
}
/**
* 查询节点, 相当于遍历Key
*/
public static void list() throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("192.168.147.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
// 遍历zk中所有的节点。
listAll(zooKeeper, "/");
}
private static void listAll(ZooKeeper zooKeeper, String path) throws Exception{
// 获取当前节点的所有子节点。
List<String> children = zooKeeper.getChildren(path, false);
for(String child : children){
String currentNodeName = "/".equals(path) ? (path + child) : (path + "/" + child);
System.out.println(currentNodeName);
listAll(zooKeeper, currentNodeName);
}
}
/**
* 什么是会话?
* 持久、长期、有状态的对象。
* 使用Java远程访问zk,步骤是:
* 1、 创建客户端
* 2、 使用客户端发送命令
* 3、 处理返回结果
* 4、 回收资源
*/
public static void create() throws IOException, KeeperException, InterruptedException {
// 创建客户端对象
ZooKeeper zooKeeper = new ZooKeeper("192.168.147.128:2181", 10000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("watch中的方法执行");
}
});
// 创建一个节点
String result = zooKeeper.create("/alvin", "alvin data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("创建alvin节点结果:" + result);
// 创建一个临时节点
String tmpResult =
zooKeeper.create("/alvin/tmp", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("创建/alvin/tmp节点结果:" + tmpResult);
// 创建一个带序号的节点
String seqResult =
zooKeeper.create("/alvin/sequence", null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("创建/alvin/sequence节点结果:"+ seqResult);
Thread.sleep(6000);
// 关闭客户端
zooKeeper.close();
}
}