Recently a project needed exactly one machine out of several to run certain tasks. Originally the IP of that machine was hard-coded, but that felt far too rigid, so I decided to use a ZooKeeper lock instead. Since this was my first time using ZK, I ran into a few basic issues, which I'm recording here.
Idea 1:
Use a re-entrant shared lock, then watch the node being locked.
Rationale: if the machine goes down, or is being redeployed, the lock is released and the node changes.
TestingServer server = new TestingServer();
server.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString(server.getConnectString())
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();
client1.start();
InterProcessMutex lock = new InterProcessMutex(client1, PATH);
lock.acquire(10, TimeUnit.SECONDS);
CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString(server.getConnectString())
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();
client2.start();
final NodeCache cache = new NodeCache(client2, PATH);
NodeCacheListener listener = new NodeCacheListener() {
    public void nodeChanged() throws Exception {
        ChildData data = cache.getCurrentData();
        if (null != data) {
            System.out.println("node data: " + new String(data.getData()));
            System.out.println(data.getPath());
        } else {
            System.out.println("node deleted!");
        }
    }
};
cache.getListenable().addListener(listener);
cache.start();
Result: NodeCache only watches the creation, deletion, and data changes of the node at PATH itself. Acquiring the lock, however, creates a new lock node under the PATH path, and releasing the lock deletes that lock node, so the NodeCache listener never fires.
Idea 2:
Since acquiring the lock creates a node under PATH, just watch the children, i.e. all nodes under PATH.
PathChildrenCache cache = new PathChildrenCache(client2, PATH, true);
cache.start();
PathChildrenCacheListener cacheListener = new PathChildrenCacheListener() {
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        System.out.println("event type: " + pathChildrenCacheEvent.getType());
        if (null != pathChildrenCacheEvent.getData()) {
            System.out.println("node data: " + pathChildrenCacheEvent.getData().getPath() + " = "
                    + new String(pathChildrenCacheEvent.getData().getData()));
        }
    }
};
cache.getListenable().addListener(cacheListener);
The test code prints the following:
client1 start
client2 start
client2 check node not exists after node cache
event type: CHILD_ADDED
node data: /base/test/_c_13f15fc3-03cd-4152-9632-11aa98a9b708-lock-0000000000 = 192.168.0.105
client1 get lock
event type: CHILD_ADDED
node data: /base/test/_c_568dfd7e-6bbf-408b-b1c8-ee278ab1f745-lock-0000000001 = 192.168.0.105
event type: CHILD_REMOVED
node data: /base/test/_c_568dfd7e-6bbf-408b-b1c8-ee278ab1f745-lock-0000000001 = 192.168.0.105
client2 does not get lock after client1 get lock
client1 find node exists after lock
client2 can check node if exists without lock
event type: CHILD_REMOVED
node data: /base/test/_c_13f15fc3-03cd-4152-9632-11aa98a9b708-lock-0000000000 = 192.168.0.105
client1 closed
client2 get lock after client closed
event type: CHILD_ADDED
node data: /base/test/_c_97f0605e-c1aa-45c6-a6ad-dfe82eab64d0-lock-0000000002 = 192.168.0.105
From the log we can see that every time a client tries to acquire the lock, a lock node is added under PATH; if the client fails to get the lock, its node is removed. That accounts for the first two CHILD_ADDED events and the first CHILD_REMOVED.
The later CHILD_REMOVED happens when client1 is closed, and the CHILD_ADDED that follows is the node created when client2 acquires the lock.
Test code:
public static void main(String[] args) throws Exception {
    final String PATH = "/base/test";
    TestingServer server = new TestingServer();
    server.start();
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString(server.getConnectString())
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();
    CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString(server.getConnectString())
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(5000)
            .retryPolicy(retryPolicy)
            .build();
    client1.start();
    System.out.println("client1 start");
    client2.start();
    System.out.println("client2 start");
    // NodeCache version from idea 1 -- never fires for lock nodes:
    // final NodeCache cache = new NodeCache(client2, PATH);
    // NodeCacheListener listener = new NodeCacheListener() {
    //     public void nodeChanged() throws Exception {
    //         ChildData data = cache.getCurrentData();
    //         if (null != data) {
    //             System.out.println("node data: " + new String(data.getData()));
    //             System.out.println(data.getPath());
    //         } else {
    //             System.out.println("node deleted!");
    //         }
    //     }
    // };
    // cache.getListenable().addListener(listener);
    // cache.start();
    PathChildrenCache cache = new PathChildrenCache(client2, PATH, true);
    cache.start();
    PathChildrenCacheListener cacheListener = new PathChildrenCacheListener() {
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println("event type: " + pathChildrenCacheEvent.getType());
            if (null != pathChildrenCacheEvent.getData()) {
                System.out.println("node data: " + pathChildrenCacheEvent.getData().getPath() + " = "
                        + new String(pathChildrenCacheEvent.getData().getData()));
            }
        }
    };
    cache.getListenable().addListener(cacheListener);
    if (client2.checkExists().forPath(PATH) == null) {
        System.out.println("client2 check node not exists after node cache");
    } else {
        System.out.println("client2 check node exists after node cache");
    }
    InterProcessMutex lock = new InterProcessMutex(client1, PATH);
    InterProcessMutex lock2 = new InterProcessMutex(client2, PATH);
    if (lock.acquire(10, TimeUnit.SECONDS)) {
        System.out.println("client1 get lock");
        if (lock2.acquire(10, TimeUnit.SECONDS)) {
            System.out.println("client2 get lock after client1 get lock");
        } else {
            System.out.println("client2 does not get lock after client1 get lock");
        }
        Stat stat = client1.checkExists().forPath(PATH);
        if (stat == null) {
            System.out.println("client1 find node not exists after lock");
            client1.inTransaction().create().withMode(CreateMode.EPHEMERAL).forPath(PATH)
                    .and().setData().forPath(PATH, String.valueOf(System.currentTimeMillis()).getBytes())
                    .and().commit();
            if (client1.checkExists().forPath(PATH) == null) {
                System.out.println("client1 create node failed");
                return;
            } else {
                System.out.println("client1 create node success");
            }
        } else {
            System.out.println("client1 find node exists after lock");
        }
        if (client2.checkExists().forPath(PATH) == null) {
            System.out.println("client2 check node not exists");
        } else {
            System.out.println("client2 can check node if exists without lock");
            CloseableUtils.closeQuietly(client1);
            System.out.println("client1 closed");
            if (lock2.acquire(10, TimeUnit.SECONDS)) {
                System.out.println("client2 get lock after client closed");
            } else {
                System.out.println("client2 does not get lock after client closed");
            }
        }
    } else {
        System.out.println("client1 do not get lock");
        return;
    }
}
With this approach you have to watch the node changes yourself and inspect the eventType. Is there a simpler way?
Idea 3:
In the end I chose the LeaderSelector recipe that Curator provides out of the box. There are no real pitfalls here, and plenty of material about it online.
Things worth noting:
1. When the takeLeadership method returns, the current instance gives up leadership and a new election is held; if you want to stay leader, keep the method running in a loop.
2. You can call autoRequeue to put the current instance back into the election queue, so it may become leader again; otherwise it never gets another chance.
3. It is best to handle connection-state changes; extending the LeaderSelectorListenerAdapter class takes care of this.
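To tie the three notes together, here is a minimal sketch of that setup, reusing the TestingServer from the code above. The class name, the /base/leader path, and the sleep intervals are made up for illustration:

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;

public class LeaderSelectorDemo {
    // counts how many times this instance became leader
    static final AtomicInteger leaderCount = new AtomicInteger();

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        server.start();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Extending LeaderSelectorListenerAdapter gives the recommended handling
        // of connection-state changes (note 3): on SUSPENDED/LOST it throws
        // CancelLeadershipException, which interrupts takeLeadership.
        LeaderSelectorListenerAdapter listener = new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // We are leader only while this method is running (note 1);
                // a real task would loop here until interrupted.
                System.out.println("took leadership: " + leaderCount.incrementAndGet());
                Thread.sleep(1000); // stand-in for the actual work
            }
        };

        LeaderSelector selector = new LeaderSelector(client, "/base/leader", listener);
        selector.autoRequeue(); // re-enter the election after takeLeadership returns (note 2)
        selector.start();

        Thread.sleep(3500); // let this instance win, requeue, and win again
        selector.close();
        client.close();
        server.close();
    }
}
```

Because autoRequeue() is set, "took leadership" is printed more than once: each time takeLeadership returns, the instance re-enters the election and, being the only candidate here, wins again.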