1、Framework 的简介
原文:http://curator.apache.org/curator-framework/index.html
Curator Framework 是一个高级 API ,极大的简化了 Zookeeper 的使用。它增加了许多机遇 Zookeeper 的特性,并处理了与 Zookeeper 集群的连接和重试操作的复杂性。
1.1 自动连接管理
通常情况下,对 Zookeeper 客户端的连接、重试操作可能会存在潜在的错误。Curator 自动并且大部分都透明化的处理了这些问题。监听 NodeDataChanged 事件,并根据需要调用 updateServerList() 。监听会被 Curator Recipes 自动移除。
1.2 简洁的 API 操作
简化了 Zookeeper 原生的方法、事件等操作。提供了模板、流式接口。
1.3 Recipe 实现
- Leader eletion 领导人选举
- Share lock 共享锁
- Path cache and watcher 路径缓存和监视器
- Distributed Queue 分布式队列
- Distributed Priority Queue 分布式优先级队列
- …
注意:java 8 版本还可使用 Curator Async
2、创建一个 CuratorFrameworks 实例
使用 CuratorFrameworkFactory 工厂类来构建实例,该工厂类提供了工厂方法和建造者来创建实例。
重要:CuratorFramework 的实例是完全线程安全的。可以在 Zookeeper 集群应用中共享它。
工厂方法(有多个重载)提供了简单的方式来创建实例:
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
建造者的方式可以控制参数个数:
// 通过工厂建造出连接实例:client
CuratorFramework client = CuratorFrameworkFactory.builder()
// 连接地址和端口(zookeeper)
.connectString("localhost:2181")
// 会话超时时间
.sessionTimeoutMs(5000)
// 连接超时时间
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
当你拥有一个实例之后,必须调用 start()
方法。在应用程序的末尾,应该调用 close()
方法。
3、CuratorFramework API
CuratorFramework 使用一个流式接口。操作都是通过构建的 CuratorFramework 实例构造的。其使用案例请见:https://blog.csdn.net/FBB360JAVA/article/details/104017549
4、 API 中的方法
方法 | 描述 |
---|---|
create() |
开始创建操作。调用其他方法(mode 或者background )。并通过 forPath() 方法来完成最终操作。 |
delete() |
开始删除操作。调用其他方法(version 或者background )。并通过 forPath() 方法来完成最终操作。 |
checkExists() |
开始检查操作,检查一个 ZNode 是否存在。调用其他方法(watch 或 background )。 |
getData() |
获取一个 ZNode 的数据。调用其他方法(watch 或 get stat )并且最终需要调用 forPath() 方法。 |
setData() |
设置一个 ZNode 的数据。调用其他方法(version 或 background )并且最终需要调用 forPath() 方法。 |
getChildren() |
获取一个 ZNode 子节点的列表。调用其他方法(watch , background 或 get stat )。并且最终需要调用 forPath() 方法。 |
transactionOp() |
用于分配操作,并且需要和 transaction() 方法一起使用。 |
transaction() |
原子的提交一个集合的操作,作为一次事务。 |
getACL() |
返回一个 ZNode 的 ACL 设置。 |
setACL() |
设置一个 ZNode 的 ACL 设置。最终需要使用 forPath() 方法。 |
getConfig() |
返回最后一次提交的配置。并且最终需要调用 forEnsemble() 方法。 |
reconfig() |
改变配置,并且最终需要调用 forEnsemble() 方法。 |
5、 通知事项
发布在 ClientListener
接口发布的通知操作和监听。可以通过 CuratorFramework
实例中的 addListener()
方法来注册监听。这个监听者实现了两个方法:
方法 | 描述 |
---|---|
eventReceived() | 一个面板操作被执行或一个监视被触发;检测给定的事件的详细信息。 |
6、 CuratorEvent
CuratorEvent
对象是一个简单对象(POJO)
,它能处理每一个面板通知的类型和并且触发监视。CuratorEvent
中的有用的变量依赖于事件的类型,并且可以通过 getType()
方法获得。下表内容来自于官方文档:
Event Type | Event Methods |
---|---|
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GET_DATA | getResultCode(), getPath(), getStat() and getData() |
SET_DATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
SYNC | getResultCode(), getStat() |
GET_ACL | getResultCode(), getACLList() |
SET_ACL | getResultCode() |
TRANSACTION | getResultCode(), getOpResults() |
WATCHED | getWatchedEvent() |
GET_CONFIG | getResultCode(), getData() |
RECONFIG | getResultCode(), getData() |
7、命名空间
由于一个 Zookeeper
集群是一个被共享的环境,命名空间被观察到是很重要的。因此,多个应用能够使用不冲突的 ZK
路径。(其实,就是起了一个可共享的前缀)
CuratorFramework
有一个 namespace
的概念。你可以在创建 CuratorFramework
实例(使用建造者)时,设置一个 namespace
。 CuratorFramework
将会在它的一个 API 被调用后,将namespace 前置。
官方案例:
CuratorFramework client = CuratorFrameworkFactory
.builder()
.namespace("MyApp")
...
build();
...
client.create().forPath("/test", data);
// node was actually written to: "/MyApp/test"
在这个例子中,可以看到将 MyApp
作为 namespace
。当创建 /test
结点时,会自动将 /MyApp
作为前缀创建。
8、 临时连接
临时 CuratorFramework 实例旨在容易发生故障的网络发送单次请求到 Zookeeper ,比如 WAN 。 CuratorTempFremework 的 API 的可用性是被限制的。并且,一段时间后,不使用的连接就会自动关闭。
这里有一篇文章可供参考:
http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html
8.1 创建一个 CuratorTempFramework
创建 CuratorTempFramework 实例和创建普通的 CuratorFramework 实例是相似的。然而,在调用 build() 方法时,改为调用 buildTemp() 方法。 buildTemp() 可以选择在不活跃 3 分钟后自动关闭。有一个替代版本的 buildTemp() 方法可以允许你去指定不活跃的时间段。
8.2 API
CuratorTempFramework 实例提供了以下方法:
// 停止客户端:关闭连接
public void close();
/**
* 获得一个事务建造器
*
* @return builder object
* @throws Exception errors
*/
public CuratorTransaction inTransaction() throws Exception;
/**
* 启动一个获取数据的建造器
*
* @return builder object
* @throws Exception errors
*/
public TempGetDataBuilder getData() throws Exception;