前期准备
Maven依赖:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zookeeper</groupId>
<artifactId>ZooKeeper</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<ZooKeeper-Version>3.4.14</ZooKeeper-Version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${ZooKeeper-Version}</version>
</dependency>
</dependencies>
</project>
log4j.properties:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c:%L] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c:%L] - %m%n
创建会话
ZooKeeper有4种构造方法,任意一种都可以与ZooKeeper服务器创建会话:
ZooKeeper(String connectString, int sessionTimeout , Watcher watcher);
ZooKeeper(String connectString, int sessionTimeout , Watcher watcher, boolean canBeReadOnly);
ZooKeeper(String connectString, int sessionTimeout , Watcher watcher, long sessionId, byte[] sessionPasswd);
ZooKeeper(String connectString, int sessionTimeout , Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);
参数 | 说明 |
---|---|
connectString | ZooKeeper的参数列表,由逗号分开的 host:port 字符串组成,每一个都代表一台ZooKeeper服务器。可以指定任意一台,如果这台机器不是Leader则会转发消息给Leader |
sessionTimeout | 会话的超时时间,是一个以毫秒为单位的整型值。在一个会话周期内,ZooKeeper客户端与服务器之间会通过心跳检测机制来维持会话的有效性,一旦在 sessionTimeout 时间内没有进行有效的心跳检测,会话就会失效。 |
watcher | 监听器,如果有事件发生,其process()方法会被调用 |
canBeReadOnly | 用于标识当前会话是否支持只读模式。默认情况下,在ZooKeeper集群中,一个机器如果和集群中过半及以上机器失去网络连接,那么这个机器将不再处理客户端请求(包括读写请求)。但在某些使用场景下,当ZooKeeper服务器发生此类故障的时候,我们还是希望ZooKeeper服务器能够提供读服务(写服务无法提供)。 |
sessionId sessionPasswd |
会话ID和会话密钥,这两个参数能够唯一确定一个会话,同时客户端使用这两个参数可以实现客户端会话复用,从而达到恢复会话的效果。可通过调用ZooKeeper对象实例的 getSessionId() 和 getSessionPasswd() 获得。 |
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1); //信号量
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181,slave1:2181,slave2:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
//事件监听
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
semaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await(); //阻塞
}
}
WatchedEvent state:SyncConnected type:None path:null
创建节点
客户端可以通过两种方式创建节点(均不能递归创建节点,即无法在父节点不存在的情况下创建一个子节点):
同步方式:create(final String path, byte data[], List acl, CreateMode createMode)
异步方式:void create(final String path, byte data[], List acl, CreateMode createMode, StringCallback cb, Object ctx)
参数 | 说明 |
---|---|
path | 需要创建的数据节点的路径 |
data[] | 节点创建后的初始内容 |
acl | 节点的ACL策略,Ids.OPEN_ACL_UNSAFE 表示之后对这个节点的任何操作都不受权限控制 |
createMode | 节点类型,是一个枚举类型,通常有四种可选的节点类型: 持久(PERSISTENT) 持久顺序(PERSISTENT_SEQUENTIAL) 临时(EPHEMERAL) 临时顺序(EPHEMERAL_SEQUENTIAL) |
cb | 注册一个异步回调函数。开发人员需要实现StringCallback接口,重写 void processResult(int rc, String path, Object ctx, String name) 方法,当服务端节点创建完毕后,ZooKeeper客户端会自动调用这个方法,这样就可以处理相关的业务逻辑。 |
ctx | 存放上下文信息(context) |
同步方式:
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
semaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void create() {
try {
String node = zooKeeper.create("/test",
"".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(node);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await();
create();
}
}
异步方式与同步方式最大的区别在于,节点的创建过程(包括网络通信和服务端节点创建过程)是异步的。而且在同步接口调用过程中,我们需要关注接口抛出异常的可能,但是在异步接口中,接口本身是不会抛出异常的,所有的异常都会在回调函数中通过 Result Code(响应码)来体现。
AsyncCallback包含了StatCallback,DataCallback,ACLCallback,ChildrenCallback,Children2Callback,StringCallback,VoidCallback七种不同的回调接口。接口中包含了processResult()方法。
processResult()方法参数
参数 | 说明 |
---|---|
rc | Result Code,服务端响应码。客户端可以从这个响应码中识别出API调用的结果。常见的响应码如下: 0(OK):接口调用成功 -4(ConnectionLoss):客户端和服务端连接已断开 -110(NodeExists):指定节点已存在 -112(SessionExpired):会话已过期。 |
path | 数据节点的路径 |
ctx | 存放上下文信息(context) |
name | 实际在服务端创建的节点名。由于创建的节点类型是顺序节点,因此在服务端没有真正创建好顺序节点之前,客户端无法知道节点的完整节点路径。于是在回调方法中客户服务端会返回这个数据节点的完整路径。 |
异步方式:
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
semaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void create() throws InterruptedException {
StringCallback callback = new StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("Result Code: " + rc + ", path: " + path + ", context: " + ctx + ", name: " + name);
}
};
zooKeeper.create("/test",
"".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL,
callback,
"helloworld");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await();
create();
}
}
获取子节点列表
动态获取子节点列表并监听子节点列表的变动。当子节点列表发送变化,服务端会向客户端发送一个 EventType.NodeChildrenChanged 类型的事件通知。需要注意的是,在服务端发送给客户端的事件通知中,是不包含最新的节点列表的,客户端必须主动进行获取。通常客户端在收到事件通知后,就可以再次获取最新的子节点列表。
注意:Watcher 通知是一次性的,一旦触发一次通知后,该 Watcher 就是失效了,因此客户端需要反复 Watcher 。
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
if (event.getPath() == null && EventType.None == event.getType()) {
semaphore.countDown();
} else if (EventType.NodeChildrenChanged == event.getType()) {
try {
getChildrenAndWatch(); //反复调用,反复注册Watcher
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void getChildrenAndWatch() throws KeeperException, InterruptedException {
List<String> list = zooKeeper.getChildren("/", true);
for (String string : list) {
System.out.println(string);
}
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await();
getChildrenAndWatch();
Thread.sleep(Long.MAX_VALUE);
}
}
API中包含了同步和异步的方法,异步方法通常会应用在这样的使用场景中:在应用启动的时候,会获取一些配置信息,例如机器列表等。这些配置通常比较大,并且不希望配置的获取影响应用的主流程。
获取节点数据内容
API中有4个同步和异步的方法:
byte[] getData(final String path, Watcher watcher, Stat stat)
byte[] getData(String path, boolean watch, Stat stat)
void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)
void getData(String path, boolean watch, DataCallback cb, Object ctx)
参数 | 说明 |
---|---|
path | 指定数据节点的路径 |
watcher | 注册的Watcher。一旦节点内容有变更,就会向客户端发送通知。该参数允许传入null。 |
stat | 指定数据节点的节点状态信息。用法是在接口中传入一个旧的stat变量,该变量会在方法执行过程中,被来自服务端响应的新stat对象替换(获得最新的节点状态信息)。 |
watch | 是否需要注册一个watcher。如果是true,则使用默认的Watcher(创建ZooKeeper对象的那个),如果是false则不需要注册。 |
cb | 注册一个异步回调函数 |
ctx | 存放上下文信息(context) |
客户端在获取一个节点的数据内容的时候,进行Watcher注册,一旦该节点的状态发生改变(数据的内容或版本变化),那么服务端就会向客户端发送一个 EventType.NodeDataChanged 事件通知。
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
semaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void getData() throws KeeperException, InterruptedException {
byte[] data = zooKeeper.getData("/test", new Watcher() {
public void process(WatchedEvent event) {
if (EventType.NodeDataChanged == event.getType()) {
System.out.println(event);
}
}
}, new Stat());
System.out.println(new String(data));
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await();
getData();
Thread.sleep(Long.MAX_VALUE);
}
}
更新数据
API中有2个同步和异步的方法:
Stat setData(final String path, byte data[], int version)
void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)
需要说明的是version参数,该参数用于避免一些分布式更新的并发问题,例如分布式锁服务等。假如客户端A试图进行更新操作,它会携带上次获得到的version值进行更新。如果在这段时间内,ZooKeeper服务器上该节点的数据恰好被其他客户端更新了,那么数据版本一定发生了改变。最新的数据版本与客户端A携带的version无法匹配,于是客户端A无法更新成功。
如果对ZooKeeper数据节点的更新操作没有原子性要求,那么数据版本参数可以用 -1,告诉服务器,客户端需要基于数据的最新版本进行更新操作。
获取版本可以用stat.getVersion()。下面是同步方法实现更新数据:
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
public class Text {
private static CountDownLatch semaphore = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void connect() {
String connectString = "master:2181";
int sessionTimeout = 2000;
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout , new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event);
if (KeeperState.SyncConnected == event.getState()) {
semaphore.countDown();
}
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
public static void setData() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.setData("/test", "666".getBytes(), -1); //第一次修改
System.out.println(stat.getMzxid() + " " + stat.getVersion());
Stat stat2 = zooKeeper.setData("/test", "999".getBytes(), stat.getVersion()); //第二次修改
System.out.println(stat2.getMzxid() + " " + stat2.getVersion());
try {
zooKeeper.setData("/test", "123".getBytes(), stat.getVersion()); //第三次修改
} catch (KeeperException e) {
System.out.println(e.getMessage());
}
}
public static void main(String[] args) throws Exception {
connect();
semaphore.await();
setData();
Thread.sleep(Long.MAX_VALUE);
}
}
WatchedEvent state:SyncConnected type:None path:null
8589934676 1
8589934677 2
KeeperErrorCode = BadVersion for /test
第一次更新操作成功执行后,ZooKeeper服务端返回给客户端一个数据节点的节点状态信息对象stat,节点的数据版本变为1。第二次使用数据版本1更新,执行成功后,返回节点状态信息对象stat2,数据版本变成2。第三次更新操作时,依然使用数据版本1来进行更新操作,更新失败。
删除节点
API中有2个同步和异步的方法:
void delete(final String path, int version)
void delete(final String path, int version, VoidCallback cb, Object ctx)
删除节点的方式和更新数据的方式类似。下面是异步方法实现更新数据:
public static void delete() {
VoidCallback cb = new VoidCallback() {
public void processResult(int rc, String path, Object ctx) {
if (rc == 0) {
System.out.println("Delete " + path + " successfully");
}
}
};
zooKeeper.delete("/test", -1, cb, null);
}
检测节点是否存在
API中有4个同步和异步的方法:
Stat exists(final String path, Watcher watcher)
Stat exists(String path, boolean watch)
void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
void exists(String path, boolean watch, StatCallback cb, Object ctx)
- 无论指定节点是否存在,都可以通过调用exists方法注册Watcher。
- exists方法中注册的Watcher,能够对节点创建、删除、更新事件进行监听。
- 对于指定节点的子节点的各种变化,都不会通知客户端(比如指定目录 /test,那么其它节点或该目录下的节点的任何变化都不会通知客户端,必须是/test发生变化才可以)。
public static void isExist() throws KeeperException, InterruptedException {
Stat stat = zooKeeper.exists("/test666", new Watcher() {
public void process(WatchedEvent event) {
if (EventType.NodeCreated == event.getType()) {
System.out.println("NodeCreated");
} else if (EventType.NodeDataChanged == event.getType()) {
System.out.println("NodeDataChanged");
} else if (EventType.NodeDeleted == event.getType()) {
System.out.println("NodeDeleted");
}
}
});
System.out.println(stat == null ? "Not Exist" : "Exist");
}
权限控制
为了避免存储在ZooKeeper服务器上的数据被其他进程干扰或人为操作修改,需要对ZooKeeper上的数据访问进行权限控制(Access Control)。ZooKeeper提供了ACL的权限控制机制,通过设置ZooKeeper服务器上数据节点的ACL来控制客户端对该数据节点的访问权限。如果一个客户端符合该ACL控制,那么就可以对其进行访问,否则无法操作。
ZooKeeper提供了多种权限控制模式(Scheme),分别是world,auth,digest,ip,super。
会话创建后,给该会话加上相关的权限信息(AuthInfo):addAuthInfo(String scheme, byte[] auth)
public static void accessControl() throws IOException, KeeperException, InterruptedException {
String connectString = "master:2181";
int sessionTimeout = 2000;
ZooKeeper zooKeeper1 = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
zooKeeper1.addAuthInfo("digest", "passwd".getBytes());
zooKeeper1.create("/secret", "secret".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
ZooKeeper zooKeeper2 = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
}
});
byte[] data = zooKeeper2.getData("/secret", false, null);
System.out.println(new String(data));
}
Exception in thread “main” org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /secret
上面包含权限信息的zooKeeper1创建了一个数据节点 /secret,没有权限信息的zooKeeper2无法获取数据节点 /secret 的数据。
对于删除节点而言,其权限控制比较特殊。当客户端对一个数据节点添加了权限信息后,对于删除操作而言,权限控制作用范围是其子节点。也就是说,当我们对一个数据节点添加权限信息后,可以自由的删除这个节点(不需要addAuthInfo),但对于这个节点的子节点就必须使用相应的权限信息才能够删除它。带权限的ZooKeeper实例创建了数据节点/A和/A/B,/A/B要权限删除,删除后,/A可以直接删除。