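The previous few ZooKeeper posts gave a rough walk-through of the execution flow. From here on I plan to look at zk's features one at a time, across the code base, starting with sessions.
The post http://iwinit.iteye.com/blog/1754611 covered how a session is established on a standalone server. In a cluster, session creation works differently: it goes through a proposal. Assume the ensemble consists of a leader, followerA and followerB, and that our client connects to followerA. Once the follower and the leader have started up, they use different sessionTrackers. The leader uses SessionTrackerImpl and the follower uses LearnerSessionTracker. The main differences and similarities are: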
1. The follower does not start an expiry-check thread. It only records session information, and its main data structures are:

    // Session touch (refresh) records
    HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
    // Session timeouts
    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;

The leader, by contrast, starts the expiry thread and keeps more data structures:

    // Session objects
    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
    // Sessions that expire at the same point in time, used by the expiry thread
    HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
    // Session timeouts
    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
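2. In addSession, the leader creates a session object, while the follower only records the session information.
3. Both use the same algorithm to initialize the sessionId:

    // 1 byte of server_id + the low 5 bytes of the current time + 2 zero bytes,
    // intended to make the id globally unique
    public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >> 8;
        nextSid = nextSid | (id << 56);
        return nextSid;
    }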
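To make the bit layout concrete, the sketch below recomputes the id for fixed 40-bit timestamps and server id 1. It also exposes a side effect of the signed `>> 8`. When bit 39 of the timestamp is set, sign extension fills the top byte with 1s and masks the server id. This applies to wall-clock times from around April 2022 onward. The `>>> 8` variant keeps the server id intact, and as far as I know later ZooKeeper releases switched to it. The class and method names are hypothetical.

```java
class SessionIdLayout {
    // Same arithmetic as the quoted initializeNextSession (signed shift >> 8)
    static long signedVariant(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >> 8;
        return nextSid | (serverId << 56);
    }

    // Same layout, but with the unsigned shift >>> 8
    static long unsignedVariant(long serverId, long nowMillis) {
        long nextSid = (nowMillis << 24) >>> 8;
        return nextSid | (serverId << 56);
    }

    // Top byte of the id, where the server id is meant to live
    static int serverIdOf(long sid) {
        return (int) (sid >>> 56);
    }

    public static void main(String[] args) {
        long bit39Clear = 0x123456789AL; // 40-bit timestamp with bit 39 = 0
        long bit39Set = 0x9ABCDEF012L;   // 40-bit timestamp with bit 39 = 1
        System.out.printf("0x%016x%n", signedVariant(1, bit39Clear));
        System.out.printf("0x%016x%n", signedVariant(1, bit39Set));
        System.out.printf("0x%016x%n", unsignedVariant(1, bit39Set));
    }
}
```

The first line prints 0x01123456789a0000, which is server id 1, then five time bytes, then two zero bytes. In the second line the top byte becomes 0xff. The unsigned variant gives 0x019abcdef0120000.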
Next, the client creates a session:
1. The client sends a ConnectRequest to followerA.
2. followerA handles it.
Negotiating the session timeout:

    // Clamp the requested timeout. By default minSessionTimeout is 2x tickTime
    // and maxSessionTimeout is 20x tickTime.
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    // Session id sent by the client; non-zero when the client is reconnecting
    long sessionId = connReq.getSessionId();
    // Reconnecting client: reopen the existing session (covered in a later post)
    if (sessionId != 0) {
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                + Long.toHexString(clientSessionId)
                + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    }
    // Otherwise create a new session
    else {
        LOG.info("Client attempting to establish new session at "
                + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
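The clamping step reduces to a small pure function. This sketch assumes that no explicit minSessionTimeout or maxSessionTimeout is configured, so the defaults of 2x and 20x tickTime apply. `negotiate` is a hypothetical name.

```java
class TimeoutNegotiation {
    // Assumes the default bounds: [2 * tickTime, 20 * tickTime]
    static int negotiate(int requested, int tickTime) {
        int min = 2 * tickTime;
        int max = 20 * tickTime;
        if (requested < min) {
            return min;
        }
        if (requested > max) {
            return max;
        }
        return requested;
    }

    public static void main(String[] args) {
        int tickTime = 2000;
        System.out.println(negotiate(1000, tickTime));  // raised to the minimum
        System.out.println(negotiate(30000, tickTime)); // within bounds, unchanged
        System.out.println(negotiate(60000, tickTime)); // lowered to the maximum
    }
}
```

With tickTime = 2000 ms, a 1000 ms request becomes 4000 ms and a 60000 ms request becomes 40000 ms. A 30000 ms request is accepted as is.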
Initializing the session id and password:

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        // Session ids are handed out incrementally
        long sessionId = sessionTracker.createSession(timeout);
        // Pseudo-random password seeded from the session id
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        // The timeout travels as a 4-byte int
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        // Submit asynchronously to the processor chain
        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
        return sessionId;
    }
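The password comes from `new Random(sessionId ^ superSecret)`, so the same session id always produces the same bytes. A password can therefore be checked by regenerating it, without storing it. Below is a minimal sketch of that idea. The `SUPER_SECRET` value, the 16-byte length and the `check` helper are assumptions for illustration and are not quoted from ZooKeeper.

```java
import java.util.Arrays;
import java.util.Random;

class SessionPassword {
    // Placeholder for the server-side secret; an assumption for this sketch
    static final long SUPER_SECRET = 0xB3415C00L;

    // Derive the password deterministically from the session id, as createSession does
    static byte[] generate(long sessionId) {
        byte[] passwd = new byte[16]; // 16 bytes is an assumed length here
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Hypothetical check: regenerate the password and compare byte by byte
    static boolean check(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generate(sessionId));
    }

    public static void main(String[] args) {
        long sid = 0x0100000000000001L;
        byte[] p = generate(sid);
        System.out.println(check(sid, p));     // same id regenerates the same bytes
        System.out.println(check(sid + 1, p)); // a different seed yields different bytes
    }
}
```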
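Submitting the request:

    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
        // For createSession, xid is 0 and bb holds the 4-byte session timeout
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }

According to the processor-chain diagram in the earlier post http://iwinit.iteye.com/blog/1777109, FollowerRequestProcessor handles the request next. Like a create request, createSession is a transaction (write) request that has to be voted on. followerA therefore forwards it to the leader in a packet, and the leader runs the vote.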
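The decision made by FollowerRequestProcessor works like a routing rule. Requests that change replicated state go to the leader, and createSession is one of them. Plain reads are answered locally. The sketch below uses a hypothetical `Op` enum in place of ZooKeeper's OpCode constants. The real processor handles more operation types than the ones shown.

```java
import java.util.EnumSet;
import java.util.Set;

class ForwardingSketch {
    // Hypothetical subset of operation types, standing in for OpCode constants
    enum Op { CREATE, DELETE, SET_DATA, CREATE_SESSION, CLOSE_SESSION, GET_DATA, EXISTS }

    // Operations that change replicated state and therefore need a proposal
    static final Set<Op> NEEDS_PROPOSAL = EnumSet.of(
            Op.CREATE, Op.DELETE, Op.SET_DATA, Op.CREATE_SESSION, Op.CLOSE_SESSION);

    static String route(Op op) {
        return NEEDS_PROPOSAL.contains(op) ? "forward to leader" : "serve locally";
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + " -> " + route(op));
        }
    }
}
```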
3. The leader handles it. The LearnerHandler receives the request and submits it to the leader's processor chain.
PrepRequestProcessor does the following:

    // Transaction header
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
            zks.getTime(), type);
    ....
    case OpCode.createSession:
        request.request.rewind();
        int to = request.request.getInt();
        // The txn body is just the session timeout
        request.txn = new CreateSessionTxn(to);
        request.request.rewind();
        // Create the session on the leader
        zks.sessionTracker.addSession(request.sessionId, to);
        // The owner is followerA
        zks.setOwner(request.sessionId, request.getOwner());
        break;
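The timeout travels as a bare 4-byte int. In the quoted createSession, the buffer is handed over right after putInt() with no flip(), so its position is already 4. This explains why PrepRequestProcessor calls rewind() before getInt(). The second rewind() presumably leaves the buffer at offset 0 for any later reader. The sketch below shows the round trip. `TimeoutPayload`, `encode` and `decode` are hypothetical names.

```java
import java.nio.ByteBuffer;

class TimeoutPayload {
    // As in createSession: allocate 4 bytes and putInt, leaving position at 4
    static ByteBuffer encode(int timeout) {
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        return to;
    }

    // As in PrepRequestProcessor: rewind, read the int, rewind again
    static int decode(ByteBuffer request) {
        request.rewind();
        int to = request.getInt();
        request.rewind();
        return to;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(30000);
        System.out.println(buf.position()); // 4: nothing reset the position after putInt
        System.out.println(decode(buf));
    }
}
```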
Next, ProposalRequestProcessor sends out the proposal and writes it to the log.
4. followerA and followerB handle the proposal:

    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
        // Add to the pending queue (only if the low 32 bits of the zxid are non-zero)
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
        // Write to the log, then send an ACK to the leader
        syncProcessor.processRequest(request);
    }
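The `& 0xffffffffL` test depends on the zxid layout. The high 32 bits hold the leader epoch and the low 32 bits hold a counter within that epoch. The helpers below (`makeZxid`, `epochOf`, `counterOf`, `wouldBePending`) are hypothetical and only illustrate that split and the pending-queue check.

```java
class ZxidSketch {
    // High 32 bits: leader epoch; low 32 bits: counter within the epoch
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Mirrors the check in logRequest: only non-zero counters go to pendingTxns
    static boolean wouldBePending(long zxid) {
        return counterOf(zxid) != 0;
    }

    public static void main(String[] args) {
        long z = makeZxid(5, 7);
        System.out.printf("zxid=0x%x epoch=%d counter=%d pending=%b%n",
                z, epochOf(z), counterOf(z), wouldBePending(z));
    }
}
```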
5. Once the leader has collected ACKs from a quorum, it broadcasts a COMMIT and then commits locally.
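The commit condition is a strict majority of the ensemble, and the leader's own ACK counts toward it once the leader has logged the proposal. The sketch below assumes plain majority voting with equal weights. `QuorumAckSketch` is a hypothetical name and is not ZooKeeper's class.

```java
import java.util.HashSet;
import java.util.Set;

class QuorumAckSketch {
    private final int ensembleSize;
    // Servers (leader included) that have ACKed this proposal
    private final Set<Long> ackedBy = new HashSet<>();

    QuorumAckSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Record an ACK; returns true once a strict majority has ACKed
    boolean ack(long serverId) {
        ackedBy.add(serverId);
        return ackedBy.size() > ensembleSize / 2;
    }

    public static void main(String[] args) {
        QuorumAckSketch proposal = new QuorumAckSketch(3);
        System.out.println(proposal.ack(1L)); // leader's own ACK: 1 of 3
        System.out.println(proposal.ack(2L)); // followerA's ACK: 2 of 3, commit
    }
}
```

In a three-server ensemble, the leader does not have to wait for followerB. Two ACKs are already a majority.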
6. The leader's FinalRequestProcessor adds the session. The leader has no client to reply to.

    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
        // createSession does not change the data tree, so this does nothing for it
        rc = getZKDatabase().processTxn(hdr, txn);
        // Add the session (again on the leader, which already added it in PrepRequestProcessor)
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst.getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
        return rc;
    }

7. followerA and followerB process the COMMIT. The difference lies in CommitRequestProcessor. On followerA the Request carries the client connection, but on followerB it does not. As a result, followerB returns from FinalRequestProcessor as soon as it has created the session, while followerA also has to write the response back to the client.
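The branch in step 7 can be sketched as follows. Every server records the session, but only the server whose request still carries a connection answers the client. `Connection` and `Req` are hypothetical stand-ins for ServerCnxn and Request, and the reply text is made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FinalStepSketch {
    // Hypothetical stand-in for ServerCnxn
    interface Connection {
        void sendResponse(String msg);
    }

    // Hypothetical stand-in for Request; cnxn is null where the client is not attached
    static final class Req {
        final long sessionId;
        final Connection cnxn;

        Req(long sessionId, Connection cnxn) {
            this.sessionId = sessionId;
            this.cnxn = cnxn;
        }
    }

    final Set<Long> sessions = new HashSet<>();

    void process(Req r) {
        sessions.add(r.sessionId); // every server records the session
        if (r.cnxn == null) {
            return;                // followerB (and the leader): nobody to answer
        }
        r.cnxn.sendResponse("session 0x" + Long.toHexString(r.sessionId) + " established");
    }

    public static void main(String[] args) {
        List<String> replies = new ArrayList<>();
        FinalStepSketch followerA = new FinalStepSketch();
        FinalStepSketch followerB = new FinalStepSketch();
        followerA.process(new Req(0x10L, replies::add));
        followerB.process(new Req(0x10L, null));
        System.out.println(replies);
        System.out.println(followerB.sessions.contains(0x10L));
    }
}
```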
Finally, let's look at the leader's session-expiry mechanism, starting with the SessionTrackerImpl constructor:

    public SessionTrackerImpl(SessionExpirer expirer,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid) {
        super("SessionTracker");
        this.expirer = expirer;
        this.expirationInterval = tickTime;
        this.sessionsWithTimeout = sessionsWithTimeout;
        // Next checkpoint, rounded up to a multiple of expirationInterval
        // (expirationInterval is the configured tickTime)
        nextExpirationTime = roundToInterval(System.currentTimeMillis());
        this.nextSessionId = initializeNextSession(sid);
        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
            addSession(e.getKey(), e.getValue());
        }
    }
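roundToInterval itself is not quoted above. The sketch below assumes it computes `(time / expirationInterval + 1) * expirationInterval`, which is the next multiple of tickTime strictly after `time`. Under that assumption, with tickTime = 2000 ms, 10500 maps to 12000, and an exact multiple such as 12000 moves on to 14000.

```java
class RoundToInterval {
    // Assumed formula: the next multiple of expirationInterval strictly after time
    static long roundToInterval(long time, long expirationInterval) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    public static void main(String[] args) {
        long tickTime = 2000;
        System.out.println(roundToInterval(10500, tickTime));
        System.out.println(roundToInterval(12000, tickTime));
    }
}
```

Because every expiry time is a multiple of tickTime, the expiry thread only needs to wake up once per tick, at the same aligned instants used as keys in sessionSets.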
The main thread loop:

    synchronized public void run() {
        try {
            while (running) {
                // Wait until the next expiry checkpoint is reached
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                // Sessions that expire at this checkpoint
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
                // Expire them
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }
                // Advance to the next checkpoint
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }
When a session is touched (refreshed):

    synchronized public boolean touchSession(long sessionId, int timeout) {
        ......
        // The session object
        SessionImpl s = sessionsById.get(sessionId);
        // Return false if the session doesn't exist or is marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        // This session's next expiry time, rounded up to a multiple of tickTime
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        // If the new expiry time is not later than the current one, there is nothing to update
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        // Remove the session from its old expiry set and add it to the new one;
        // the expiry thread checks these sets periodically
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        // The new expiry time
        s.tickTime = expireTime;
        // Add to the new expiry set
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }
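The bucket scheme used by run() and touchSession() fits in a few dozen lines. This sketch tracks bare session ids in place of SessionImpl objects. It assumes that roundToInterval returns the next multiple of the interval strictly after the given time. `ExpiryBuckets`, `touch` and `expireAt` are hypothetical names, and locking is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ExpiryBuckets {
    private final long interval;
    // Expiry time -> sessions expiring at that time (like sessionSets)
    private final Map<Long, Set<Long>> buckets = new HashMap<>();
    // Session -> its current expiry time (like SessionImpl.tickTime)
    private final Map<Long, Long> bucketOf = new HashMap<>();

    ExpiryBuckets(long interval) {
        this.interval = interval;
    }

    // Assumed rounding: next multiple of interval strictly after time
    private long roundToInterval(long time) {
        return (time / interval + 1) * interval;
    }

    // Like touchSession: move the session to a later bucket, never an earlier one
    void touch(long sessionId, long now, int timeout) {
        long expireTime = roundToInterval(now + timeout);
        Long old = bucketOf.get(sessionId);
        if (old != null && old >= expireTime) {
            return;
        }
        if (old != null) {
            Set<Long> oldSet = buckets.get(old);
            if (oldSet != null) {
                oldSet.remove(sessionId);
            }
        }
        bucketOf.put(sessionId, expireTime);
        buckets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
    }

    // One pass of the expiry loop: remove and return the bucket due at checkTime
    List<Long> expireAt(long checkTime) {
        List<Long> expired = new ArrayList<>();
        Set<Long> set = buckets.remove(checkTime);
        if (set != null) {
            for (Long id : set) {
                bucketOf.remove(id);
                expired.add(id);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpiryBuckets tracker = new ExpiryBuckets(2000);
        tracker.touch(1L, 0, 4000);    // bucket 6000
        tracker.touch(2L, 0, 4000);    // bucket 6000
        tracker.touch(1L, 3000, 4000); // session 1 moves to bucket 8000
        System.out.println(tracker.expireAt(6000));
        System.out.println(tracker.expireAt(8000));
    }
}
```

Grouping sessions by expiry time means each wake-up of the expiry thread handles only the one bucket that is due, with no scan over all sessions. A touch just moves one session between two sets.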
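Summary
1. Creating a session goes through a proposal and a vote.
2. As a result, every server ends up with the same session record in memory.
3. Session ids are kept unique by combining the server id with a timestamp.
4. The leader is responsible for checking session expiry, which it does once every tickTime.
5. When a session is touched, it moves to a different expiry set. Expiry times are always multiples of tickTime.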