版权声明:转载请声明原文出处!!!! https://blog.csdn.net/weixin_40461281/article/details/85337190
zookeeper 的安装与集群的搭建 请参考我的另一片文章 https://blog.csdn.net/weixin_40461281/article/details/85336396
首先 创建一个maven项目 (不细讲了,不会的自行百度)
导入curator jar包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
new 一个类 名为 CuratorCreateSessionDemo
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* Curator 创建session连接 demo
*/
public class CuratorCreateSessionDemo {
// 集群连接地址
private final static String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181";
public static void main(String[] args){
// 创建会话的两种方式 normal
CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(
CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
// fluent 风格
CuratorFramework build = CuratorFrameworkFactory.builder()
.connectString(CONNECTSTRING)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
// 异常重试策略
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
// 命名空间(以后创建节点都在这个节点之下)
.namespace("/curator")
.build();
// 开启连接
build.start();
System.out.println("success");
}
}
为了便于之后的使用 我将连接编写为 工具类 CuratorClientUtils
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* Curator客户端工具类
*/
public class CuratorClientUtils {
// 集群连接地址
private final static String CONNECTSTRING = "***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181,***.***.***.***:2181";
private static CuratorFramework curatorFramework;
public static CuratorFramework getInstance(){
curatorFramework = CuratorFrameworkFactory.newClient(
CONNECTSTRING,5000,5000,new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
return curatorFramework;
}
}
API 操作
创建节点 curator 可以直接创建父节点与子节点 (底层为递归创建) 返回值为创建节点的名称
// 创建节点
String s = curatorFramework.create()
// 同时创建父节点与子节点
.creatingParentContainersIfNeeded()
// PERSISTENT 永久节点 EPHEMERAL 临时节点
.withMode(CreateMode.PERSISTENT)
.forPath("/curator/curator1/curator11", "123".getBytes());
System.out.println(s);
删除节点 将上一步创建的 /curator 节点删除
curator 可以直接指定 父节点进行删除 会将该父节点下的子节点全部删除 (底层为递归操作)
// 删除节点
curatorFramework.delete()
// 同时删除父节点与子节点
.deletingChildrenIfNeeded()
.forPath("/curator");
System.out.println("删除成功...........");
查询/获取节点数据
// 查询/获取数据
Stat stat = new Stat();
byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
System.out.println(new String(bytes1) + " ---> stat: " + stat);
修改节点数据
// 修改数据
Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
System.out.println(stat1);
// 查询/获取数据
Stat stat2 = new Stat();
byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
System.out.println(new String(bytes2) + " ---> stat: " + stat2);
异步操作
// 异步操作
// 创建一个线程池
ExecutorService service = Executors.newFixedThreadPool(1);
// 创建同步计数器 数值为1
CountDownLatch countDownLatch = new CountDownLatch(1);
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
// 异步执行
.inBackground((curatorFramework1, curatorEvent) -> {
// 输出处理线程名称 结果号 执行类型
System.out.println(Thread.currentThread().getName() + " -> resultCode" +
curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
// 计数器递减
countDownLatch.countDown();
}, service).forPath("/demo","789".getBytes());
// 最后一个参数为 一个线程池(可不填) 表示将任务交给线程池来处理
// 阻塞线程 等待计数器归零
countDownLatch.await();
// 关闭线程池
service.shutdown();
事务操作
操作一个错误节点
// 事务操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一个不存在的节点 会报错并回退
.and().setData().forPath("/xxxx", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
报错节点不存在
操作一个正确节点
// 事务操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一个正确节点
.and().setData().forPath("/curator", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
整体代码
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Curator API 操作 demo
*/
public class CuratorOperatorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
System.out.println("连接成功...........");
// 创建节点
String s = curatorFramework.create()
// 同时创建父节点与子节点
.creatingParentContainersIfNeeded()
// PERSISTENT 永久节点 EPHEMERAL 临时节点
.withMode(CreateMode.PERSISTENT)
.forPath("/curator/curator1/curator11", "123".getBytes());
System.out.println("创建节点: "+s + "内容为 123");
// 删除节点
curatorFramework.delete()
// 同时删除父节点与子节点
.deletingChildrenIfNeeded()
.forPath("/curator");
System.out.println("删除成功...........");
// 查询/获取数据
Stat stat = new Stat();
byte[] bytes1 = curatorFramework.getData().storingStatIn(stat).forPath("/curator/curator1/curator11");
System.out.println(new String(bytes1) + " ---> stat: " + stat);
// 修改数据
Stat stat1 = curatorFramework.setData().forPath("/curator", "456".getBytes());
System.out.println("修改成功: 内容已修改为 456 stat: "+stat1);
// 查询/获取数据
Stat stat2 = new Stat();
byte[] bytes2 = curatorFramework.getData().storingStatIn(stat2).forPath("/curator");
System.out.println(new String(bytes2) + " ---> stat: " + stat2);
// 异步操作
// 创建一个线程池
ExecutorService service = Executors.newFixedThreadPool(1);
// 创建同步计数器 数值为1
CountDownLatch countDownLatch = new CountDownLatch(1);
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
// 异步执行
.inBackground((curatorFramework1, curatorEvent) -> {
// 输出处理线程名称 结果号 执行类型
System.out.println(Thread.currentThread().getName() + " -> resultCode" +
curatorEvent.getResultCode() + " -> " + curatorEvent.getType());
// 计数器递减
countDownLatch.countDown();
}, service).forPath("/demo","789".getBytes());
// 最后一个参数为 一个线程池(可不填) 表示将任务交给线程池来处理
// 阻塞线程 等待计数器归零
countDownLatch.await();
// 关闭线程池
service.shutdown();
// 事务操作
Collection<CuratorTransactionResult> commit = curatorFramework.inTransaction()
.create().forPath("/trans", "111".getBytes())
// 操作一个正确节点
.and().setData().forPath("/curator", "111".getBytes())
// 操作一个不存在的节点 会报错并回退
// .and().setData().forPath("/xxxx", "111".getBytes())
.and().commit();
for (CuratorTransactionResult result:commit){
System.out.println(result.getForPath()+" -> "+ result.getType());
}
}
}
事件监听
直接送上代码 注释很详细
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.TimeUnit;
/**
* Curator 事件监听 dmeo
*/
public class CuratorEventDemo {
/**
* 三种watcher来做节点的监听
* pathcache 监听一个路径下子节点的 创建 删除 更新
* NodeCache 监听一个节点的 创建 更新 删除
* TreeCache pathcache 加 nodecache 的合体 (监听路径下的 创建 更新 删除事件)
* 缓存路径下的所有子节点的数据
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorClientUtils.getInstance();
// 节点变化 NodeCache
// NodeCache cache = new NodeCache(curatorFramework,"/curator",false);
//
// // true 初始化设置
// cache.start(true);
//
// cache.getListenable().addListener(() ->
// System.out.println("节点数据发生变化,变化后的结果: " + new String(cache.getCurrentData().getData())));
//
// curatorFramework.setData().forPath("/curator","菲菲".getBytes());
// PathChildrenCache
PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/event", true);
/**
* Normal 初始化为空
* BUILD_INITIAL_CACHE 方法 return 之前 调用一个 rebuild 操作
* POST_INITIALIZED_EVENT cache 初始化后发出一个事件
*/
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((curatorFramework1, pathChildrenCacheEvent) -> {
// 事件还有很多,列举三个
switch (pathChildrenCacheEvent.getType()) {
case CHILD_ADDED:
System.out.println("增加子节点");
break;
case CHILD_REMOVED:
System.out.println("删除子节点");
break;
case CHILD_UPDATED:
System.out.println("更新子节点");
break;
default:
break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.setData().forPath("/event/event1", "222".getBytes());
TimeUnit.SECONDS.sleep(1);
curatorFramework.delete().forPath("/event/event1");
// curatorFramework.delete().deletingChildrenIfNeeded().forPath("/event");
System.in.read();
}
}