curator监听zk临时节点实现“信息中心服务”

我们使用curator建立连接,curator有session维护,重试机制,对递归创建节点和删除节点有较好的支持:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString("10.10.210.123:2181")
                        .sessionTimeoutMs(50000000)
                        .connectionTimeoutMs(50000000)
                        .retryPolicy(retryPolicy)
                        .build();
client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘记释放调用close方法。

常规代码可以先判断是否存在后,创建消息通讯的根节点:

 Stat s = client.checkExists().forPath("/msg_node_list");
            if (s == null) {
                String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list");
            }

针对我们的每一个netty服务,根据自己的服务id向msg_node_list注册临时节点:

client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");

而我们的监控中心,负责给客户端分发具体的netty链接服务器,则需要轮询监听每一个临时子节点的状态,并且通过watch监听异常下线节点,去redis中更新服务列表,从而给客户端提供消息服务:

List<String> list = client.getChildren().forPath("/msg_node_list");
//遍历list中的每一个临时子节点后调用如下代码,参数为list中取出的临时节点名称:
client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
 Watcher w = new Watcher() {
        @Override
        public void process(WatchedEvent watchedEvent) {
            Event.EventType type = watchedEvent.getType();
            if (type.equals(Event.EventType.NodeDeleted)) {
                System.out.println(watchedEvent.getPath());
            }
        }
    };

如果客户端连接的netty服务异常下线,则在次访问监控服务器获取可用的netty地址。

redis中维护着一个列表,每个netty服务器对应的客户端id,保存在一个redis的set中

监控服务器在内部维护了一个持有netty服务器地址的集合,并根据客户端的连接数量由少到多的排列,每次优先分配持有客户端最少的netty服务

如果该netty服务不可用,并且列表中未及时删除。

猜你喜欢

转载自www.cnblogs.com/zzq-include/p/12124898.html