Curator overview
Curator is a ZooKeeper client originally open-sourced by Netflix (now a top-level Apache project). It wraps the native API and hides many of the low-level details of ZooKeeper client development, while providing built-in features such as automatic reconnection on session timeout, re-registration of watchers, distributed counters, and distributed locks. Its fluent, chained API style has made it one of the most widely used ZooKeeper clients. If you only need connection management and retry handling, you can depend on curator-framework alone.
Official site: http://curator.apache.org/
Curator is split into several modules (curator-client, curator-framework, curator-recipes, plus extensions such as curator-x-discovery and curator-test); for most users, pulling in curator-recipes is enough.
Compatibility
Apache Curator is designed to work with ZooKeeper 3.5+, but it is also compatible with ZooKeeper 3.4.x.
Details: http://curator.apache.org/zk-compatibility.html
Basic Curator usage
Maven dependency
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.9.1</version>
</dependency>
The version used here is fairly old; the official release is already past 3.5.
Creating a connection
// Retry policy: base sleep of 1 second, up to 10 retries. Curator manages the ZooKeeper
// connection and automatically retries operations when connection problems occur.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// Create the client via the factory
CuratorFramework cf = CuratorFrameworkFactory.builder()
        .connectString(ZOOKEEPER_ADDRESS)   // ZooKeeper address
        .retryPolicy(retryPolicy)           // retry policy
        .connectionTimeoutMs(15000)         // connection timeout, default 15 seconds
        .sessionTimeoutMs(5000)             // session timeout in ms, default 60 seconds
        .build();
// Start the client
cf.start();
System.out.println("state:" + cf.getState());
The client can also be created like this (arguments: connect string, session timeout, connection timeout, retry policy):
CuratorFramework cf = CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, 5000, 15000, retryPolicy);
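ExponentialBackoffRetry sleeps for a randomized, exponentially growing interval between attempts. As a rough illustration of the idea (this mirrors the spirit of Curator's implementation but is a sketch, not its actual code; the helper name is my own), the sleep time can be modeled as the base sleep multiplied by a random factor whose upper bound doubles with each retry:

```java
import java.util.Random;

public class BackoffSketch {
    // Hypothetical helper illustrating exponential backoff with jitter,
    // in the spirit of ExponentialBackoffRetry(baseSleepTimeMs, maxRetries).
    static long sleepMsForRetry(int baseSleepTimeMs, int retryCount, Random random) {
        // The possible multiplier range doubles each retry: [1,2), [1,4), [1,8), ...
        int maxFactor = 1 << (retryCount + 1);
        return (long) baseSleepTimeMs * Math.max(1, random.nextInt(maxFactor));
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retry = 0; retry < 5; retry++) {
            long ms = sleepMsForRetry(1000, retry, random);
            System.out.println("retry " + retry + " -> sleep " + ms + "ms");
        }
    }
}
```

The jitter matters: if many clients lose the connection at once, randomized sleeps keep them from hammering the server in lockstep.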
创建节点
cf.create()
.creatingParentsIfNeeded() // 自动创建父节点
.withMode(CreateMode.PERSISTENT) // 模式:持久/临时/有序/无序
.forPath("/lock/ms10000","10000".getBytes()); // 指定路径和数据
Checking whether a node exists
// checkExists() returns a Stat, or null if the node does not exist
Stat stat = cf.checkExists().forPath("/lock/ms10000");
Reading node data
String nodeData = new String(cf.getData().forPath("/lock/ms10000"));
Updating node data
cf.setData().forPath("/lock/ms10000", "ms-10001".getBytes());
Listing child nodes
List<String> nodes = cf.getChildren().forPath("/lock");
System.out.println("Children of /lock:");
for (String node : nodes) {
    System.out.println(node);
}
Deleting a node
cf.delete()
        .guaranteed()               // guarantee: if the delete fails, Curator keeps retrying in the background for as long as the session is valid
        .deletingChildrenIfNeeded() // recursively delete children if the node has any
        .forPath("/lock");
Asynchronous callbacks
For example, bind a callback when creating a node; the callback can print the server's result code and the event type. You can also supply a thread pool to run the callbacks on.
ExecutorService pool = Executors.newCachedThreadPool();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {
                System.out.println("code:" + ce.getResultCode());
                System.out.println("type:" + ce.getType());
                System.out.println("thread: " + Thread.currentThread().getName());
            }
        }, pool)
        .forPath("/super/c3", "c3 data".getBytes());
Watching a node
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 5000, retry);
client.start();
final NodeCache cache = new NodeCache(client, "/node_1");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        // getCurrentData() returns null if the node was deleted
        byte[] res = cache.getCurrentData().getData();
        System.out.println("data: " + new String(res));
    }
});
Watching child nodes
// the third argument (true) caches node data in addition to stat
final PathChildrenCache cache = new PathChildrenCache(client, "/node_1", true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case CHILD_ADDED:
                System.out.println("add:" + event.getData());
                break;
            case CHILD_UPDATED:
                System.out.println("update:" + event.getData());
                break;
            case CHILD_REMOVED:
                System.out.println("remove:" + event.getData());
                break;
            default:
                break;
        }
    }
});
ACLs
RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 5000, retry);
client.start();
// ACLs support IP-based and username/password (digest) schemes
ACL aclRoot = new ACL(Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("root:root")));
List<ACL> aclList = new ArrayList<ACL>();
aclList.add(aclRoot);
String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
        .withACL(aclList)
        .forPath("/node_3/node_ACL", "2".getBytes());
System.out.println(path);
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.0.3:2181")
        .sessionTimeoutMs(5000)                          // session timeout
        .connectionTimeoutMs(5000)                       // connection timeout
        .authorization("digest", "root:root".getBytes()) // credentials for the digest scheme
        .retryPolicy(retry)
        .build();
client1.start();
String re = new String(client1.getData().forPath("/node_3/node_ACL"));
System.out.println(re);
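ZooKeeper's digest scheme stores the ACL id as user:BASE64(SHA-1(user:password)), which is what DigestAuthenticationProvider.generateDigest produces. A plain-Java sketch of that computation (the class and method names here are my own, chosen to mirror the real helper):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class DigestSketch {
    // Computes user:BASE64(SHA-1(user:password)), the value stored in a digest ACL id.
    static String generateDigest(String idPassword) throws Exception {
        String[] parts = idPassword.split(":", 2);
        MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
        byte[] digest = sha1.digest(idPassword.getBytes(StandardCharsets.UTF_8));
        return parts[0] + ":" + Base64.getEncoder().encodeToString(digest);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(generateDigest("root:root"));
    }
}
```

This is why the ACL is built with the digest while the client authenticates with the plaintext "root:root": the server hashes the supplied credentials and compares them against the stored digest.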
Distributed lock
The getting-started guide at http://curator.apache.org/getting-started.html covers this; the core is just two lines.
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) // try to acquire the lock within the given wait time
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release(); // release the lock
    }
}
Here is a fuller example.
public class CuratorTest {
    private final static String ZOOKEEPER_ADDRESS = "127.0.0.1:2181";

    public static void main(String[] args) throws Exception {
        // Retry policy: base sleep of 1 second, up to 10 retries. Curator manages the
        // ZooKeeper connection and retries automatically when connection problems occur.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // Create the client via the factory
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(ZOOKEEPER_ADDRESS)
                .connectionTimeoutMs(15000)
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(5000)
                .build();
        // Or create the client like this:
        // CuratorFramework cf = CuratorFrameworkFactory.newClient(ZOOKEEPER_ADDRESS, 5000, 15000, retryPolicy);
        // Start the client
        cf.start();
        System.out.println("=======================state:" + cf.getState());
        String orderNo = "10001";
        // Simulated database
        DB db = new DB();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        // Three threads process the same order number concurrently
        executorService.submit(new DistributedWorkerThread(cf, db, orderNo));
        executorService.submit(new DistributedWorkerThread(cf, db, orderNo));
        executorService.submit(new DistributedWorkerThread(cf, db, orderNo));
        executorService.shutdown();
        Thread.sleep(15000);
        cf.close();
    }

    // Not thread-safe on its own; the distributed lock serializes access to it.
    static class DB {
        private List<String> orders = new ArrayList<>();

        public void add(String order) {
            if (!exist(order)) {
                this.orders.add(order);
            }
        }

        public boolean exist(String order) {
            return this.orders.contains(order);
        }
    }

    static class DistributedWorkerThread implements Runnable {
        private CuratorFramework cf;
        private String orderNo;
        private DB db;

        public DistributedWorkerThread(CuratorFramework cf, DB db, String orderNo) {
            this.cf = cf;
            this.db = db;
            this.orderNo = orderNo;
        }

        public void run() {
            String lockNode = "/lock/ms" + orderNo;
            InterProcessMutex lock = new InterProcessMutex(cf, lockNode);
            try {
                if (lock.acquire(10, TimeUnit.SECONDS)) { // omit the timeout to block until the lock is acquired
                    try {
                        if (db.exist(orderNo)) {
                            System.out.println(Thread.currentThread().getName() + " order already exists!");
                            return;
                        }
                        System.out.println(Thread.currentThread().getName() + " doing business work here.. orderNo:" + orderNo);
                        this.db.add(orderNo);
                        Thread.sleep(1500L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.release();
                        System.out.println(Thread.currentThread().getName() + " done!");
                    }
                } else {
                    System.out.println(Thread.currentThread().getName() + " failed to acquire the lock..");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Here is the log output:
pool-3-thread-3 doing business work here.. orderNo:10001
pool-3-thread-3 done!
pool-3-thread-1 order already exists!
pool-3-thread-1 done!
pool-3-thread-2 order already exists!
pool-3-thread-2 done!
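Why does this serialize the three threads? InterProcessMutex creates an ephemeral sequential child under the lock path; the client holding the lowest sequence number owns the lock, and every other client watches only its immediate predecessor (which avoids a thundering-herd wakeup). The ordering logic can be sketched in plain Java (class name and node names here are illustrative, not Curator's code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LockQueueSketch {
    // Given the children of the lock path, returns the node this client should
    // watch, or null if ourNode holds the lowest sequence number (i.e. owns the lock).
    static String predecessorToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded sequence suffixes sort lexicographically
        int idx = sorted.indexOf(ourNode);
        return idx <= 0 ? null : sorted.get(idx - 1);
    }

    public static void main(String[] args) {
        List<String> children = List.of("lock-0000000003", "lock-0000000001", "lock-0000000002");
        System.out.println(predecessorToWatch(children, "lock-0000000001")); // null: owns the lock
        System.out.println(predecessorToWatch(children, "lock-0000000003")); // watches lock-0000000002
    }
}
```

Because the nodes are ephemeral, a crashed client's node disappears when its session expires, its successor's watch fires, and the lock is never orphaned.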
An analysis of Curator's distributed lock: https://blog.csdn.net/qiangcuo6087/article/details/79067136
Leader election
Example from http://curator.apache.org/getting-started.html:
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
};

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();
Troubleshooting
1. java.io.IOException: Xid out of order
java.io.IOException: Xid out of order. Got Xid 3 with err -101 expected Xid 2 for a packet with details: clientPath:null serverPath:null finished:false header:: 2,15 replyHeader:: 0,0,-4 request:: '/lock/ms10000,#3130303030,v{s{31,s{'world,'anyone}}},0 response::
at org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:892)
at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:101)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1214)
Reference: https://blog.csdn.net/xiaojiahao_kevin/article/details/51203083
Reference: https://blog.csdn.net/qq_15370821/article/details/73692288