做了两版实现,先是直接用zk的接口做的,后来又用curator做了个。主要是用来在集群环境中确定一个主节点来完成一些需要单独执行的任务(我的需求是向es中补充数据)。
自己实现的版本,主要思路是抢占式创建一个EPHEMERAL型zk路径,创建成功的就认为是leader,创建失败则作为follower监听此路径,当路径消失时再次抢占。创建ZookeeperElection时传入一个监听对象listener,在zk相关事件触发时调用listener的相关方法进行处理(比如在ApplicationContext中发布一个事件),此版本主要逻辑如下:
/**
 * Base class that owns a lazily created ZooKeeper handle and offers helpers for
 * (re)connecting and for creating persistent paths.
 *
 * <p>The handle is created on first use by {@link #zooKeeper()} and replaced by
 * {@link #reconnect()}; both are synchronized on this instance. Subclasses supply the
 * {@code Watcher.process} implementation that receives the session events.
 */
@Deprecated
abstract class ZookeeperOperations implements Watcher, AutoCloseable {
    // NOTE(review): the logger is registered under ZookeeperElection's category (kept as in the
    // original) - confirm this is intended rather than a copy/paste slip.
    private final static Logger logger = Logger.getLogger(ZookeeperElection.class);

    /** Pause between two connection attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 20000L;

    private ZooKeeper zooKeeper;
    private final String hosts;
    private final String digest;
    private final int timeoutInMs;

    /**
     * Creates an instance without digest authentication.
     *
     * @param hosts       ZooKeeper connect string
     * @param timeoutInMs session timeout in milliseconds
     */
    public ZookeeperOperations(String hosts, int timeoutInMs) {
        this(hosts, null, timeoutInMs);
    }

    /**
     * @param hosts       ZooKeeper connect string
     * @param digest      digest credentials passed to {@code addAuthInfo}; {@code null} disables auth
     * @param timeoutInMs session timeout in milliseconds
     */
    public ZookeeperOperations(String hosts, String digest, int timeoutInMs) {
        this.hosts = hosts;
        this.digest = digest;
        this.timeoutInMs = timeoutInMs;
    }

    /** Logs {@code message} at INFO level when that level is enabled. */
    protected void infoLog(String message) {
        if (logger.isInfoEnabled()) {
            logger.info(message);
        }
    }

    /**
     * Returns the current handle, connecting first if none exists yet.
     *
     * @return the ZooKeeper handle
     */
    protected synchronized ZooKeeper zooKeeper() {
        if (zooKeeper == null) {
            reconnect();
        }
        return zooKeeper;
    }

    /**
     * Closes the current handle (if any) and keeps trying to open a new one, pausing
     * 20 seconds after every failed attempt.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting for the next
     *                               attempt; the interrupt flag is restored first
     */
    protected synchronized void reconnect() {
        try {
            if (zooKeeper != null) {
                close();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
            logger.warn("关闭旧的Zookeeper连接时被中断", e);
        } catch (RuntimeException e) {
            // Closing the old handle is best effort; a new one is opened regardless.
            logger.warn("关闭旧的Zookeeper连接失败", e);
        }
        while (true) {
            infoLog("尝试连接到Zookeeper : " + hosts);
            try {
                zooKeeper = new ZooKeeper(hosts, timeoutInMs, this);
                if (digest != null) {
                    // Explicit charset: the platform default differs between hosts.
                    zooKeeper.addAuthInfo("digest",
                            digest.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
                return;
            } catch (IOException e) {
                zooKeeper = null;
                logger.error("Zookeeper连接失败,准备在20秒后重试", e);
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Swallowing the interrupt made this loop impossible to stop.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("等待重新连接Zookeeper时被中断", e);
            }
        }
    }

    /**
     * Closes the underlying handle if one has been created.
     *
     * @throws InterruptedException if closing the handle is interrupted
     */
    @Override
    public void close() throws InterruptedException {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }

    /**
     * Builds the ACL list used for created paths: open ACLs without a digest, otherwise a
     * single ALL-permission entry for the digest identity.
     */
    private ArrayList<ACL> acls() {
        if (digest == null) {
            return ZooDefs.Ids.OPEN_ACL_UNSAFE;
        }
        try {
            Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest(digest));
            return new ArrayList<>(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, id1)));
        } catch (NoSuchAlgorithmException e) {
            // NOTE(review): falling back to open ACLs is kept from the original; consider
            // failing instead, since a digest was explicitly requested.
            logger.error("无法生成digest,使用OPEN_ACL_UNSAFE", e);
        }
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }

