当客户服务器端是一个集群时,集群内服务器的数量可能随时发生变化,有的服务器down机下线,有的服务器上线工作提供服务,而客户端如何感知服务的上下线,并入服务器进行通信呢,这个问题就可以通过zookeeper进行解决。
问题分析:
1.每次有服务器上线提供服务时就要在zookeeper上创建一个临时序列节点,这样当服务器下线时,该服务程序结束,就会删除这个节点
2.上下线都会触发父节点NodeChildrenChanged事件通知
3.客户端注册监听器监听父节点,当接收到这个事件通知时,就去访问父节点,并获取父节点下创建了哪些子节点,就表示可以与哪些服务器进行通信,再通过一定的策略决定于那一台服务器进行交互
server的代码
package bigdata.zkserertest; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class zkServers { private static final String connectString = "lanc05:2181,lanc06:2181,lanc07:2181"; private static final int sessionTimeout =2000; private static final String parent="/servers"; static ZooKeeper zk =null; public static void testparent() throws Exception{ //判断是否存在这个父目录 Stat exists = zk.exists(parent, false); //如果不存在则创建 if(exists == null){ zk.create(parent, "IIIII".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(" not exit ,then create----"); } } public static void getconnect() throws Exception { zk =new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub System.out.print(event + "++++++" + event.getPath()); try { zk.getChildren(parent, true); } catch (Exception e) { } } }); } public static void registserver(String args) throws Exception{ //在父目录下创建一个临时的序列化子节点 String createsever = zk.create(parent+"/server", args.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(args + " is online " + createsever ); } public static void serverwork(String args) throws Exception { System.out.println(args + " start working ....."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { //// 获取zookeeper的链接 getconnect(); //先判断着个父目录是否存在,不存在就创建 testparent(); //在zookeeper上注册该服务器 registserver(args[0]); //执行这个服务器的相关业务 serverwork(args[0]); } }
client代码:
package bigdata.zkserertest; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class zkClientss { private static final String connectString = "lanc05:2181,lanc06:2181,lanc07:2181"; private static final int sessionTimeout =2000; private static final String parent="/servers"; static ZooKeeper zk =null; private static volatile List<String> serverList; public void getconnect() throws Exception { zk =new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { try { //再次注册监听,并更新信息 getznode(); } catch (Exception e) { } } }); } /** * 获取子节点的相关信息和数据 * @throws Exception */ public void getznode() throws Exception{ List<String> servers = new ArrayList<String>(); //获取子节点信息,并对父节点进行监听 List<String> children = zk.getChildren(parent, true); //使用true 就是使用上面的监听器 for(String child:children){ System.out.println(child); byte[] data = zk.getData(parent+"/"+child, false, null); //c把获取的数据给成员变量,以方便给各个客户端的业务使用 servers.add(new String(data)); } serverList =servers; System.out.println(serverList); } /** * 客户端业务 * @throws Exception */ public void clientwork() throws Exception { System.out.println("client starts working ....."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 获取zookeeper的链接 zkClientss zc= new zkClientss(); zc.getconnect(); //查看zookeeper下/servers目录下的子目录 zc.getznode(); //业务线程使用 zc.clientwork(); } }