知道了选举相关的重要类及成员变量的作用以后,接下来我们开始分析真正执行选举逻辑的方法lookForLeader():
1) 选举前的准备工作
2) 将自己作为初始leader投出去
3)循环交换投票直至选出Leader,循环交换投票过程中,根据收到的投票发送者状态不同,有下面三种情况:
3.1) 发送者状态为LOOKING:
3.1.1) 验证自己与大家的投票谁更适合做leader
3.1.2) 判断本轮选举是否可以结束了
3.2) 发送者状态为OBSERVING:
3.3) 发送者状态为FOLLOWING/LEADING:
leader只要成功发出去一个消息,整个集群对这个消息就不会丢,因为leader选举,会选zxid最大的那个服务器。如果没发出去就挂了,消息就丢失了
public Vote lookForLeader() throws InterruptedException { // ----------------------- 1 选举前的初始化工作 --------------------- try { // Java Management eXtensions,Oracle提供的分布式应用程序监控技术 self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register( self.jmxLeaderElectionBean, self.jmxLocalPeerBean); } catch (Exception e) { LOG.warn("Failed to register with JMX", e); self.jmxLeaderElectionBean = null; } if (self.start_fle == 0) { //系统启动开始时间 self.start_fle = Time.currentElapsedTime(); } try { // recvset,receive set,用于存放来自于外部的选票,一个entry代表一次投票 // key为投票者的serverid,value为选票 // 该集合相当于投票箱 HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); // outofelection,out of election,退出选举 // 其中存放的是非法选票,即投票者的状态不是looking HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>(); // notTimeout,notification Timeout int notTimeout = finalizeWait; // ----------------------- 2 将自己作为初始leader投出去 --------------------- synchronized(this){ ...
self.start_fle = Time.currentElapsedTime();
为什么不用System.currentTimeMillis()?
因为系统时间是可以改的,不安全,并且系统时间返回的是毫秒,而currentElapsedTime是纳秒,更精确
获取相对于虚拟机的时间,就不会有系统时间的问题
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
receive set,用于存放来自于外部的选票,一个entry代表一次投票
key为投票者的serverid,value为选票Vote
该集合相当于投票箱,票箱记录了集群中其他节点的投票结果
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
out of election,退出选举
其中存放的是非法选票,已经退出选举的Server发来的选票,即投票者的状态不是looking
int notTimeout = finalizeWait;
notification Timeout,通知超时时间,200毫秒
发出选票后收到回复允许等待的时间
// ---------------------- 2 将自己作为初始化leader投出去 ---------------- // notTimeout,notification timeout int notTimeout = finalizeWait; synchronized(this){ // 逻辑时钟增一 logicalclock.incrementAndGet(); // 更新提案(更新自己的选票) // getInitId():返回当前server的id // getInitLastLoggedZxid():返回当前server的最大的zxid(最后一个zxid) // getPeerEpoch():返回当前Server的epoch // 将自己作为初始化leader,更新推荐信息 updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } LOG.info("New election. My id = " + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid)); // 发送通知到队列 sendNotifications();
logicalclock.incrementAndGet();
逻辑时钟加一
逻辑时钟可以这么理解:logicalclock代表选举逻辑时钟(类比现实中的第十八次全国人大、第十九次全国人大……),这个值从0开始递增,在同一次选举中,各节点的值基本相同,也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
更新当前server的推荐信息为当前server自己,注意该方法和logicalclock.incrementAndGet()一起是一个原子操作
synchronized void updateProposal(long leader, long zxid, long epoch){ if(LOG.isDebugEnabled()){ LOG.debug("Updating proposal: " + leader + " (newleader), 0x" + Long.toHexString(zxid) + " (newzxid), " + proposedLeader + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)"); } // 更新当前server的推荐信息 // 上一章有说过,这三个字段是成员变量,记录当前Server所推荐的Leader信息 proposedLeader = leader; proposedZxid = zxid; proposedEpoch = epoch; }
getInitId():获取当前server的id
private long getInitId(){ //判断是不是参与者,只有具有选举权的Server在选举时,才是参与者,否则是观察者OBSERVER //如果是参与者,返回当前Server的ServerId if(self.getLearnerType() == LearnerType.PARTICIPANT) return self.getId(); else return Long.MIN_VALUE; } public enum LearnerType { PARTICIPANT, OBSERVER; }
判断当前状态是否是参与者,即排除了观察者,不具有选举权的Server
具有选举权的Server在有Leader情况下才是Follower,在选举的情况下叫Participant,参与者
getInitLastLoggedZxid():获取当前server最后的(也是最大的)zxid,即事务Id
private long getInitLastLoggedZxid(){ //同理判断是否具有选举权 if(self.getLearnerType() == LearnerType.PARTICIPANT) return self.getLastLoggedZxid(); else return Long.MIN_VALUE; }
getPeerEpoch():获取当前server的epoch
private long getPeerEpoch(){ if(self.getLearnerType() == LearnerType.PARTICIPANT) try { return self.getCurrentEpoch(); } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } else return Long.MIN_VALUE; }
sendNotifications();
将更新过的Ledaer推荐信息发送出去(将更新过的信息写入到一个发送队列,具体的发送逻辑不在这,上一章讲过,有专门的线程去处理)
/** * Send notifications to all peers upon a change in our vote */ private void sendNotifications() { // 遍历所有具有选举权的server for (QuorumServer server : self.getVotingView().values()) { long sid = server.id; // notmsg,notification msg ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型 proposedLeader,//推荐的Leader的ServerId(myid) proposedZxid,//推荐的Leader的zxid logicalclock.get(),//此次选举的逻辑时钟 QuorumPeer.ServerState.LOOKING,//当前Server的状态 sid, // 接受者的server id proposedEpoch);//推荐的Leader的epoch if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } //放入发送队列 sendqueue.offer(notmsg); } }
/** * Send notifications to all peers upon a change in our vote */ private void sendNotifications() { // 遍历所有具有选举权的server for (QuorumServer server : self.getVotingView().values()) { long sid = server.id; // notmsg,notification msg ToSend notmsg = new ToSend(ToSend.mType.notification,//消息类型 proposedLeader,//推荐的Leader的ServerId(myid) proposedZxid,//推荐的Leader的zxid logicalclock.get(),//此次选举的逻辑时钟 QuorumPeer.ServerState.LOOKING,//当前Server的状态 sid, // 接受者的server id proposedEpoch);//推荐的Leader的epoch if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } //放入发送队列 sendqueue.offer(notmsg); } }
遍历的是什么?
self.getVotingView().values(),返回的是所有具有选举权和被选举权的Server
public Map<Long,QuorumPeer.QuorumServer> getVotingView() { return QuorumPeer.viewToVotingView(getView()); } /** * A 'view' is a node's current opinion(评价,称呼) of * the membership(成员) of the entire(整个的) ensemble(全体). * 翻译:“view”是一个节点(Server)对整个系统成员的当前称呼。 */ // 获取到zk集群中的所有server(包含participant与observer) public Map<Long,QuorumPeer.QuorumServer> getView() { return Collections.unmodifiableMap(this.quorumPeers); } static Map<Long,QuorumPeer.QuorumServer> viewToVotingView(Map<Long,QuorumPeer.QuorumServer> view) { Map<Long,QuorumPeer.QuorumServer> ret = new HashMap<Long, QuorumPeer.QuorumServer>(); // 将observer给排除出去,只获取参与者,即具有选举权的Server for (QuorumServer server : view.values()) { if (server.type == LearnerType.PARTICIPANT) { ret.put(server.id, server); } } return ret; }
ToSend notmsg = new ToSend(…)
notification msg,通知消息,即将推荐的Leader信息封装成ToSend对象放入发送队列,有专门线程去发送该消息
sid代表了消息接收者的server id
将自己作为初始leader投出去以后,接下来就会一直循环处理接收到的选票信息:
// ----------------------- 3 循环交换投票直至选出Leader --------------------- /* * Loop in which we exchange notifications until we find a leader * 循环交换通知,直到找到Leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ // recvqueue,receive queue,其中存放着接受到的所有外来的通知 // 有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列 Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ if(manager.haveDelivered()){ // 重新发送,目的是为了重新再接收 sendNotifications(); } else { // 重新连接zk集群中的每一个server manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } else if(validVoter(n.sid) && validVoter(n.leader)) { //validVoter(n.sid):验证发送者的ServerId //validVoter(n.leader):验证当前通知推荐的leader的ServerId ...
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
循环交换通知,直到找到Leader(一但找到Leader,状态就不在是LOOKING) Notification n =
recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);receive queue,其中存放着接受到的所有外来的通知
有专门线程去处理接收其他Server发来的通知,并将接收到的信息解析封装成Notification 放入recvqueue队列
可以看到会有从recvqueue取出通知为空的情况
什么情况取出来是空呢?
假如广播出去8个,由于网络原因可能只收到3个,第四次取的时候就是空的
还有可能收到8个了,但是选举还没结束,再次取的时候也是空的
总之就是为了保证选举还没结束的时候,能继续收到其他Server的选票,并继续处理判断,直到选出Leader
if(manager.haveDelivered()){ //简单来说该方法就是判断是否和集群失联,返回false表示失联
manager:QuorumCnxManager,连接管理器,维护了服务器之间的TCP连接
haveDelivered:判断是否已经被交付,即检查所有队列是否为空,表示所有消息都已传递。
boolean haveDelivered() { for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) { LOG.debug("Queue size: " + queue.size()); if (queue.size() == 0) { return true; } } return false; }
queueSendMap就是之前说的连接管理器维护的发送给其他Server失败的消息副本的Map
只要有一个队列为0就返回true,后面就不看了,因为之前说过只要有一个队列为空,就说明当前Server与zk集群的连接没有问题
只有当所有队列都不为空,才说明当前Server与zk集群失联
sendNotifications();
如果**manager.haveDelivered()**返回true,表明当前Server和集群连接没有问题,所以重新发送当前Server推荐的Leader的选票通知,目的是为了重新再接收其他Server的回复
manager.connectAll();
如果**manager.haveDelivered()**返回false,表明当前Server和集群已经失联,所以重新连接zk集群中的每一个server
public void connectAll(){ long sid; for(Enumeration<Long> en = queueSendMap.keys(); en.hasMoreElements();){ sid = en.nextElement(); connectOne(sid); } }
为什么重连了,不需要重新发送通知了呢?
因为我失联了,但是发送队列中的消息是还再的,重新连接后会重新继续发送,而且其他Server在recvqueue.poll为null的时候,如果没有和集群失联,也会重新sendNotifications,所以这里是不需要的。
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?tmpTimeOut : maxNotificationInterval);
重新发送通知或者重连集群后,将通知超时时间扩大两倍,如果超过最大通知时间,将超时时间置为最大时间
else if(validVoter(n.sid) && validVoter(n.leader)) {
如果从recvqueue取出的投票通知不为空,会先验证投票的发送者和推荐者是否合法,合法了再继续处理
// 验证指定server是否合法 private boolean validVoter(long sid) { //即判断是否具有选举权和被选举权 return self.getVotingView().containsKey(sid); }
... while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ ... if(n == null){...} else if(validVoter(n.sid) && validVoter(n.leader)) { switch (n.state) { case LOOKING: // 3.1.1) 验证自己与大家的投票谁更适合做leader // If notification > current, replace and send messages out // n.electionEpoch:外来通知所在选举的逻辑时钟 // logicalclock.get():获取到当前server的逻辑时钟 // 处理当前选举过时的情况:清空票箱,更新逻辑时钟 if (n.electionEpoch > logicalclock.get()) { // 更新当前server所在的选举的逻辑时钟 logicalclock.set(n.electionEpoch); // 清空票箱 recvset.clear(); // 判断当前server与n谁更适合做leader,无论谁更适合, // 都需要更新当前server的推荐信息,然后广播出去 if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // 处理算来n过时的情况:n对于当前选举没有任何用处,直接丢掉 } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; // 处理n.electionEpoch 与 logicalclock.get() 相等的情况 // totalOrderPredicate()用于判断外来n与当前server所推荐的leader // 谁更适合做新的leader } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { // 更新当前server的推荐信息 updateProposal(n.leader, n.zxid, n.peerEpoch); // 广播出去 sendNotifications(); } if(LOG.isDebugEnabled()){ LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // 将外来n通知封装为一个选票,投放到“选票箱” recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // n.sid:通知发送者的ServerId // n.leader,n.zxid,n.peerEpoch:所推荐Leader的相关信息 // n.electionEpoch:外来通知所在选举的逻辑时钟 // ----------------------- 3.1.2) 判断本轮选举是否可以结束了 --------------------- if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { ...
n.electionEpoch:外来通知所在选举的逻辑时钟
logicalclock.get():获取到当前server选举的逻辑时钟
正常情况,在选举的时候每一个Server的electionEpoch应该都是相同的,即他们是在同一轮选举,是通过当前currentEpoch+1获得的,不是同步获得的。也有例外情况,比如在第18次选举中,某个节点A挂了,其他节点完成了Leader选举,但没过多久,该Leader又挂了,于是进入了第19次Leader选举,同时节点A此时恢复,加入到Leader选举中,那么节点A的logicallock为18,而其他节点的logicallock为19,针对这种情况,节点A的logicallock会被直接更新为19并参与到第19次Leader选举中。
这个时候需要比较选票所在的选举的逻辑时钟和当前Server选举的逻辑时钟是否相等,通过比较n.electionEpoch和
logicalclock.get()的值,有三种情况:
什么情况外来投票大,或者小呢?
比如5个机器,已经选举好Leader了,有两个已经通知了,另外两个不知道,这个时候刚上任的Leader又突然挂了,还没通知到另外两个机器的时候,就会造成这种情况,已经通知的那两个epoch会再次重新选举的,逻辑时钟会再加一,即epoch会在加一,未通知的那两个epoch还没变
站在未通知的Server角度,在接收到已经通知的Server回复的时候,就会发现回复的通知epoch更大
站在已经通知的Server角度,在接受到未通知的Server发来的通知时,会发现自己比通知的epoch大
if (n.electionEpoch > logicalclock.get()) {…}
处理n.electionEpoch 比 logicalclock.get() 大的情况(外来投票epoch大)
自己已经过时了,选谁都没有意义,所以做如下操作:
logicalclock.set(n.electionEpoch):更新当前server所在的选举的逻辑时钟
recvset.clear():清空票箱,之前收集的投票都已经过时了,没意义了。
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch()):判断外来n与当前server谁更适合做新的leader(注意不是当前Server所推荐的,而就是当前Server)
updateProposal(…):选择更适合的更新当前server的推荐信息
sendNotifications():将自己的选票广播出去
else if (n.electionEpoch < logicalclock.get()) {…}
处理n.electionEpoch 比 logicalclock.get() 小的情况(外来投票epoch小)
说明外来的过时了,它的选票没有意义,不做任何处理,直接break掉switch,重新进入循环,从recvqueue取下一个通知,继续处理
else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {…}
处理n.electionEpoch 与 logicalclock.get() 相等的情况,即他们在同一轮选举中
totalOrderPredicate(…):断言,判断外来n与当前server所推荐的leader谁更适合做新的leader,返回true,则n(外来的)更适合
如果返回true,即外来的更合适,则执行下面方法:
updateProposal():更新当前server的推荐信息
sendNotifications():广播出去
处理完上面情况后,如果没有break,即是外来选票的逻辑时钟更大,或者相等,代表外来选票有效,则将选票放入选票箱:
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
将外来n通知封装为一个选票,投放到“选票箱”
特殊情况:当前服务器收到外来通知发现外来通知推荐的leader更适合以后,会更新自己的推荐信息并再次广播出去,这个时候recvqueue除了第一次广播推荐自己收到的回复外,还会收到新一轮广播的回复,对于其他Server而言有可能会回复两次通知,但对于本地Server是没有影响的,因为投票箱recvset是一个Map,key是发送消息的服务器的ServerId,每个Server只会记录一个投票,新的会覆盖旧的
接下来会尝试走《3.1.2 判断本轮选举是否可以结束了》这一步,但是如果刚开始选举,要达到相同选票过半才结束选举,所以肯定不会结束,里面的逻辑是不会走的,所以直接break掉switch,然后会循环到一开始从recvqueue取下一个通知,继续处理…
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)
totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())
判断谁更适合做leader
该方法返回true,表示外来的更适合,即new更适合
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); // 获取权重,observer的权重为0,如果为0是observer就return false if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } // zxid:其为一个 64 位长度的 Long 类型,其中高 32 位表示 epoch,低 32 位表示 xid。 // 先比较前32位,如果newEpoch > curEpoch,肯定newZxid > curZxid,直接返回true // 如果newEpoch 和 curEpoch相同 // 在看Zxid,实际上比较的就是xid(前32位相等),如果newZxid > curZxid,直接返回true // 如果Zxid也相同,就比较ServerId return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
... case LOOKING: .... // ----------------------- 3.1.2) 判断本轮选举是否可以结束了 --------------------- /* * 尝试通过现在已经收到的信息,判断是否已经足够确认最终的leader了,通过方法termPredicate() , * 判断标准很简单:是否已经有超过半数的机器所推举的leader为当前自己所推举的leader. * 如果是,保险起见,最多再等待finalizeWait(默认200ms)的时间进行最后的确认, * 如果发现有了更新的leader信息,则把这个Notification重新放回recvqueue,显然,选举将继续进行。 * 否则,选举结束,根据选举的leader是否是自己,设置自己的状态为LEADING或者OBSERVING或者FOLLOWING。 */ if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader // 该循环有两个出口: // break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知 // while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ // 将更适合的n重新放回到recvqueue,以便对其进行重新投票 recvqueue.put(n); break; } } // 若n为null,则说明当前server所推荐的leader就是最终的leader, // 则此时就可以进行收尾工作了 if (n == null) { // 修改当前server的状态,非leader即following self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 形成最终选票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); // 清空recvqueue队列 leaveInstance(endVote); return endVote; } } break;
if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {…}
终止断言:判断当前Server所推荐的leader在票箱中的支持率是否过半
/** * 终止断言。给定一组选票,决定是否有足够的票数宣布选举结束。 */ protected boolean termPredicate( HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet<Long>(); // 遍历票箱:从票箱中查找与选票vote相同的选票 for (Map.Entry<Long,Vote> entry : votes.entrySet()) { if (vote.equals(entry.getValue())){ set.add(entry.getKey()); } } return self.getQuorumVerifier().containsQuorum(set); }
org.apache.zookeeper.server.quorum.flexible.QuorumMaj#containsQuorum
/** * Verifies if a set is a majority. */ public boolean containsQuorum(Set<Long> set){ return (set.size() > half); }
half就是集群总数的一半
可以看到一定要大于一半,等于一半也不行,这也是服务器数量推荐奇数的原因。
基于该理论,由 5 台主机构成的集群,最多只允许 2 台宕机(至少要有3票)。而由 6 台构成的集群,其最多也只允许 2 台宕机(3票不过半,至少4票)。即,6 台与5 台的容灾能力是相同的。基于此容灾能力的原因,建议使用奇数台主机构成集群,以避免资源浪费。但从系统吞吐量上说,6 台主机的性能一定是高于 5 台的。所以使用 6 台主机并不是资源浪费。
已经过半了,但是recvqueue里面的通知还没处理完,还有可能有更适合的Leader通知
如果有更合适的,将通知重新加入recvqueue队列的尾部,并break退出循环,此时n != null ,不会进行收尾动作,会重新进行选举,最终还是会更新当前Server的推荐信息为这个更适合的Leader,并广播出去
如果没有,即n为null,则说明当前server所推荐的leader就是最终的leader,则此时就可以进行收尾工作了
if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock.get(), proposedEpoch))) { // 该循环有两个出口: // break:从该出口跳出,说明n的值不为null,说明在剩余的通知中找到了更适合做leader的通知 // while()条件:从该出口跳出,说明n的值为null,说明在剩余的通知中没有比当前server所推荐的leader更适合的了 while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ // 如果发现有更合适的 // 将更适合的n重新放回到recvqueue,以便对其进行重新投票 recvqueue.put(n); // put:将指定的元素插入此队列的尾部,必要时等待可用的空间。 break; } } // 若n为null,则说明当前server所推荐的leader就是最终的leader, // 则此时就可以进行收尾工作了 if (n == null) { ... } } break;
收尾工作:
// 若n为null,则说明当前server所推荐的leader就是最终的leader, // 则此时就可以进行收尾工作了 if (n == null) { // 修改当前server的状态,非leader即following // 如果推荐的Leader就是我自己,修改我当前状态为LEADING // 如果不是我自己,判断自己是否是参与者,如果是则状态置为FOLLOWING,否则是OBSERVING self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); // 形成最终选票 Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch); // 清空recvqueue队列 leaveInstance(endVote); // 返回最终选票 return endVote; } private ServerState learningState(){ if(self.getLearnerType() == LearnerType.PARTICIPANT){ LOG.debug("I'm a participant: " + self.getId()); return ServerState.FOLLOWING; } else{ LOG.debug("I'm an observer: " + self.getId()); return ServerState.OBSERVING; } } private void leaveInstance(Vote v) { if(LOG.isDebugEnabled()){ LOG.debug("About to leave FLE instance: leader=" + v.getId() + ", zxid=0x" + Long.toHexString(v.getZxid()) + ", my id=" + self.getId() + ", my state=" + self.getPeerState()); } recvqueue.clear(); }
观察者是不参与Leader选举的,所以收到这样的选票不做任何处理
case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break;
首先要清楚两点:
当一个Server接收到其它Server的通知后,无论自己处于什么状态,其都会向那个Server发送自己的通知
一个Server若能够接收到其它Server的通知,说明该Server不是Observer,而是Participant。因为sendNotifications()方法中是不给Observer发送的
这个n.state,是收到外来通知的那个发送者的Server的状态
zk 集群中的每一台主机,在不同的阶段会处于不同的状态。每一台主机具有四种状态。
LOOKING:选举状态
FOLLOWING:Follower 的正常工作状态
OBSERVING:Observer 的正常工作状态
LEADING:Leader 的正常工作状态
代码里面有处理Status为OBSERVING的通知:
为什么Observer会发通知?:
首先没读其他代码,所以不是很清楚,但是可以推测一下,如果新增Observer,在启动的时候,它怎么知道谁是Leader?肯定是发通知,别人会告诉它,只不过这个逻辑代码不在选举代码这里
case OBSERVING 与 else if(validVoter(n.sid) && validVoter(n.leader)) { 有矛盾,肯定不会走case OBSERVING,已经在else if里面过滤了,为什么这么写?:
有可能是为了解决线程安全问题,为了程序健壮性
while ((self.getPeerState() == ServerState.LOOKING) &&(!stop))//只要当前状态是LOOKING,即没有选出Leader会一直循环 { // recvqueue,receive queue,其中存放着接受到的所有外来的通知 Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); if(n == null){ ... } else if(validVoter(n.sid) && validVoter(n.leader)) { switch (n.state) { case LOOKING: ... case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; // ----------------------- 3.3) 发送者状态为FOLLOWING/LEADING ----------------------- // ----------------------- 处理无需选举的情况 --------------------- // 首先要清楚两点: // 1) 当一个Server接收到其它Server的通知后,无论自己处于什么状态, // 其都会向那个Server发送自己的通知 // 2) 一个Server若能够接收到其它Server的通知,说明该Server不是Observer // 而是Participant。因为sendNotifications()方法中是不给Observer发送的 // 有两种场景会出现leader或follower给当前server发送通知: // 1)有新Server要加入一个正常运行的集群时,这个新的server在启动时, // 其状态为looking,要查找leader,其向外发送通知。此时的leader、 // follower的状态肯定不是looking,而分别是leading、following状态。 // 当leader、follower接收到通知后,就会向其发送自己的通知 // 此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同 // // 2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server // 所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能 // 已经是leading或following了 // 此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同 // 经过分析可知,最终的两种场景是: // 1)当前Server选举的逻辑时间与其它follower或leader的epoch相同 // 2)当前Server选举的逻辑时间与其它follower或leader的epoch不同 case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch together. * 把来自同一时代的所有通知放在一起考虑。 */ if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 判断当前server是否应该退出本轮选举了 // 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半 // 若过半,再判断n所推荐的leader在outofelection中的状态是否合法 // 若合法,则可以退出本轮选举了 if(ooePredicate(recvset, outofelection, n)) { // 收尾工作 self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } } /* * Before joining an established ensemble, verify * a majority is following the same leader. * 在加入一个既定的团队之前,要确认大多数人都是跟随同一个领导。 */ outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); // 若n所推荐的leader在你们通知所形成的集合中的支持率过半,则 // 我就知道谁是leader了,我就可以退出选举了 if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); break; } } else { ... } }
有两种场景会出现leader或follower给当前server发送通知:
1)有新Server要加入一个正常运行的集群时,这个新的server在启动时,其状态为looking,要查找leader,其向外发送通知。此时的leader、follower的状态肯定不是looking,而分别是leading、following状态。当leader、follower接收到通知后,就会向其发送自己的通知
此时,当前Server选举的逻辑时间与其它follower或leader的epoch相同,也有可能不同
2)当其它Server已经在本轮选举中选出了新的leader,但还没有通知到当前Server所以当前Server的状态仍保持为looking而其它Server中的部分主机状态可能已经是leading或following了
此时,当前Server选举的逻辑时间与其它follower或leader的epoch肯定相同 经过分析可知,即最终的两种场景是:当前Server选举的逻辑时间与其它follower或leader的epoch相同
当前Server选举的逻辑时间与其它follower或leader的epoch不同
epoch相同情况
case FOLLOWING: case LEADING: /* * Consider all notifications from the same epoch together. * 把来自同一时代的所有通知放在一起考虑。 */ if(n.electionEpoch == logicalclock.get()){ recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 判断当前server是否应该退出本轮选举了 // 其首先判断n所推荐的leader在当前Server的票箱中支持率是否过半 // 若过半,再判断n所推荐的leader在outofelection中的状态是否合法 // 若合法,则可以退出本轮选举了 if(ooePredicate(recvset, outofelection, n)) { // 收尾工作 self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } }
recvset.put(n.sid,new Vote(…));
如果是相同逻辑时钟的选举的选票通知,则将其封装成选票放入票箱,注意此时虽然选票的状态不是FOLLOWING就是LEADING,但是因为是处于同一逻辑时钟的选票,所以认为是有效的
**if(ooePredicate(recvset, outofelection, n)) {…}
**
判断当前server是否应该退出本轮选举了 recvset:票箱
outofelection:其中存放的是非法选票,即投票者的状态不是looking的选票 怎么判断?
首先判断n所推荐的leader在当前Server的票箱中支持率是否过半(即第一个参数的集合中判断是否过半)
若过半,再判断n所推荐的leader在outofelection中的状态是否合法(从第二个参数的集合中判断是否合法)
若合法,则可以退出本轮选举了
如果 ooePredicate返回true,说明当前server退出本轮选举了,执行收尾工作:改变状态、生成最终选票、清空队列
如果 ooePredicate返回false,继续往下走,处理epoch不同情况的情况
如果是上面场景分析的第二种场景,这个时候recvset往往可能已经有很多票了,不可能是空的,有一定概率在这个时候就能选出Leader了,所以epoch相同的这段代码就是针对场景2的一种优化,加快选出Leader
注意epoch相同的时候会先处理相同的情况,此时若还没有决定Leader,还会继续处理epoch不同的情况,此时其实是针对上面说的场景1
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); // 若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则 // 我就知道谁是leader了,我就可以退出选举了 if(ooePredicate(outofelection, outofelection, n)) { synchronized(this){ logicalclock.set(n.electionEpoch); self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState()); } Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); leaveInstance(endVote); return endVote; } break;
outofelection.put(n.sid, new Vote(xxxx));
将状态为FOLLOWING/LEADING的Server发来的放到outofelection非法选票集合里
if(ooePredicate(outofelection, outofelection, n)) {…}场景1中新Server要加入一个正常运行的集群,选举的逻辑时钟肯定不一样,所以假设还没达到条件,会重新循环,再从队列取通知继续处理,在继续放到outofelection里…outofelection里选票越来越多。
若n所推荐的leader在你们(“你们”代表LEADING和FOLLOWING的Server)通知所形成的集合中的支持率过半,则我就知道谁是leader了,我就可以退出选举了
protected boolean ooePredicate(HashMap<Long,Vote> recv, HashMap<Long,Vote> ooe, Notification n) { // 首先判断n所推荐的leader在recv中的支持率是否过半, // 若过半,则执行checkLeader()。 // 而checkLeader()方法用于判断leader的状态是否合法 return (termPredicate(recv, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)) && checkLeader(ooe, n.leader, n.electionEpoch)); }
termPredicate(recv, new Vote(n.xx…))
判断n选票在集合recv中是否过半,该方法上面讲过
checkLeader(…)
如果满足过半条件,才会执行该方法
用于判断leader的状态是否合法
/** * 翻译:在这种情况下有个一leader已经选举了出来,并且有法定Server支持该leader, * 我们必须检查这个leader是否投票并已确认过其领导。我们需要这种检查,以避免server * 反复地选择一个已经崩溃并且不再领导的leader。 */ protected boolean checkLeader( HashMap<Long, Vote> votes, long leader, long electionEpoch){ //先默认true,后面排除法 boolean predicate = true; /* * If everyone else thinks I'm the leader, I must be the leader. * The other two checks are just for the case in which I'm not the * leader. If I'm not the leader and I haven't received a message * from leader stating that it is leading, then predicate is false. */ if(leader != self.getId()){ // 若推荐的leader是别人,我不是Leader if(votes.get(leader) == null) predicate = false; //如果在votes,即outofelection中不存在,肯定是false else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false; //如果存在,但是状态不是LEADING,也是false } else if(logicalclock.get() != electionEpoch) { // 如果每个人都认为我是领导,那我就是领导。 // 若推荐的leader是当前server,则判断为的逻辑时钟和推荐的epoch是否一样,不一样肯定是false predicate = false; } return predicate; }