    /**
     * Creates {@code path} and every missing ancestor as persistent nodes.
     *
     * <p>A leading {@code '/'} is added when missing. Nodes that another client creates
     * concurrently are tolerated.
     *
     * @param zooKeeper handle used for the exists/create calls
     * @param path      path to create
     * @throws IllegalArgumentException if {@code path} is {@code null} or empty
     * @throws KeeperException          on any ZooKeeper error other than "node exists"
     * @throws InterruptedException     if a ZooKeeper call is interrupted
     */
    protected void createPathRecursive(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("path must not be null or empty");
        }
        if (path.charAt(0) != '/') {
            path = "/" + path;
        }
        infoLog("准备检查目录 " + path);
        // Walk upwards until an existing ancestor (or the root) is reached, remembering the
        // missing components so they can be created top-down afterwards.
        java.util.Deque<String> missing = new java.util.ArrayDeque<>();
        String current = path;
        while (!current.isEmpty() && zooKeeper.exists(current, false) == null) {
            missing.push(current);
            current = current.substring(0, current.lastIndexOf('/'));
        }
        if (missing.isEmpty()) {
            infoLog("目录 " + path + " 已存在");
            return;
        }
        while (!missing.isEmpty()) {
            createIfAbsent(zooKeeper, missing.pop());
        }
    }

    /** Creates one persistent node, treating "already exists" as success. */
    private void createIfAbsent(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        try {
            infoLog("目录 " + path + " 不存在,准备创建");
            zooKeeper.create(path, new byte[0], acls(), CreateMode.PERSISTENT);
            infoLog("目录 " + path + " 创建成功");
        } catch (KeeperException.NodeExistsException e) {
            infoLog("目录 " + path + " 已被其它服务创建成功");
        }
    }
}
/**
 * Leader election on top of a single ephemeral node {@code <root>/leader}.
 *
 * <p>{@link #run()} loops forever: whoever manages to create the ephemeral node is the
 * leader, everybody else watches it and competes again once it disappears. State changes
 * are reported through the supplied {@code ZkElectionListener}.
 */
@Deprecated
public class ZookeeperElection extends ZookeeperOperations implements Runnable {
    /** Pause before retrying a failed election round, in milliseconds. */
    private static final long RETRY_DELAY_MS = 3000L;

    private final String root;
    private final ZkElectionListener listener;
    // Replaced by the election thread and counted down by the ZooKeeper event thread,
    // hence volatile.
    private volatile CountDownLatch latch = new CountDownLatch(1);
    private final static Logger logger = Logger.getLogger(ZookeeperElection.class);

    /**
     * @param hosts       ZooKeeper connect string
     * @param root        parent path under which the leader node is created
     * @param timeoutInMs session timeout in milliseconds
     * @param listener    receiver of election and connection callbacks
     */
    public ZookeeperElection(String hosts, String root, int timeoutInMs, ZkElectionListener listener) {
        super(hosts, timeoutInMs);
        this.root = root;
        this.listener = listener;
    }

    /**
     * @param hosts       ZooKeeper connect string
     * @param root        parent path under which the leader node is created
     * @param digest      digest credentials, or {@code null} for none
     * @param timeoutInMs session timeout in milliseconds
     * @param listener    receiver of election and connection callbacks
     */
    public ZookeeperElection(String hosts, String root, String digest, int timeoutInMs, ZkElectionListener listener) {
        super(hosts, digest, timeoutInMs);
        this.root = root;
        this.listener = listener;
    }

    private String leaderPath() {
        return root + "/leader";
    }

    private String nodePath() {
        return root + "/node";
    }

