文章目录
Apache Curator客户端的使用
常用zk的一些java客户端对比
- zk原生api
- 超时重连,不支持自动,需要手动操作
- Watch注册一次后会失效
- 不支持递归创建节点
- zkclient(更新缓慢)
- Apache curator
- Apache的开源项目
- 解决watcher的注册一次就失效
- Api更加简单易用
- 提供更多解决方案并且实现简单:如分布式锁
- 提供常用的ZooKeeper工具类
客户端 curator 的基本Api
1. Maven中引入相关依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
2.创建会话
2.1 使用静态工厂方法创建客户端
RetryPolicy retryPolicy = new RetryNTimes(3,5000);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
zkServerPath
,5000
,10000
,retryPolicy);
-参数:
参数名 | 含义 |
---|---|
connectionString | 服务器列表,格式host1:port1,host2:port2,… |
retryPolicy | 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口 |
sessionTimeoutMs | 会话超时时间,单位毫秒,默认60000ms |
connectionTimeoutMs | 连接创建超时时间,单位毫秒,默认60000ms |
namespace | 命名空间 |
2.2 使用Fluent风格创建会话
RetryPolicy retryPolicy = new RetryNTimes(3,5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000)
.connectionTimeoutMs(50000)
.retryPolicy(retryPolicy)
.build();
2.3 使用命名空间namespace创建会话
当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。
通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。
RetryPolicy retryPolicy = new RetryNTimes(3,5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000)
.connectionTimeoutMs(50000)
.retryPolicy(retryPolicy)
.namespace("workspace")
.build();
2.4 Curator重试连接时使用的策略
- ExponentialBackoffRetry
参数 | 含义 |
---|---|
baseSleepTimeMs | 初始sleep的时间 |
maxRetries | 最大重试次数 |
maxSleepMs | 最大重试时间 |
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
- RetryNTimes
参数 | 含义 |
---|---|
n | 重试的次数 |
sleepMsBetweenRetry | 每次重试间隔的时间 |
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
- RetryOneTime
参数 | 含义 |
---|---|
sleepMsBetweenRetry | 每次重试间隔的时间 |
RetryPolicy retryPolicy2 = new RetryOneTime(3000);
- RetryUntilElapsed
参数 | 含义 |
---|---|
maxElapsedTimeMs | 最大重试时间 |
sleepMsBetweenRetries | 每次重试间隔重试时间超过maxElapsedTimeMs后,就不再重试 |
RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);
3. 启动和关闭客户端
成功创建client的实例后可以调用其start( )方法
client.start();
需要关闭时,获取client实例调用其close()方法
client.close();
4. 节点操作
4.1 创建节点
创建一个节点,指定创建模式,附带初始化内容,并且自动递归创建父节点
String nodepath = "/super/chandler";
byte[] data ="superme".getBytes();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodepath,data);
基本概念和API了解后我们来实操一个例子:
/**
* @Date: 19-1-29
* @version: V1.0
* @Author: Chandler
* @Description: ${todo}
*/
public class CuratorOperator {
private CuratorFramework client = null;
public static final String zkServerPath = "127.0.0.1:2181";
public CuratorOperator() {
RetryPolicy retryPolicy = new RetryNTimes(3,5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
}
public static void main(String[] args) throws Exception {
CuratorOperator cto = new CuratorOperator();
boolean isZkCuratorStarted = cto.client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));
//创建节点
String nodepath = "/super/chandler";
byte[] data ="superme".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodepath,data);
}
public void closeZKClient() {
if (client != null) {
this.client.close();
}
}
}
命名空间为workspace,并且子节点也已经被成功递归创建
[zk: localhost:2181(CONNECTED) 36] ls /
[workspace, zookeeper, test, boboanx, testnode, aclboboan, sonnode]
[zk: localhost:2181(CONNECTED) 37] ls /workspace
[super]
[zk: localhost:2181(CONNECTED) 38] ls /workspace/super
[chandler]
[zk: localhost:2181(CONNECTED) 39] ls /workspace/super/chandler
[]
[zk: localhost:2181(CONNECTED) 40] get /workspace/super/chandler
superme
cZxid = 0x101
ctime = Tue Jan 29 15:53:54 CST 2019
mZxid = 0x101
mtime = Tue Jan 29 15:53:54 CST 2019
pZxid = 0x101
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
4.2 修改节点
4.2.1 修改节点数据
cto.client.setData().forPath(nodepath,newData);
4.2.2指定节点版本强制更新。
cto.client.setData().withVersion(0).forPath(nodepath,newData);
4.3 删除节点
4.3.1 删除一个节点
只能删除叶子节点,否则会异常
client.delete().forPath(nodePath);
4.3.2 guaranteed()强制保证删除
如果失败,后端还是会继续删除直到成功,会递归删除子节点。
这些方法都是可以组合使用的
client.delete()
.guaranteed()
.deletingChildrenIfNeeded()
.withVersion(0)
.forPath(nodePath);
4.4 查询节点
4.4.1 读取一个节点的数据内容
能够返回一个bytep[]
byte[] data = cto.client.getData().forPath(nodepath);
4.4.2 读取一个节点的数据内容,同时获取到该节点的stat
我们可以拿到该节点的stat信息
Stat stat = new Stat();
byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodepath);
System.out.println("该节点的版本号为: " + stat.getVersion());
4.4.3 查询某个节点的所有子节点路径
该方法返回的是一个集合,遍历即可得到全部子节点路径
List<String> childNodes = cto.client.getChildren().forPath(nodepath);
4.4.4 查询某个节点是否存在
如果存在返回Stat对象,不存在返回null
Stat statExist = cto.client.checkExists().forPath(nodepath + "/chandler");
5. Watch监控事件
5.1 Watch监控事件一次–usingWatcher
当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodepath);
public class MyCuratorWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println("触发watcher,节点路径为:" + watchedEvent.getPath());
}
}
6. 缓存
Zookeeper原生Watcher只能单次注册单次使用,需要我们反复注册。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
6.1 NodeCache
nodeCache在初始化的时候获取node的值并且缓存,同时用NodeCache会监听数据节点的变更,会触发监听事件。
//创建NodeCache
final NodeCache nodeCache = new NodeCache(cto.client, nodepath);
//启动
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
//监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("空");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);
}
});
6.2 PathCache
- PathCache监听数据节点的增删改,会触发事件
- 主要涉及四个类:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
- 初始化PathChildrenCache后,调用它的start()方法,设置好初始化方式,注册上监听事件,使用完毕后调用close()方法关闭
- start()方法里StartMode的启动方式
- POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
- NORMAL:异步初始化
- BUILD_INITIAL_CACHE:同步初始化
final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
String childData = new String(cd.getData());
System.out.println(childData);
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}
else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals("/super/chandler")) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/chandle")) {
System.out.println("添加不正确...");
}
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});
Thread.sleep(100000);
cto.closeZKClient();
我们利用监听事件可以让我们在服务器配置进行变更的时候实现自动化更新等操作,也能根据我们的业务进行逻辑处理,非常方便~
7. Acl
在创建节点的时候添加.withACL(acls, true)方法放入自定义好的ACL权限组即可。
String nodePath = "/acl/father";
List<ACL> acls = new ArrayList<ACL>();
Id boboan1 = new Id("digest", AclUtils.getDigestUserPwd("boboan1:123456"));
Id boboan2 = new Id("digest", AclUtils.getDigestUserPwd("boboan2:123456"));
acls.add(new ACL(ZooDefs.Perms.ALL, boboan1));
acls.add(new ACL(ZooDefs.Perms.READ, boboan2));
acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, boboan2));
// 创建节点
byte[] data = "spiderman".getBytes();
cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls, true)
.forPath(nodePath, data);