1、原生API的不足
1、连接的创建是异步的,需要开发人员自行编码实现等待
2、连接没有自动的超时重连机制
3、ZK本身不提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化
4、Watcher注册一次只会生效一次,需要不断的重复注册
5、Watcher的使用方式不符合java本身的术语,如果采用监听方式,更容易理解
6、不支持递归创建树形节点
2、开源客户端---Curator介绍
Apache基金会得顶级项目之一
1 、解决session会话超时重连
2、watcher反复注册
3、简化开发API
4、遵循Fluent风格Api规范
5、NodeExistsException异常处理
6、共享锁服务,master选举 , 分布式计数器
3、创建会话
1、使用CuratorFrameworkFactory工厂的两个静态方法创建客户端
public class CuratorClientDemo {
private CuratorFramework client = null;
/**
*
* public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
* return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
* }
*
* public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
* return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
* }
*
* connectStrng 逗号分开的ip:port
* retryPolicy 重试策略,默认四种: Exponential BackoffRetry, RetryNtime, RetryOneTime, RetryUntilElapsed
* sessionTimeoutMs 会哈超时时间,单位为毫秒,默认60000ms
* connectionTimeoutMs 连接创建超时时间,单位为毫秒,默认是15000ms
*
*/
public CuratorClientDemo() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(1).retryPolicy(retryPolicy).build();
client.start();
}
}
4、重试策略
1、实现接口RetryPolicy可以自定义重试策略
public static void main(String[] args) {
/**
* retryCount : 已经重试次数,如果第一次重试,此值为0
* elapsedTimeMs : 重试花费的时间,单位为毫秒
* sleeper : 类似于Thread.sleep,用于sleep指定时间
* 返回值 : 如果还会继续重试,则返回true
*/
// public interface RetryPolicy {
// boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
// }
}
1、ExponentialBackoffRetry
- ExponentialBackoffRetry
// baseSleepTimeMs : 初始sleep时间
// maxRetries : 最大重试次数
// maxSleepMs : 最大重试时间
// 返回值 : 如果还会继续重试,则返回true
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
当前应该sleep的时间 : baseSleepTimeMs * Math.max(1,random.nextInt(1 << (retryConut +1 )))
2、RetryNTimes
// RetryNTimes
// RetryNTimes(int n, int sleepMsBetweenRetries
// 当前应该sleep
// 参数名 n : 最大重试数
// 参数名 sleepMsBetweenRetries
3、RetryOneTime
// RetryOneTime
// 只重试一次
// RetryOneTime(int sleepMsBetweenRetry)
// 参数名 sleepMsBetweenRetries
4、RetryUntilElapsed
// RetryUntilElapsed
// RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
// 重试的时间超过最大时间后,就不在重试
// 参数名 maxElapsedTimeMs : 最大重试时间
// 参数名 sleepMsBetweenRetries
5、Fluent风格的API
-
定义 : 一种面向对象的开发方式,目的是提高代码的可读性
-
实现方式 : 通过方法的级联或者方法链的方式实现
-
举例:
public CuratorClientTest() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181,localhost:2182")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("base").build();
client.start();
}
6、创建节点
1、代码举例
public void createNode(String path, byte[] data) throws Exception {
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path, data);
}
2、参数说明
-
1、构建操作包装类(Builder):CreateBuilder create()--- CuratorFramework
-
2、CreateBuilder
1、creatingParentsIfNeeded // 递归创建父目录
2、withMode(CreateMode mode) // 设置节点属性 比如:CreateMode.PERSISTENT ,如果是递归创建,创建模式为临时节点,则只有叶子节点是临时界定啊,非叶子节点都为持久化节点
3、withACL(List aclList) // 设置ACL
4、forPath(String path) // 指定路径
7、删除节点
1、名词解释
-
构建操作包装类(Builder):DeleteBuilder delete() -----CuratorFramework
-
DeleteBuilder
1、withVersion (int version) // 特定版本号
2、guaranteed() // 确保节点被删除
3、forPath(String path) // 指定路径
4、deletingChildrenIfNeeded() // 递归删除所有子节点
关于 guaranteed:
Solves edge cases where an operation may succeed on the server but connection failure occurs before a response can be successfully returned to the client
意思是: 解决当某个删除操作在服务器端可能成功,但是此时客户端与服务器端的连接中断,而删除的响 应没有成功返回到客户端 底层的本质:重试
2、源码分析
public Void forPath(String path) throws Exception {
final String unfixedPath = path;
path = this.client.fixForNamespace(path);
if (this.backgrounding.inBackground()) {
ErrorCallback<String> errorCallback = null;
//
if (this.guaranteed) {
errorCallback = new ErrorCallback<String>() {
public void retriesExhausted(OperationAndData<String> operationAndData) {
// 删除失败的集合
DeleteBuilderImpl.this.client.getFailedDeleteManager().addFailedDelete(unfixedPath);
}
};
}
this.client.processBackgroundOperation(new OperationAndData(this, path, this.backgrounding.getCallback(), errorCallback, this.backgrounding.getContext()), (CuratorEvent)null);
} else {
this.pathInForeground(path, unfixedPath);
}
return null;
}
二次执行删除
void addFailedDelete(String path) {
if (this.debugListener != null) {
this.debugListener.pathAddedForDelete(path);
}
// 客户端状态属于启动状态
if (this.client.getState() == CuratorFrameworkState.STARTED) {
this.log.debug("Path being added to guaranteed delete set: " + path);
try {
// 再次执行删除
((ErrorListenerPathable)this.client.delete().guaranteed().inBackground()).forPath(path);
} catch (Exception var3) {
ThreadUtils.checkInterrupted(var3);
this.addFailedDelete(path);
}
}
}
3、关于异步操作 inBackground
从参数看跟zk的原生异步API相同,多了一个线程池,用于执行回调
public interface Backgroundable<T> {
T inBackground();
T inBackground(Object var1);
T inBackground(BackgroundCallback var1);
T inBackground(BackgroundCallback var1, Object var2);
T inBackground(BackgroundCallback var1, Executor var2);
T inBackground(BackgroundCallback var1, Object var2, Executor var3);
}