一:zookeeper常用客户端
- zookeeper:官方提供的,原生的api,使用起来比较麻烦,比较底层,不够直接,不建议使用。
- zkclient: 对原生api的封装,开源项目(https://github.com/adyliu/zkclient),dubbo中使用的是这个。
- Apache Curator:Apache的开源项目,对原生客户端zookeeper进行封装,易于使用, 功能强大, 一般都是使用这个框架。
二:zookeeper原生客户端
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
public enum CreateMode {
/** 持久节点 */
PERSISTENT(0, false, false),
/** 持久顺序节点 */
PERSISTENT_SEQUENTIAL(2, false, true),
/** 临时节点(本次会话有效,会话结束后会自动删除) */
EPHEMERAL(1, true, false),
/** 临时顺序节点 */
EPHEMERAL_SEQUENTIAL(3, true, true);
}
public class ZooKeeper {
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException;
// 不存在返回null
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException;
/**
* 重复创建同一个节点会抛异常
* 创建子节点必选先保证父节点已经创建好了
*/
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException;
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException;
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;
public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException;
/**
* 只能删除一个节点,不支持递归的删除某个路径
* 删除的节点不能包含子节点
*/
public void delete(String path, int version) throws InterruptedException, KeeperException;
// 添加认证信息(类似于密码)
public void addAuthInfo(String scheme, byte[] auth);
// ACL(Access Control List)设置节点访问权限列表,每个节点都可以设置访问权限,指定只有特定的客户端才能访问和操作节点。
public Stat setACL(String path, List<ACL> acl, int aclVersion) throws KeeperException, InterruptedException;
public List<ACL> getACL(String path, Stat stat) throws KeeperException, InterruptedException;
public synchronized void close() throws InterruptedException;
}
public class ZooDefs {
public interface Ids {
Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
Id AUTH_IDS = new Id("auth", "");
/** 这是一个完全开放的权限,所有客户端都有权限 */
ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
/** 只有创建节点的客户端才有所有权限 */
ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
/** 所有客户端只有读取的 */
ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
}
}
EventType
public static enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4);
}
KeeperState
public static enum KeeperState {
Disconnected(0),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112);
}
注意:程序不能debug,只能run,如果debug会报错org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /app1
原因是debug造成程序停止运行,导致会话过期
public class ZookeeperTest {
/** zookeeper地址 */
static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
/** session超时时间 */
static final int sessionTimeout = 5000;
/** 阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 事件状态
Event.KeeperState keeperState = watchedEvent.getState();
if (Event.KeeperState.SyncConnected == keeperState) {
// 事件类型
Event.EventType eventType = watchedEvent.getType();
if (Event.EventType.None == eventType) {
// 连接成功建立,发送信号量,让后续阻塞程序向下执行
countDownLatch.countDown();
System.out.println("连接成功:" + watchedEvent);
}
}
}
};
// 注意:ZooKeeper客户端和服务器会话的建立是一个异步的过程,也就是说程序方法在处理完客户端初始化后立即返回
// 也就是说可能并没有真正构建好一个可用的会话,只有会话声明周期处于SyncConnected时才算真正建立好会话
// 这也是为什么要使用CountDownLatch来等待连接成功
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
// 添加认证信息
// digest:是最常见的权限控制模式,也更符合我们对权限控制的认证,类似于用户名和密码的方式,zk会对形成的权限标识先后进行两次编码处理,分别是SHA-1加密算法、Base64编码
zooKeeper.addAuthInfo("digest", "123456".getBytes());
// 如果没有连接成功则进行阻塞,直到连接成功才继续往下执行,连接时需要时间的,可能不会立即连接成功,肯能会等一两秒之后才连接成功,
countDownLatch.await();
String app1Node = "/app1";
try {
Stat app1Stat = zooKeeper.exists(app1Node, false);
if (app1Stat == null) {
List<ACL> acls = new ArrayList<>(1);
for (ACL acl : ZooDefs.Ids.CREATOR_ALL_ACL) {
acls.add(acl);
}
// 创建节点(如果节点已存在则抛出异常),返回节点名称
// 该节点需要认证,只有认证的其它zk客户端才能操作该节点
String app1NodePath = zooKeeper.create(app1Node, "app1".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("app1NodePath=" + app1NodePath);
Stat p1Stat = zooKeeper.exists(app1Node + "/p_1", false);
if (p1Stat == null) {
// 创建子节点(父节点必须存在,父节点不存在则抛异常)
// CreateMode.EPHEMERAL 表示临时节点,当会话结束后会立即删除
zooKeeper.create(app1Node + "/p_1", "p_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(app1Node + "/p_2", "p_2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(app1Node + "/p_3", "p_3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
byte[] data = zooKeeper.getData(app1Node, false, null);
String app1NodeValue = new String(data);
System.out.println("app1NodeValue=" + app1NodeValue);
zooKeeper.setData(app1Node, "my p_1".getBytes(), -1);
List<String> children = zooKeeper.getChildren(app1Node, false);
for (String path : children) {
String fullPath = app1Node + "/" + path;
String nodeValue = new String(zooKeeper.getData(fullPath, false, null));
System.out.println(fullPath + "=" + nodeValue);
}
// version版本号 -1: 全删除
zooKeeper.delete(app1Node + "/p_2", -1);
// Thread.sleep(10000);
} catch (Exception e) {
System.out.println(e);
} finally {
zooKeeper.close();
}
}
}
原生zookeeper客户端使用注意:
ZooKeeper建立连接时是异步创建的,有可能已经开始调用客户端方法了,连接还没有完全建立好,所以在创建连接时一般将异步创建客户端变成同步创建客户端
session过期的问题: 在极端情况下,出现ZooKeeper session过期,客户端需要自己去监听该状态并重新创建ZooKeeper实例
自动恢复(failover)的问题: 当client与一台server的连接丢失,并试图去连接另外一台server时, client将回到初始连接模式
对可恢复异常的处理:当在server端创建一个有序ZNode,而在将节点名返回给客户端时崩溃,此时client端抛出可恢复的异常,用户需要自己捕获这些异常并进行重试
使用场景的问题:Zookeeper提供了一些标准的使用场景支持,但是ZooKeeper对这些功能的使用说明文档很少,而且很容易用错.在一些极端场景下如何处理,zk并没有给出详细的文档说明.比如共享锁服务,当服务器端创建临时顺序节点成功,但是在客户端接收到节点名之前挂掉了,如果不能很好的处理这种情况,将导致死锁
三:Watcher示例
Watcher:监控者,监控某个节点对应的操作,也就是对某个节点操作时执行一下回调函数,Watcher#process()
如果某个动作(操作节点)需要进行监控需要设置某个动作的watch=true或者设置exists(path, true)。注意执行一次动作和一次watch绑定,如果想每次都需要监控,那么每次动作都必须线上设置监控
public class ZookeeperWatcher implements Watcher {
/** zookeeper地址 */
private static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
/** session超时时间 */
private static final int sessionTimeout = 5000;
/** 阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private AtomicInteger seq = new AtomicInteger();
private ZooKeeper zooKeeper;
private static final String PARENT_PATH = "/p";
private static final String CHILDEREN_PATH = "/p/c";
public void createConnection(String connectString, int sessionTimeout){
try {
this.releaseConnection();
zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
countDownLatch.await();
} catch (Exception e) {
}
}
public void releaseConnection() {
if (this.zooKeeper == null) return;
try {
this.zooKeeper.close();
System.out.println("断开连接");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean createPath(String path, String data, boolean needWatch) {
try {
// 设置监控(由于zookeeper的监控都是一次性的,所以每次都需要重新设置监控)
System.out.println("添加节点");
this.zooKeeper.exists(path, needWatch);
this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("[" + Thread.currentThread().getName() + "] 节点创建成功 path=" + path + ", content=" + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
public void writeData(String path, String data) {
System.out.println("修改节点");
try {
this.zooKeeper.setData(path, data.getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
}
}
public List<String> getChildren(String path, boolean needWatch) {
try {
System.out.println("获取子节点");
return this.zooKeeper.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public void deleteAllTestPath(boolean needWatch) {
if (this.reSetWatch(CHILDEREN_PATH) != null) {
this.deleteNode(CHILDEREN_PATH);
}
if (this.reSetWatch(PARENT_PATH) != null) {
this.deleteNode(PARENT_PATH);
}
}
public void deleteNode(String path) {
try {
System.out.println("删除节点:" + path);
this.zooKeeper.delete(path, -1);
} catch (Exception e) {
e.printStackTrace();
}
}
public Stat reSetWatch(String path){
try {
// 重新设置监控
return this.zooKeeper.exists(path, true);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("process watchedEvent=" + watchedEvent);
if (watchedEvent == null) return;
Event.KeeperState keeperState = watchedEvent.getState();
Event.EventType eventType = watchedEvent.getType();
String path = watchedEvent.getPath();
String prefix = "[Watcher(" + Thread.currentThread().getName() + ") -" + this.seq.incrementAndGet() + "] path=" + path + "\t";
System.out.println(prefix + "连接状态:" + keeperState);
System.out.println(prefix + "事件:" + eventType);
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
System.out.println(prefix + "成功连接上ZK服务器 》》》》》");
countDownLatch.countDown();
} else if (Event.EventType.NodeCreated == eventType) {
System.out.println(prefix + "创建节点");
this.sleep(100);
} else if (Event.EventType.NodeDataChanged == eventType) {
System.out.println(prefix + " 修改节点");
this.sleep(100);
} else if (Event.EventType.NodeChildrenChanged == eventType) {
System.out.println(prefix + " 修改子节点");
this.sleep(3000);
} else if (Event.EventType.NodeDeleted == eventType) {
System.out.println(prefix + " 删除子节点 " + path);
this.sleep(3000);
}
} else if (Event.KeeperState.Disconnected == keeperState) {
System.out.println(prefix + " 连接断开 XXXXX");
} else if (Event.KeeperState.AuthFailed == keeperState) {
System.out.println(prefix + " 权限检查失败");
} else if (Event.KeeperState.Expired == keeperState) {
System.out.println(prefix + " 会话过期");
}
System.out.println("--------------------------------------------------------------------");
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ZookeeperWatcher zkWatcher = new ZookeeperWatcher();
// 第一次触发watcher 建立连接会触发
zkWatcher.createConnection(connectString, sessionTimeout);
Thread.sleep(1000);
// true:表示需要监控, 会触发第二次watcher
if (zkWatcher.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {
Thread.sleep(1000);
// 重新建立监控(每次监控完就会断开,下次要想还监控必须手动再次设置监控)
zkWatcher.reSetWatch(PARENT_PATH);
// 第三次触发监控(因监控是一次性的,createPath监控用完了就断开监控了,而上面又再次建立了监控,所以writeData能被监控到)
zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "");
// 监控父节点(当下面的代码执行时,父节点增加了一个子节点也属于父节点发生了变化)
zkWatcher.getChildren(PARENT_PATH, true);
// 监控子节点,会触发监控
zkWatcher.createPath(CHILDEREN_PATH, System.currentTimeMillis() + "", true);
// 触发两次监控
zkWatcher.deleteAllTestPath(true);
}
zkWatcher.releaseConnection();
Thread.sleep(10000);
}
}