zk的选举流程涉及很多个线程的并发控制 明确每个线程的职责后 进行流程的具体分析
- 首先准备好环境 https://blog.csdn.net/zhaoyu_nb/article/details/88663599
- 正式开始 其实是从makeLEStrategy().lookForLeader()方法的调用开始的 这里根据配置文件的electorArg来指定具体的选举算法,一般默认使用3 其他的都弃用了
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
- lookForLeader()做的第一件事 就是 sendNotifications();
private void sendNotifications() {
for (QuorumServer server : self.getVotingView().values()) {
long sid = server.id;
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch);
// if(LOG.isDebugEnabled()){
LOG.info("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
// }
sendqueue.offer(notmsg);
}
}
其中 : self.getVotingView()来自下面
QuorumPeer
public Map<Long,QuorumPeer.QuorumServer> getView() {
return Collections.unmodifiableMap(this.quorumPeers);
}
/**
* Observers are not contained in this view, only nodes with
* PeerType=PARTICIPANT.
*/
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
return QuorumPeer.viewToVotingView(getView());
}
QuorumPeerMain
quorumPeer.setQuorumPeers(config.getServers());
这里将自己的投票信息 挨个发给自己集群中的每个节点
sendqueue.offer(notmsg);这个方法只是把要发送的数据放到队列中 结合前面一篇文章 我们知道这个消息 其实最后会被
FastLeaderElection.Messenger.WorkerSender线程交给QuorumCnxManager的queueSendMap 同时进行连接
public void toSend(Long sid, ByteBuffer b) {
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid)); // recvQueue 存放收到的消息
} else {
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq); queueSendMap 存放发送的消息
if (bqExisting != null) {
addToSendQueue(bqExisting, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
ps :
这里还有一点需要主要 在QuorumCnxManager.Listener 接收其他节点连接的时候会比较myid 如果小于自己的myid 那么它会主动断开这个连接 因为默认在选举的时候 会有比自己myid大的节点
可是在 connectOne(sid);方法里面 刚好是反过来的
这样的最终结果就是 在开始选选举的时候 sendNotifications() 时候 只向myid比自己小的节点发送自己的选票信息,根据后面策略 myid大的有优先做为leaer的条件 一开始就抢选票 所以开始的时候 myid越小 收到的选票越多 处理的东西也越多 反过来myid越大 收到的选票也越少 不过zk 选择leader不只看myid 还有其他的条件
connectOne(sid);
QuorumCnxManager.initiateConnection(Socket sock, Long sid)
QuorumCnxManager.startConnection(Socket sock, Long sid)
// If lost the challenge, then drop the new connection
if (sid > this.mySid) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + this.mySid + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
- 选票的处理
4.1 WorkerReceiver 的处理
这里要提一点 与其他节点的数据交换都是通过
QuorumCnxManager 来完成的 FastLeaderElection的两个工作线程 实际是将要发送的数据 交给queueSendMap
从recvQueue 拿基础数据 然后做一点处理 变成自己能用的东西
QuorumCnxManager.SendWorker ====>queueSendMap;
QuorumCnxManager.RecvWorker ======>recvQueue;
public final ArrayBlockingQueue recvQueue;
- 从QuorumCnxManager的接收消息对列中获取消息
/*
* If it is from an observer, respond right away.
* Note that the following predicate assumes that
* if a server is not a follower, then it must be
* an observer. If we ever have any other type of
* learner in the future, we'll have to change the
* way we check for observers.
*/
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
current.getPeerEpoch());
sendqueue.offer(notmsg);
这一步是比较有意思的 因为在.lookForLeader()处理投票的方法中有这么一条 所以上一条处理投票的代码 用处估计只有一个
那就集群新增节点 不过这样没有处理之前的配置文件 好像不太健壮 是不是通过jmx修改之前server的配置?
if(self.getVotingView().containsKey(n.sid)) {...}
else
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
- 将buffer中的消息读取出来
- 根据消息的状态处理消息
1. 如果自己的状态是如果也为looking 放入 recvqueue(此时投票的状态可能是leading或者follower)
2. 如果自己的状态是如果也为looking 判断该消息状态 如果也为lookig 同时逻辑时钟小于自己的 则向该服务发送一条消息 leader为自己选举的leader(不一定是自己)相当于拉票了
3. 如果自己的状态不是looking状态 请求的服务的状态是 looking 向该服务发送自己当前的投票信息
4.2 lookForLeader处理投票信息
FastLeaderElection.lookForLeader()
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); 这里存放的是节点状态为looking的
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();这里存放的是leading following的
-
处理looking状态的投票
- 当自己服务的状态为looking的时候
Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
会从recvqueue队列里拿消息 - 根据集群内的server返回的消息进行处理 如果不是集群内配置的服务 直接跳过这个消息 打印警告日志
- 先与获取的信息进行比较 (这里就开始决定选哪个了)
- 如果自身的逻辑时钟较小 则删队列中已经获取到的消息 更新选票的信息 然后发送notify消息
- 如果自身的逻辑时钟较大 则直接忽略该消息
- 如果逻辑时钟一样 比较信息 然后发送notify消息 - 将获取到的消息存recvset的Map中 sid->vote
- 先与获取的信息进行比较 (这里就开始决定选哪个了)
- 这里判断自己收都到的投票是否足够结束一轮投票 这里两种策略 不过我们一般都是使用票数过半作为条件
然后返回最后的投票信息- 如果票数过半 最后等待一段时间 看投票信息是否有变化
- 这里开始修改当前服务的状态
在获取超过一般的服务器的数据后 一般这个时候是可以 确定自己可以作为什么角色
- 当自己服务的状态为looking的时候
-
处理following和leading状态的投票
- FOLLOWING LEADING
是放在一个逻辑里处理的
如果自己是leader 就做判断
如果自己不是leader 或者只是新加入集群的一员 就将消息放入
outofelection进行验证 同时返回自己最后的投票信息 并更新自己的状态
- FOLLOWING LEADING