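Curator is a wrapper around ZooKeeper. curator-client is its lowest layer, and its main job is to provide automatic reconnection.
The entry class, CuratorZookeeperClient, is itself a very thin wrapper: apart from a few settings it only keeps the retryPolicy and the ensembleProvider, and it hands all real connection management over to ConnectionState.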
```java
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
                              int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher,
                              RetryPolicy retryPolicy, boolean canBeReadOnly)
{
    retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
    // store the provider in the field (the excerpt originally lacked "this.")
    this.ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
    this.connectionTimeoutMs = connectionTimeoutMs;
    // all real connection management is delegated to ConnectionState
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
    setRetryPolicy(retryPolicy);
}
```
ConnectionState
```java
class ConnectionState implements Watcher, Closeable
{
    private volatile long connectionStartMs = 0;

    private final Logger log = LoggerFactory.getLogger(getClass());
    // manages the actual ZooKeeper connection
    private final HandleHolder zooKeeper;
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    // provides the ZooKeeper connection string
    private final EnsembleProvider ensembleProvider;
    private final int connectionTimeoutMs;
    private final AtomicReference<TracerDriver> tracer;
    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<Exception>();
    // user-supplied watchers
    private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();

    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
                    int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer,
                    boolean canBeReadOnly)
    {
        this.ensembleProvider = ensembleProvider;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }

        // The real ZooKeeper connection is still managed by HandleHolder.
        // Note the second argument (the watcher): it is this ConnectionState itself.
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }

    // ... other methods omitted
}
```
At startup you call CuratorZookeeperClient.start(), which in turn calls ConnectionState.start():
```java
void start() throws Exception
{
    log.debug("Starting");
    ensembleProvider.start();
    reset();
}

private void reset() throws Exception
{
    isConnected.set(false);
    connectionStartMs = System.currentTimeMillis();
    zooKeeper.closeAndReset();
    zooKeeper.getZooKeeper();   // initiate connection
}
```
reset() calls HandleHolder's closeAndReset() and then getZooKeeper() to initialize the connection:
```java
class HandleHolder
{
    private final ZookeeperFactory zookeeperFactory;
    private final Watcher watcher;
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeout;
    private final boolean canBeReadOnly;

    private volatile Helper helper;

    void closeAndReset() throws Exception
    {
        internalClose();

        // first helper is synchronized when getZooKeeper is called. Subsequent calls
        // are not synchronized.
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;

            @Override
            public ZooKeeper getZooKeeper() throws Exception
            {
                // this lock is taken on the Helper object itself
                synchronized(this)
                {
                    if ( zooKeeperHandle == null )
                    {
                        connectionString = ensembleProvider.getConnectionString();
                        // this is where the ZooKeeper object is actually created
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    // once the handle exists, replace helper with an instance that
                    // simply returns the zooKeeperHandle created above
                    helper = new Helper()
                    {
                        @Override
                        public ZooKeeper getZooKeeper() throws Exception
                        {
                            return zooKeeperHandle;
                        }

                        @Override
                        public String getConnectionString()
                        {
                            return connectionString;
                        }
                    };

                    return zooKeeperHandle;
                }
            }

            @Override
            public String getConnectionString()
            {
                return connectionString;
            }
        };
    }

    ZooKeeper getZooKeeper() throws Exception
    {
        return helper.getZooKeeper();
    }
}
```
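As you can see, calling closeAndReset() creates a new helper object, but at that point no ZooKeeper connection has been created yet.
When getZooKeeper() is called, it locks on the helper object and checks whether the handle is still null, so that a second ZooKeeper object is never created.
Once the ZooKeeper object has been created, the helper reference is switched to a new anonymous inner-class instance that holds the ZooKeeper object created earlier, so every later getZooKeeper() call returns it directly without taking the lock. Note that creating the ZooKeeper object does not wait for the session to connect: the connection is established asynchronously and reported through the watcher.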
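To make the helper-swap idea concrete, here is a minimal, self-contained Java sketch of the same pattern. It is not Curator's code: `LazyHandleHolder` and its `Supplier<T>` factory are hypothetical stand-ins for HandleHolder, ZookeeperFactory and the ZooKeeper handle, and closing the old handle is left out. As in HandleHolder, `closeAndReset()` must run before the first `get()`.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for HandleHolder: Supplier<T> replaces
// ZookeeperFactory/EnsembleProvider, and T replaces the ZooKeeper handle.
class LazyHandleHolder<T> {
    private interface Helper<T> {
        T get();
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper;

    LazyHandleHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // Drops the current helper and installs one that creates a new handle lazily.
    void closeAndReset() {
        helper = new Helper<T>() {
            private volatile T handle = null;

            @Override
            public T get() {
                // Lock on this Helper instance, as HandleHolder does.
                synchronized (this) {
                    if (handle == null) {
                        handle = factory.get();
                    }
                    final T created = handle;
                    // Swap in a lock-free helper that just returns the handle.
                    helper = () -> created;
                    return created;
                }
            }
        };
    }

    T get() {
        return helper.get();
    }
}
```

The first `get()` after a reset takes the lock on the helper; every later call goes through the lambda helper and never synchronizes, which is the point of swapping the reference instead of keeping a permanently `synchronized` method.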
Back to ConnectionState: when it creates the HandleHolder, the Watcher it passes in is the ConnectionState itself. Let's look at its process() method:
```java
@Override
public void process(WatchedEvent event)
{
    if ( LOG_EVENTS )
    {
        log.debug("ConnectState watcher: " + event);
    }

    for ( Watcher parentWatcher : parentWatchers )
    {
        TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
        parentWatcher.process(event);
        timeTrace.commit();
    }

    boolean wasConnected = isConnected.get();
    boolean newIsConnected = wasConnected;
    if ( event.getType() == Watcher.Event.EventType.None )
    {
        newIsConnected = checkState(event.getState(), wasConnected);
    }

    if ( newIsConnected != wasConnected )
    {
        isConnected.set(newIsConnected);
        connectionStartMs = System.currentTimeMillis();
    }
}

private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
    boolean isConnected = wasConnected;
    boolean checkNewConnectionString = true;
    switch ( state )
    {
    default:
    case Disconnected:
    {
        isConnected = false;
        break;
    }

    case SyncConnected:
    case ConnectedReadOnly:
    {
        isConnected = true;
        break;
    }

    case AuthFailed:
    {
        isConnected = false;
        log.error("Authentication failed");
        break;
    }

    case Expired:
    {
        isConnected = false;
        checkNewConnectionString = false;
        handleExpiredSession();
        break;
    }

    case SaslAuthenticated:
    {
        // NOP
        break;
    }
    }

    if ( checkNewConnectionString && zooKeeper.hasNewConnectionString() )
    {
        handleNewConnectionString();
    }

    return isConnected;
}
```
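This code first forwards the WatchedEvent to the previously registered parentWatchers, and only then checks the KeeperState (and only for connection-level events, i.e. EventType.None).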
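The ordering in `process()` (user watchers first, then the connection flag, with a timestamp on every transition) can be sketched in plain Java as follows. `SimpleEvent` and `FanOutWatcher` are hypothetical: the boolean `isConnectionEvent` plays the role of `EventType.None`, the `connected` field stands for the result of `checkState()`, and `Consumer<SimpleEvent>` replaces ZooKeeper's `Watcher` interface.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for WatchedEvent.
final class SimpleEvent {
    final boolean isConnectionEvent; // plays the role of EventType.None
    final boolean connected;         // result of mapping the KeeperState

    SimpleEvent(boolean isConnectionEvent, boolean connected) {
        this.isConnectionEvent = isConnectionEvent;
        this.connected = connected;
    }
}

// Hypothetical, simplified model of ConnectionState.process().
class FanOutWatcher implements Consumer<SimpleEvent> {
    private final Queue<Consumer<SimpleEvent>> parentWatchers = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private volatile long connectionStartMs = 0;

    void addParentWatcher(Consumer<SimpleEvent> watcher) {
        parentWatchers.offer(watcher);
    }

    boolean isConnected() { return isConnected.get(); }

    long connectionStartMs() { return connectionStartMs; }

    @Override
    public void accept(SimpleEvent event) {
        // 1. Every event, including node events, goes to the user watchers first.
        for (Consumer<SimpleEvent> parent : parentWatchers) {
            parent.accept(event);
        }
        // 2. Only connection-level events may change the connected flag.
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = event.isConnectionEvent ? event.connected : wasConnected;
        // 3. On a transition, record the time so timeouts are measured from it.
        if (newIsConnected != wasConnected) {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
    }
}
```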
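In checkState() you can see that Disconnected and SyncConnected only update the connection flag. When the session has Expired, handleExpiredSession() is called, and that is where the ZooKeeper connection gets replaced: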
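The mapping performed by `checkState()` boils down to a switch over the keeper state. The sketch below mirrors it with a hypothetical `SketchKeeperState` enum instead of ZooKeeper's `Watcher.Event.KeeperState`, leaves out the new-connection-string check, and replaces `handleExpiredSession()` with a counter so that the Expired branch can be observed.

```java
// Hypothetical mirror of the KeeperState values handled by checkState().
enum SketchKeeperState {
    DISCONNECTED, SYNC_CONNECTED, CONNECTED_READ_ONLY, AUTH_FAILED, EXPIRED, SASL_AUTHENTICATED
}

class StateMapper {
    // Counts how often the Expired branch ran (stands in for handleExpiredSession()).
    int expiredResets = 0;

    boolean checkState(SketchKeeperState state, boolean wasConnected) {
        boolean isConnected = wasConnected;
        switch (state) {
            case SYNC_CONNECTED:
            case CONNECTED_READ_ONLY:
                isConnected = true;
                break;
            case EXPIRED:
                isConnected = false;
                expiredResets++; // the real code calls handleExpiredSession() -> reset()
                break;
            case SASL_AUTHENTICATED:
                break; // no change to the flag
            case DISCONNECTED:
            case AUTH_FAILED:
            default:
                isConnected = false;
                break;
        }
        return isConnected;
    }
}
```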
```java
private void handleExpiredSession()
{
    log.warn("Session expired event received");
    tracer.get().addCount("session-expired", 1);

    try
    {
        reset();
    }
    catch ( Exception e )
    {
        queueBackgroundException(e);
    }
}
```
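Here reset() is called again, which goes back into HandleHolder: it closes the current ZooKeeper object, then creates a new one and hands it out. If reset() fails, the exception is queued in backgroundExceptions and rethrown by the next getZooKeeper() call.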
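Because `handleExpiredSession()` runs inside a watcher callback, an exception thrown there would never reach the code that is calling `getZooKeeper()`. Curator therefore queues the failure and rethrows it later on the caller's thread. A minimal sketch of that hand-off, assuming a hypothetical `Resetter` interface in place of `reset()`:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the "queue in the callback, rethrow to the caller"
// pattern used by handleExpiredSession() and ConnectionState.getZooKeeper().
class ExpiryHandler {
    interface Resetter {
        void reset() throws Exception;
    }

    private final Queue<Exception> backgroundExceptions = new ConcurrentLinkedQueue<>();
    private final Resetter resetter;

    ExpiryHandler(Resetter resetter) {
        this.resetter = resetter;
    }

    // Runs on the watcher (event) thread: failures are queued instead of thrown.
    void handleExpiredSession() {
        try {
            resetter.reset();
        } catch (Exception e) {
            backgroundExceptions.offer(e);
        }
    }

    // Runs on the caller's thread: the oldest queued failure surfaces here.
    void checkBackground() throws Exception {
        Exception exception = backgroundExceptions.poll();
        if (exception != null) {
            throw exception;
        }
    }
}
```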
Finally, ConnectionState's getZooKeeper() method:
```java
ZooKeeper getZooKeeper() throws Exception
{
    if ( SessionFailRetryLoop.sessionForThreadHasFailed() )
    {
        throw new SessionFailRetryLoop.SessionFailedException();
    }

    Exception exception = backgroundExceptions.poll();
    if ( exception != null )
    {
        log.error("Background exception caught", exception);
        tracer.get().addCount("background-exceptions", 1);
        throw exception;
    }

    boolean localIsConnected = isConnected.get();
    if ( !localIsConnected )
    {
        long elapsed = System.currentTimeMillis() - connectionStartMs;
        if ( elapsed >= connectionTimeoutMs )
        {
            if ( zooKeeper.hasNewConnectionString() )
            {
                handleNewConnectionString();
            }
            else
            {
                KeeperException.ConnectionLossException connectionLossException = new KeeperException.ConnectionLossException();
                if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
                {
                    log.error(String.format("Connection timed out for connection string (%s) and timeout (%d) / elapsed (%d)", zooKeeper.getConnectionString(), connectionTimeoutMs, elapsed), connectionLossException);
                }
                tracer.get().addCount("connections-timed-out", 1);
                throw connectionLossException;
            }
        }
    }

    return zooKeeper.getZooKeeper();
}
```
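It mainly checks the current state: if the session for the current thread has failed, it throws SessionFailedException; if a background exception has been queued, it rethrows it; if the client is not connected and connectionTimeoutMs has elapsed since connectionStartMs (set by reset() and on every connection-state change), it either switches to a new connection string or throws ConnectionLossException. Otherwise it returns the connection held by HandleHolder.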
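The timeout branch of `getZooKeeper()` can be reproduced with an injectable clock, which makes it easy to test. In this sketch `TimeoutGuard`, the `LongSupplier` clock and the `Supplier<T>` handle are hypothetical; the `hasNewConnectionString()` path is omitted, and `IllegalStateException` stands in for `KeeperException.ConnectionLossException`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the timeout check in ConnectionState.getZooKeeper().
class TimeoutGuard<T> {
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final LongSupplier clock;      // stands in for System.currentTimeMillis()
    private final Supplier<T> handle;      // stands in for HandleHolder.getZooKeeper()
    private final long connectionTimeoutMs;
    private volatile long connectionStartMs;

    TimeoutGuard(LongSupplier clock, Supplier<T> handle, long connectionTimeoutMs) {
        this.clock = clock;
        this.handle = handle;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.connectionStartMs = clock.getAsLong(); // like reset()
    }

    // Like process(): record the time whenever the connected flag changes.
    void setConnected(boolean connected) {
        if (isConnected.getAndSet(connected) != connected) {
            connectionStartMs = clock.getAsLong();
        }
    }

    T get() {
        if (!isConnected.get()) {
            long elapsed = clock.getAsLong() - connectionStartMs;
            if (elapsed >= connectionTimeoutMs) {
                throw new IllegalStateException("Connection timed out, elapsed " + elapsed + " ms");
            }
        }
        // Connected, or disconnected but still within the timeout: hand out the handle.
        return handle.get();
    }
}
```

While the client is disconnected but still inside `connectionTimeoutMs`, the handle is returned anyway, so the operation itself may still fail with a connection-loss error that a retry loop can deal with.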
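None of the code above touches RetryPolicy. Within curator-client it is consumed by RetryLoop (see CuratorZookeeperClient.newRetryLoop() and RetryLoop.callWithRetry()), which asks the policy whether a failed operation may be retried; CuratorFramework runs its ZooKeeper operations through that loop.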
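For orientation, here is a minimal sketch of how a retry loop typically consults a retry policy. `SimpleRetryPolicy`, `RetryNTimesSketch`, `RetrySketch` and `ConnectionLost` are hypothetical simplifications, not Curator's `RetryPolicy`/`RetryLoop` API: the real policy also receives a sleeper for back-off, and the real loop treats several `KeeperException` types as retryable.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for a retry policy.
interface SimpleRetryPolicy {
    boolean allowRetry(int retryCount, long elapsedMs);
}

// Allows up to maxRetries retries, without sleeping (a real policy would back off).
class RetryNTimesSketch implements SimpleRetryPolicy {
    private final int maxRetries;

    RetryNTimesSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedMs) {
        return retryCount < maxRetries;
    }
}

final class RetrySketch {
    private RetrySketch() {}

    // Hypothetical "retryable" failure, playing the role of ConnectionLossException.
    static class ConnectionLost extends Exception {
        ConnectionLost(String message) {
            super(message);
        }
    }

    static <T> T callWithRetry(SimpleRetryPolicy policy, Callable<T> proc) throws Exception {
        long start = System.currentTimeMillis();
        int retryCount = 0;
        while (true) {
            try {
                return proc.call();
            } catch (ConnectionLost e) {
                long elapsed = System.currentTimeMillis() - start;
                // Only retryable failures consult the policy; others propagate directly.
                if (!policy.allowRetry(retryCount++, elapsed)) {
                    throw e;
                }
            }
        }
    }
}
```

With `RetryNTimesSketch(n)` the operation runs at most `n + 1` times: the first attempt plus `n` retries.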
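Summary
That is essentially all of curator-client's core code.
curator-client hands connection management to HandleHolder, which closes the existing connection, creates a new one, and returns the connection it has created.
The connection's state, in turn, is managed by ConnectionState inside its Watcher callback: when the session expires, it has HandleHolder reset and hand out a new connection.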