依赖如下:
说明:ZooKeeper 原生 API 使用起来比较繁琐,所以这里采用 Apache Curator 客户端(curator-framework 与 curator-recipes)。
<!-- <repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories> -->
<!-- Libraries used by the ZooKeeper client example below. -->
<dependencies>
<!-- Curator core client: provides CuratorFramework / CuratorFrameworkFactory and the retry policies. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!-- Curator recipes: provides the TreeCache watcher used by the example. Version is kept in step with curator-framework. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!-- NOTE(review): the example code does not reference this artifact directly; presumably it is a leftover
     or works around a classpath issue. Confirm it is still required. -->
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java compiler plugin: accepts Java 1.8 source, emits Java 1.8 bytecode, and reads source files as UTF-8. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
Java 代码连接 ZooKeeper 集群,并进行创建节点、删除节点、修改节点数据、查询节点数据以及监听(watch)等操作,代码如下:
import java.nio.charset.StandardCharsets;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Demo client that connects to a ZooKeeper ensemble through Apache Curator and
 * watches the subtree rooted at {@code /test_1}, printing every node change.
 *
 * <p>Examples for creating, updating and reading a node are kept as commented-out
 * snippets inside {@code main}.
 */
public class TestZnode {

    /** ZooKeeper connection string: comma-separated {@code host:port} pairs. */
    private static final String CONNECT_STRING = "node01:2181,node02:2181,node03:2181";

    /** Root of the subtree observed by the {@link TreeCache}. */
    private static final String WATCHED_PATH = "/test_1";

    /** Session timeout passed to the Curator client, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 1000;

    /** Connection timeout passed to the Curator client, in milliseconds. */
    private static final int CONNECTION_TIMEOUT_MS = 1000;

    /** How long the main thread stays alive so that watch events can be observed. */
    private static final long WATCH_DURATION_MS = 50000000L;

    /**
     * Starts a Curator client, registers a tree listener on {@link #WATCHED_PATH}
     * and blocks for {@link #WATCH_DURATION_MS} while events are printed.
     *
     * @param args unused
     * @throws Exception if the cache cannot be started or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Retry policy: 1000 ms base sleep time, at most 1 retry.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);

        // try-with-resources guarantees the client is closed even when start-up
        // fails or the sleep below is interrupted.
        try (CuratorFramework client = CuratorFrameworkFactory.newClient(
                CONNECT_STRING, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS, retryPolicy)) {
            client.start();

            // Create a node (needs: import org.apache.zookeeper.CreateMode):
            //client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test_1/test_1_1");
            // Update a node's data:
            //client.setData().forPath("/test_1", "java001".getBytes(StandardCharsets.UTF_8));
            // Read a node's data:
            //byte[] test_1 = client.getData().forPath("/test_1");
            //System.out.println(new String(test_1, StandardCharsets.UTF_8));

            // Watch mechanism: the cache is closed before the client it depends on.
            try (TreeCache treeCache = new TreeCache(client, WATCHED_PATH)) {
                // Register the listener that handles each event.
                TreeCacheListener listener = (curatorFramework, treeCacheEvent) -> printEvent(treeCacheEvent);
                treeCache.getListenable().addListener(listener);

                // Start watching, then keep the JVM alive to receive events.
                treeCache.start();
                Thread.sleep(WATCH_DURATION_MS);
            }
        }
    }

    /** Prints a one-line description of a tree-cache event to standard output. */
    private static void printEvent(TreeCacheEvent treeCacheEvent) {
        ChildData data = treeCacheEvent.getData();
        if (data == null) {
            // Events without node data (e.g. connection-state events) carry only a type.
            System.out.println("数据为空" + treeCacheEvent.getType());
            return;
        }
        switch (treeCacheEvent.getType()) {
            case NODE_ADDED:
                System.out.println("节点增加:" + data.getPath() + " " + payloadText(data));
                break;
            case NODE_REMOVED:
                System.out.println("节点删除:" + data.getPath() + " ");
                break;
            case NODE_UPDATED:
                System.out.println("节点修改:" + data.getPath() + " " + payloadText(data));
                break;
            default:
                break;
        }
    }

    /**
     * Decodes a node's payload as UTF-8 text; a node without a payload yields an
     * empty string instead of triggering a {@link NullPointerException}.
     */
    private static String payloadText(ChildData data) {
        byte[] payload = data.getData();
        return payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
    }
}