    /**
     * Runs election rounds until a ZooKeeper error other than session expiry occurs or the
     * thread is interrupted (the interrupt flag is restored before returning).
     */
    @Override
    public void run() {
        final String zNode = leaderPath();
        try {
            while (true) {
                try {
                    runElectionRound(zNode);
                } catch (KeeperException.SessionExpiredException e) {
                    // The expiry handler swaps in a fresh handle; retry with it instead of
                    // giving up on the old, closed one.
                    infoLog(Thread.currentThread().getName() + " Zookeeper会话已过期,等待重新连接后重试");
                    Thread.sleep(RETRY_DELAY_MS);
                }
            }
        } catch (KeeperException e) {
            logger.error("连接zookeeper失败", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(e);
        }
    }

    /** Performs one election attempt and, once a role is settled, blocks until it is invalidated. */
    private void runElectionRound(String zNode) throws KeeperException, InterruptedException {
        // Arm a fresh latch BEFORE any watch is set so an event arriving right after the
        // watch registration cannot be lost.
        final CountDownLatch round = new CountDownLatch(1);
        latch = round;
        // Fetch the handle every round: reconnect() replaces it after a session expiry.
        final ZooKeeper zooKeeper = zooKeeper();
        if (zooKeeper.exists(root, false) == null) {
            createPathRecursive(zooKeeper, root);
        }
        boolean settled = false;
        if (findLeader(zNode)) {
            infoLog(Thread.currentThread().getName() + " 已选举作为follower节点");
            listener.onFollower();
            settled = true;
        } else if (tryLeader(zNode)) {
            infoLog(Thread.currentThread().getName() + " 已选举作为leader节点");
            listener.onLeader();
            settled = true;
        }
        if (settled) {
            round.await();
        } else {
            infoLog(Thread.currentThread().getName() + " 选举失败,3秒后重拾");
            Thread.sleep(RETRY_DELAY_MS);
        }
    }

    /**
     * Builds the payload stored in the election nodes.
     *
     * @return the raw local host address, or an empty array when it cannot be resolved
     */
    protected byte[] serverTag() {
        try {
            return InetAddress.getLocalHost().getAddress();
        } catch (Exception e) {
            logger.warn("无法获取本机地址,使用空的服务器标识", e);
        }
        return new byte[0];
    }

    /**
     * Checks whether a leader node exists, leaving a watch on it when it does.
     *
     * @return {@code true} if the node exists and carries data; {@code false} otherwise,
     *         including on any error
     */
    private boolean findLeader(String zNode) {
        try {
            byte[] data = zooKeeper().getData(zNode, true, null);
            return data != null;
        } catch (KeeperException.NoNodeException e) {
            return false;
        } catch (KeeperException e) {
            logger.warn("查询leader节点失败", e);
        } catch (InterruptedException e) {
            // Restore the flag so the election loop notices the interruption.
            Thread.currentThread().interrupt();
            logger.warn("查询leader节点时被中断", e);
        } catch (Exception e) {
            logger.warn("查询leader节点失败", e);
        }
        return false;
    }

    /**
     * Tries to create the ephemeral leader node and to watch it.
     *
     * @return {@code true} if this instance created the node
     * @throws KeeperException on ZooKeeper errors other than "node exists"
     */
    private boolean tryLeader(String zNode) throws KeeperException {
        ZooKeeper zooKeeper = zooKeeper();
        String leader = null;
        try {
            // NOTE(review): uses open ACLs even when a digest is configured - confirm.
            leader = zooKeeper.create(zNode, serverTag(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // Watch our own node so an external delete ends the leadership.
            zooKeeper.exists(zNode, true);
            if (logger.isDebugEnabled()) {
                logger.debug("leader path : " + leader);
            }
        } catch (KeeperException.NodeExistsException e) {
            logger.error(Thread.currentThread().getName() + " 参与选举Leader竞争失败,已有其它node被选举为Leader");
        } catch (KeeperException e) {
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error(Thread.currentThread().getName() + " 参与选举Leader失败", e);
        } catch (Exception e) {
            logger.error(Thread.currentThread().getName() + " 参与选举Leader失败", e);
        }
        return leader != null;
    }

    /**
     * Registers this instance as an ephemeral sequential node under {@code <root>/node}.
     * Not called by the visible code; the parameter is unused.
     *
     * @return {@code true} if the node was created
     */
    private boolean tryFollow(String zNode) {
        ZooKeeper zooKeeper = zooKeeper();
        try {
            String node = zooKeeper.create(nodePath(), serverTag(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            infoLog(node + " 节点已注册");
            return true;
        } catch (KeeperException e) {
            logger.warn("注册follower节点失败", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("注册follower节点时被中断", e);
        }
        return false;
    }

    /**
     * Watcher callback: routes connection events to {@link #choseState} and wakes the
     * election loop when the leader node is deleted.
     */
    @Override
    public void process(WatchedEvent event) {
        Event.EventType type = event.getType();
        switch (type) {
            case None:
                // Connection state change.
                choseState(event.getState());
                break;
            case NodeDataChanged:
                infoLog(event.getPath() + " 数据变动");
                break;
            case NodeCreated:
                infoLog(event.getPath() + " 数据新建");
                break;
            case NodeChildrenChanged:
                infoLog(event.getPath() + " 子数据变动");
                break;
            case NodeDeleted:
                infoLog(event.getPath() + " 数据删除");
                if (leaderPath().equals(event.getPath())) {
                    try {
                        listener.onLeaderOff();
                    } finally {
                        // Always wake the election loop, even if the listener throws.
                        latch.countDown();
                    }
                }
                break;
            default:
                break;
        }
    }

    /** Reports a connection state change to the listener; reconnects on session expiry. */
    private void choseState(Event.KeeperState state) {
        switch (state) {
            case AuthFailed:
                logger.error("Zookeeper连接验证失败");
                listener.onDisconnect();
                break;
            case SyncConnected:
                infoLog("Zookeeper节点连接成功");
                listener.onConnect();
                break;
            case Disconnected:
                infoLog("Zookeeper连接已断开");
                listener.onDisconnect();
                break;
            case Expired:
                infoLog("Zookeeper会话超时断开,正在准备重新连接");
                try {
                    listener.onDisconnect();
                    reconnect();
                } finally {
                    // Start a new election round on the fresh session.
                    latch.countDown();
                }
                break;
            default:
                break;
        }
    }
}
此方法逼格不高,所以后来又用curator的分布式同步锁机制做了另外的实现。
虽然curator实现了一个选举,但他的实现我觉得存在一点问题。比如当一个节点被选举为leader时,并没有对自身路径进行监听。当你主动删除了zk上的leader节点路径时(比如版本平滑升级时),follower节点已经进行下一轮选举并选出新的leader节点了,但之前的leader节点依然认为自己是leader节点,还会继续执行leader节点的操作。所以用了他的锁机制自己又弄了下。
大概如下:
/**
* 使用Curator做的leader选举,类似LeaderSelector机制,但做了自我lockpath路径监听。
* <br>LeaderSelector的缺点在于它没有监听自身lockpath的路径变更。
* 因此当手工删除lockpath时,它不会发现其它follower已经升级为leader,还会认为自己是leader。
* Created by lewis on 18-6-21.
*/
public class ZkCuratorElection extends ZkCurator implements AutoCloseable{
private final static Logger logger=Logger.getLogger(ZkCuratorElection.class);
/**
 * Static factory for {@code ZkCuratorElection}.
 *
 * @param connectString     forwarded to the constructor
 * @param retryIntervalInMs forwarded to the constructor
 * @param retryTimes        forwarded to the constructor
 * @return a new election instance
 */
public static ZkCuratorElection create(String connectString, int retryIntervalInMs, int retryTimes) {
    final ZkCuratorElection election =
            new ZkCuratorElection(connectString, retryIntervalInMs, retryTimes);
    return election;
}
/**
 * Passes all three arguments unchanged to the {@code ZkCurator} super constructor.
 */
protected ZkCuratorElection(
        String connectString,
        int retryIntervalInMs,
        int retryTimes) {
    super(connectString, retryIntervalInMs, retryTimes);
}
/**
 * Prepares a {@code LeaderTask} competing for the lock at {@code path}.
 *
 * <p>Nothing is started here: the returned task receives a factory that, when invoked,
 * calls {@code start()} and builds a {@code PathObserverMutex} carrying {@code id}.
 *
 * @param path lock path handed to the mutex
 * @param id   identifier set on the mutex via {@code setId}
 * @return the task builder
 */
public LeaderTask tryLeader(@NotNull String path, String id) {
    Function<Void, PathObserverMutex> mutexFactory = unused -> {
        start();
        PathObserverMutex lock = new PathObserverMutex(client, path);
        lock.setId(id);
        return lock;
    };
    return new LeaderTask(this, mutexFactory);
}
/**
 * Closes this election by delegating to the superclass.
 *
 * @throws Exception whatever the superclass {@code close()} throws
 */
@Override
public void close() throws Exception {
    super.close();
}
// Flipped from false to true by the first task thread that starts on this election;
// later starts see it already set and are rejected.
private AtomicBoolean hasRun=new AtomicBoolean(false);
/**
 * Fluent holder for the options of a leader task; {@link #accept(Runnable)} turns it into
 * the thread that competes for the lock and runs the job.
 */
public static class LeaderTask {
    private volatile boolean loopInLeaderShip = false;
    private volatile boolean autoQueueOnLost = false;
    private final ZkCuratorElection election;
    private final Function<Void, PathObserverMutex> prepareStart;

    /**
     * Keep invoking the job for as long as the lock is held.
     *
     * @return this task
     */
    public LeaderTask loopInLeaderShip() {
        this.loopInLeaderShip = true;
        return this;
    }

    /**
     * After the lock is gone, queue up for it again instead of exiting.
     *
     * @return this task
     */
    public LeaderTask autoQueueAfterLost() {
        this.autoQueueOnLost = true;
        return this;
    }

    private LeaderTask(ZkCuratorElection election, Function<Void, PathObserverMutex> prepareStart) {
        this.election = election;
        this.prepareStart = prepareStart;
    }

    /**
     * Binds the job to run while holding the lock.
     *
     * <p>The returned thread is not started. When it runs it claims the election (only one
     * task may ever run per election), acquires the mutex, runs {@code function} once or in
     * a loop, releases the mutex and - unless re-queueing is enabled - closes the election.
     *
     * @param function job executed while the lock is held
     * @return the not-yet-started worker thread
     */
    public ClosableLockerThread accept(Runnable function) {
        return new ClosableLockerThread() {
            // Written by the worker thread, read through holdingLock() from other threads.
            private volatile PathObserverMutex mutex;
            private Function<Void, Void> triggerStart;
            private Function<Void, Void> triggerGet;
            private Function<Void, Void> triggerLost;
            private Function<Void, Void> triggerRelease;
            private Function<Void, Void> triggerExit;

            @Override
            public void stopNextRun() {
                loopInLeaderShip = false;
                autoQueueOnLost = false;
            }

            @Override
            public void run() {
                // True only for the thread that actually claimed the election; a rejected
                // duplicate start must not close the election the running task still uses.
                boolean claimed = false;
                try {
                    if (!election.hasRun.compareAndSet(false, true)) {
                        throw new RuntimeException("任务不能多次启动");
                    }
                    claimed = true;
                    this.mutex = prepareStart.apply(null);
                    mutex.setTriggerGet(triggerGet);
                    mutex.setTriggerLost(triggerLost);
                    mutex.setTriggerRelease(triggerRelease);
                    if (triggerStart != null) {
                        triggerStart.apply(null);
                    }
                    do {
                        runWhileLeader();
                    } while (autoQueueOnLost);
                } catch (Exception e) {
                    logger.warn("job 运行出现错误,即将退出执行", e);
                } finally {
                    if (claimed) {
                        if (logger.isDebugEnabled()) {
                            logger.debug(Thread.currentThread().getName() + " 任务已完全退出,准备关闭zk连接");
                        }
                        try {
                            election.close();
                        } catch (Exception e) {
                            logger.warn("关闭zk连接失败", e);
                        }
                    }
                    if (triggerExit != null) {
                        triggerExit.apply(null);
                    }
                }
            }

            /** Acquires the lock, runs the job (once or looping) and releases the lock. */
            private void runWhileLeader() throws Exception {
                try {
                    mutex.acquire();
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " 获得zk锁,开始执行分配的任务");
                    }
                    do {
                        if (mutex.holdingLock()) {
                            function.run();
                        }
                    } while (mutex.holdingLock() && loopInLeaderShip);
                    if (logger.isDebugEnabled()) {
                        logger.debug(Thread.currentThread().getName() + " 失去zk锁,分配的任务不再执行");
                    }
                } finally {
                    // Releasing a mutex that was never acquired throws and would hide the
                    // exception that made acquire() fail.
                    if (mutex.isAcquiredInThisProcess()) {
                        mutex.release();
                    }
                }
            }

            @Override
            public boolean holdingLock() {
                return mutex != null && mutex.holdingLock();
            }

            @Override
            public ClosableLockerThread onStart(Function<Void, Void> trigger) {
                this.triggerStart = trigger;
                return this;
            }

            @Override
            public ClosableLockerThread onGetLock(Function<Void, Void> trigger) {
                this.triggerGet = trigger;
                return this;
            }

            @Override
            public ClosableLockerThread onLostLock(Function<Void, Void> trigger) {
                this.triggerLost = trigger;
                return this;
            }

            @Override
            public ClosableLockerThread onReleaseLock(Function<Void, Void> trigger) {
                this.triggerRelease = trigger;
                return this;
            }

            @Override
            public ClosableLockerThread onExit(Function<Void, Void> trigger) {
                triggerExit = trigger;
                return this;
            }
        };
    }
}
/**
 * Thread type returned by LeaderTask.accept: exposes stop/inspect operations plus fluent
 * registration of lifecycle callbacks. Every on* method returns a ClosableLockerThread so
 * calls can be chained.
 */
public static abstract class ClosableLockerThread extends Thread {
/**
* Prevents any further round from running so the thread can exit.
*/
public abstract void stopNextRun();
/**
* Tells whether this thread currently holds the lock.
* @return true while the lock is held
*/
public abstract boolean holdingLock();
/**
* Registers a callback invoked once the thread is prepared, before it starts
* competing for the lock.
* @param trigger callback to invoke
* @return a ClosableLockerThread for chaining
*/
public abstract ClosableLockerThread onStart(Function<Void,Void> trigger);
/**
* Registers a callback invoked when the lock is obtained.
* @param trigger callback to invoke
* @return a ClosableLockerThread for chaining
*/
public abstract ClosableLockerThread onGetLock(Function<Void,Void> trigger);
/**
* Registers a callback invoked when the lock is lost.
* @param trigger callback to invoke
* @return a ClosableLockerThread for chaining
*/
public abstract ClosableLockerThread onLostLock(Function<Void,Void> trigger);
/**
* Registers a callback for lock release (the original comment was empty; presumably
* invoked after the lock is released - confirm against the implementation).
* @param trigger callback to invoke
* @return a ClosableLockerThread for chaining
*/
public abstract ClosableLockerThread onReleaseLock(Function<Void,Void> trigger);
/**
* Registers a callback invoked when the thread exits.
* @param trigger callback to invoke
* @return a ClosableLockerThread for chaining
*/
public abstract ClosableLockerThread onExit(Function<Void,Void> trigger);
}
/**
* 带路径监控的锁
*/
public static class PathObserverMutex extends InterProcessMutex{
private CuratorFramework client;
private volatile boolean holdingLock=false;
private String id;
private Function<Void,Void> triggerGet;
private Function<Void,Void> triggerRelease;
private Function<Void,Void> triggerLost;
/**
 * Creates the mutex on {@code path} and keeps {@code client} for registering watchers.
 *
 * @param client Curator client, also passed to the superclass
 * @param path   lock path passed to the superclass
 */
public PathObserverMutex(
        CuratorFramework client,
        String path) {
    super(client, path);
    this.client = client;
}
/**
 * Same as the two-argument constructor, additionally forwarding {@code driver} to the
 * superclass.
 *
 * @param client Curator client, also passed to the superclass
 * @param path   lock path passed to the superclass
 * @param driver lock driver passed to the superclass
 */
public PathObserverMutex(
        CuratorFramework client,
        String path,
        LockInternalsDriver driver) {
    super(client, path, driver);
    this.client = client;
}
/**
 * Acquires the lock via the superclass, marks it as held, registers a background
 * existence check with a {@code PathDeleteWatcher} on the lock path and finally invokes
 * the "get" trigger if one is set.
 *
 * @throws Exception propagated from the superclass or from the watcher registration
 */
@Override
public void acquire() throws Exception {
    super.acquire();
    this.holdingLock = true;
    client.checkExists()
            .usingWatcher(new PathDeleteWatcher())
            .inBackground()
            .forPath(getLockPath());
    if (this.triggerGet == null) {
        return;
    }
    this.triggerGet.apply(null);
}
/**
 * Timed variant of {@code acquire()}: stores the superclass result in the held flag and,
 * only when the flag is set, registers the delete watcher and fires the "get" trigger.
 *
 * @param time maximum time to wait
 * @param unit unit of {@code time}
 * @return the value of the held flag when the method returns
 * @throws Exception propagated from the superclass or from the watcher registration
 */
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
    this.holdingLock = super.acquire(time, unit);
    if (!this.holdingLock) {
        return this.holdingLock;
    }
    client.checkExists()
            .usingWatcher(new PathDeleteWatcher())
            .inBackground()
            .forPath(getLockPath());
    if (this.triggerGet != null) {
        this.triggerGet.apply(null);
    }
    return this.holdingLock;
}
/**
 * Supplies the lock node payload: the UTF-8 bytes of {@code id} when an id is set,
 * otherwise whatever the superclass provides.
 */
@Override
protected byte[] getLockNodeBytes() {
    if (id == null) {
        return super.getLockNodeBytes();
    }
    return id.getBytes(Charset.forName("UTF-8"));
}
/** @return the current value of the held flag */
public boolean holdingLock() {
    return this.holdingLock;
}

/** @return the id used as lock node payload, possibly {@code null} */
public String getId() {
    return this.id;
}

/** @param id the id to use as lock node payload */
public void setId(String id) {
    this.id = id;
}

/** @param triggerGet callback applied after the lock has been acquired */
public void setTriggerGet(Function<Void, Void> triggerGet) {
    this.triggerGet = triggerGet;
}

/** @param triggerLost callback applied when a held lock is detected as lost */
public void setTriggerLost(Function<Void, Void> triggerLost) {
    this.triggerLost = triggerLost;
}

/** @param triggerRelease callback applied after {@code release()} */
public void setTriggerRelease(Function<Void, Void> triggerRelease) {
    this.triggerRelease = triggerRelease;
}
/**
 * Releases one hold on the lock and then invokes the "release" trigger if one is set.
 *
 * <p>Once no hold remains in this process the held flag is cleared immediately, so
 * {@code holdingLock()} no longer reports {@code true} until the asynchronous delete
 * event for the lock node happens to arrive.
 *
 * @throws Exception propagated from the superclass
 */
@Override
public void release() throws Exception {
    super.release();
    // A re-entrant release keeps the lock; only clear the flag when it is really gone.
    if (!isAcquiredInThisProcess()) {
        this.holdingLock = false;
    }
    if (this.triggerRelease != null) {
        this.triggerRelease.apply(null);
    }
}
private class PathDeleteWatcher implements Watcher{
/**
 * Re-registers a watcher on the current thread's lock path.
 * Kept for compatibility; prefer {@link #rebuildWatcher(String)} from watcher callbacks.
 */
private void rebuildWatcher() {
    rebuildWatcher(getLockPath());
}

/**
 * Registers a new watcher on {@code watchedPath} after the previous one was consumed by
 * an event other than a delete. If that is impossible the lock is treated as lost.
 *
 * @param watchedPath path the consumed watcher was set on; {@code null} means unknown
 */
private void rebuildWatcher(String watchedPath) {
    if (watchedPath == null) {
        logger.warn("构造监听事件异常");
        resetLost();
        return;
    }
    try {
        PathObserverMutex.this.client
                .checkExists()
                .usingWatcher(new PathDeleteWatcher())
                .inBackground()
                .forPath(watchedPath);
    } catch (Exception e) {
        // Without a watcher a later delete would go unnoticed, so give the lock up and
        // notify through the regular "lost" path.
        logger.warn("构造监听事件异常", e);
        resetLost();
    }
}

/** Clears the held flag and fires the "lost" trigger if the lock was held before. */
private void resetLost() {
    boolean beforeHold = PathObserverMutex.this.holdingLock;
    PathObserverMutex.this.holdingLock = false;
    if (beforeHold && PathObserverMutex.this.triggerLost != null) {
        PathObserverMutex.this.triggerLost.apply(null);
    }
}

/**
 * Watcher callback. Any state other than SyncConnected counts as losing the lock;
 * otherwise the event type decides.
 */
@Override
public void process(WatchedEvent event) {
    if (logger.isDebugEnabled()) {
        logger.debug("路径事件触发 " + event.toString());
    }
    switch (event.getState()) {
        case SyncConnected:
            tellType(event.getType(), event.getPath());
            break;
        default:
            resetLost();
            break;
    }
}

/** Kept for compatibility; resolves the path from the calling thread's lock data. */
private void tellType(Event.EventType type) {
    tellType(type, getLockPath());
}

/**
 * Reacts to a node event received while connected.
 *
 * @param type event type
 * @param path path carried by the event. The callback does not run on the thread that
 *             acquired the lock, and Curator's {@code getLockPath()} is keyed by the
 *             calling thread, so the path has to come from the event itself.
 */
private void tellType(Event.EventType type, String path) {
    switch (type) {
        case NodeDeleted:
            // The lock node is gone: the lock is lost.
            resetLost();
            break;
        case None:
            // Pure connection-state notification: presumably the watch is not consumed by
            // it (see ZooKeeper watch semantics), so nothing needs re-registering.
            break;
        default:
            rebuildWatcher(path);
    }
}