3.1 设置ZooKeeper的CLASSPATH
3.2 建立ZooKeeper会话
ZooKeeper的API围绕ZooKeeper的句柄(handle)而构建,每个API调用都需要传递这个句柄。这个句柄代表与ZooKeeper之间的一个会话。在下图中,与ZooKeeper服务器已经建立的一个会话如果断开,这个会话就会迁移到另外一个ZooKeeper服务器上。只要会话还活着,这个句柄就有效,ZooKeeper客户端库会持续保持这个活跃连接,以保证与ZooKeeper服务器器之间的会话存活。如果句柄关闭,ZooKeeper客户端库会告知ZooKeeper服务器终止这个会话。如果ZooKeeper发现客户端已经死掉,就会使这个会话无效。如果客户端之后尝试重新连接到ZooKeeper服务器,使用之前无效会话对应的句柄进行连接,那么ZooKeeper服务器会通知客户端库,这个会话已失效,使用这个句柄进行的任何操作都会返回错误。
创建ZooKeeper句柄的构造函数如下所示:
ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher)
其中
connectString:包含主机名称和zookeeper服务器的端口,
例如
192.168.253.129:2181,192.168.253.130:2181,192.168.253.131:2181
sessionTimeout:以毫秒为单位,表示zookeeper等待客户端通信的最长时间,之后回声明会话已死亡。目前我们使用15000,即15秒。这就是说如果zookeeper与客户端有15秒的时间无法通信,zookeeper就会终止客户端的会话。一般来说zookeeper设置的超时时间为5~10秒。
watcher:用于接收会话事件的一个对象,这个对象需要我们自己创建。因为Watcher定义为接口,所以我们需要自己实现,然后实例化这个类对象并传入zookeeper构造函数中。客户端使用这个Watcher接口来监控与zookeeper之间会话的健康状况。如果与zookeeper服务器之间建立或者失去连接时就会产生事件。它们同样还能用于监控zookeeper数据的变化。如果与zookeeper的会话过期,会通过Watcher接口传递事件来通知客户端的应用。
3.2.1 实现一个Watcher
Watcher接口定义:
public interface Watcher{
void process(WatchedEvent event);}
实现示例:
import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Master implements Watcher {
ZooKeeper zk;
String hostPort;
Master(String hostPort) {
this.hostPort = hostPort;
}
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
public void process(WatchedEvent e) {
System.out.println(e);
}
public static void main(String args[])throws Exception {
Master m = new Master(args[0]);
m.startZK(); // wait for a bit
Thread.sleep(60000);
}
}
3.2.2 运行Watcher的示例
ZooKeeper有两种管理接口:JMX和四字母组成的命令,通过stat和dump这两个四字母命令来看看服务器上发生了什么。
要使用这些命令,需要先通过telnet连接到客户端端口2181,然后输入这些命令(在命令后输入Enter键)。如下:
从中我们可以看到目前有一个客户端连接到了zookeeper服务器,为192.168.253.129:58290。
3.3 获取管理权
同步调用
zk.create("/master",
serverId.getBytes(),OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
3.3.1 异步获取管理权
zookeeper中,所有同步调用方法都有对应的异步调用方法。通过异步调用,我们可以在但线程中同时进行多个调用,同时也可以简化我们的实现方式。
void create(String path,
byte[] data,
List<ACL> acl,
CreateMode createMode,
AsyncCallback.StringCallback cb,①
Object ctx)
同步与异步调用很相似,仅仅多了两个参数:
1、提供回调方法的对象
2、用户指定上下文信息(传入回调方法的对象实例)
异步调用示例:
static boolean isLeader;
static StringCallback masterCreateCallback = new StringCallback() {
void processResult(int rc, String path, Object ctx, String name) {
switch(Code.get(rc)) {
case CONNECTIONLOSS:
checkMaster();
return;
case OK:
isLeader = true;
break;
default:
isLeader = false;
}
System.out.println("I'm " + (isLeader ? "" : "not ") +"the leader");
}
};
void runForMaster() {
zk.create("/master",
serverId.getBytes(),
OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
masterCreateCallback,
null);
}
checkMaster函数实现示例:
DataCallback masterCheckCallback = new DataCallback() {
void processResult(int rc,
String path,
Object ctx,
byte[] data,
Stat stat) {
switch(Code.get(rc)) {
case CONNECTIONLOSS:
checkMaster();
return;
case NONODE:
runForMaster();
return;
}
}
}
void checkMaster() {
zk.getData("/master", false, masterCheckCallback, null);
}
3.3.2 设置元数据
我们将使用异步API方法来设置元数据路径。我们的主从模型设计依赖三个目录:/tasks、/assign和/workers,我们可以在系统启动前通过某些系统配置来创建所有目录,或者通过在主节点程序每次启动时创建这些目录。以下代码段会创建这些路径:
public void bootstrap() {
createParent("/workers", new byte[0]);
createParent("/assign", new byte[0]);
createParent("/tasks", new byte[0]);
createParent("/status", new byte[0]);
}
void createParent(String path, byte[] data) {
zk.create(path,
data,
Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
createParentCallback,
data);
}
StringCallback createParentCallback = new StringCallback() {
public void processResult(int rc, String path, Object ctx, String name) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
createParent(path, (byte[]) ctx);
break;
case OK:
LOG.info("Parent created");
break;
case NODEEXISTS:
LOG.warn("Parent already registered: " + path);
break;
default:
LOG.error("Something went wrong: ",
KeeperException.create(Code.get(rc), path));
}
}
};
3.4 注册从节点
示例代码如下:
import java.util.*;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.slf4j.*;
public class Worker implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
ZooKeeper zk;
String hostPort;
String serverId = Integer.toHexString(random.nextInt());
Worker(String hostPort) {
this.hostPort = hostPort;
}
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
public void process(WatchedEvent e) {
LOG.info(e.toString() + ", " + hostPort);
}
void register() {
zk.create("/workers/worker-" + serverId,
"Idle".getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL,
createWorkerCallback, null);
}
StringCallback createWorkerCallback = new StringCallback() {
public void processResult(int rc, String path, Object ctx,
String name) {
switch (Code.get(rc)) {
case CONNECTIONLOSS:
register();
break;
case OK:
LOG.info("Registered successfully: " + serverId);
break;
case NODEEXISTS:
LOG.warn("Already registered: " + serverId);
break;
default:
LOG.error("Something went wrong: "+KeeperException.create(Code.get(rc), path));
}
}
};
public static void main(String args[]) throws Exception {
Worker w = new Worker(args[0]);
w.startZK();
w.register();
Thread.sleep(30000);
}
}
3.5 任务队列化
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
public class Client implements Watcher {
ZooKeeper zk;
String hostPort;
Client(String hostPort) {
this.hostPort = hostPort;
}
void startZK() throws Exception {
zk = new ZooKeeper(hostPort, 15000, this);
}
String queueCommand(String command) throws KeeperException {
while (true) {
try {
String name = zk.create("/tasks/task-",
command.getBytes(),
OPEN_ACL_UNSAFE,
CreateMode.SEQUENTIAL);
return name;
break;
} catch (NodeExistsException e) {
throw new Exception(name + " already appears to be running");
} catch (ConnectionLossException e) {
}
}
public void process(WatchedEvent e) {
System.out.println(e);
}
public static void main(String args[]) throws Exception {
Client c = new Client(args[0]);
c.start();
String name = c.queueCommand(args[1]);
System.out.println("Created " + name);
}
}
当我们运行client应用程序并发送一个命令时,/tasks节点下就会创建一个新的znode节点,该节点并不是临时性节点,因此即使client程序结束了,这个节点依然会存在。
3.6 管理客户端
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
public class AdminClient implements Watcher {
ZooKeeper zk;
String hostPort;
AdminClient(String hostPort) {
this.hostPort = hostPort;
}
void start() throws Exception {
zk = new ZooKeeper(hostPort, 15000, this);
}
void listState() throws KeeperException {
try {
Stat stat = new Stat();
byte masterData[] = zk.getData("/master", false, stat);Date startDate = new Date(stat.getCtime());
System.out.println("Master: " + new String(masterData) +" since " + startDate);
} catch (NoNodeException e) {
System.out.println("No Master");
}
System.out.println("Workers:");
for (String w: zk.getChildren("/workers", false)) {
byte data[] = zk.getData("/workers/" + w, false, null);
String state = new String(data);
System.out.println("\t" + w + ": " + state);
}
System.out.println("Tasks:");
for (String t: zk.getChildren("/assign", false)) {
System.out.println("\t" + t);
}
}
public void process(WatchedEvent e) {
System.out.println(e);
}
public static void main(String args[]) throws Exception {
AdminClient c = new AdminClient(args[0]);
c.start();
c.listState();
}
}