直接使用zk的api实现业务功能比较繁琐。因为要处理session loss,session expire等异常,在发生这些异常后进行重连。又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下,将一次性订阅包装成持久订阅。另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情。这里介绍下ZK的两个第三方客户端包装小工具,可以分别解决上述小问题。
一、 zkClient
zkClient主要做了两件事情。一件是在session loss和session expire时自动创建新的ZooKeeper实例进行重连。另一件是将一次性watcher包装为持久watcher。后者的具体做法是简单的在watcher回调中,重新读取数据的同时再注册相同的watcher实例。
zkClient简单的使用样例如下:
public static void testzkClient(final String serverList) { ZkClient zkClient4subChild = new ZkClient(serverList); zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List currentChilds) throws Exception { System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds); } }); 上面是订阅children变化,下面是订阅数据变化 ZkClient zkClient4subData = new ZkClient(serverList); zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println(prefix() + "Data of " + dataPath + " has changed"); } @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println(prefix() + dataPath + " has deleted"); } }); 订阅连接状态的变化: ZkClient zkClient4subStat = new ZkClient(serverList); zkClient4subStat.subscribeStateChanges(new IZkStateListener() { @Override public void handleNewSession() throws Exception { System.out.println(prefix() + "handleNewSession()"); } @Override public void handleStateChanged(KeeperState stat) throws Exception { System.out.println(prefix() + "handleStateChanged,stat:" + stat); } });