Counter演示程序的构成,可以参考官方文档:
https://www.sofastack.tech/projects/sofa-jraft/counter-example/
CounterServer是主启动入口,进去以后就进行了相关的配置,最后调用了集群的start方法,启动集群:
// 启动 this.node = this.raftGroupService.start();
启动后的大概流程如下:
当预投票或投票流程发起的时候,都会初始化一个投票箱:prevVoteCtx / voteCtx 都是 Ballot 类型的对象,主要字段是quorum(法定票数)
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf()); this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
法定票数quorum在投票箱Ballot的初始化方法init()中生成,为集群主机数减半:
this.quorum = this.peers.size() / 2 + 1;
这里投票箱的操作,和我们常识中的投票箱不太一样,我们常识中的投票箱,是初始一个空箱子,然后不断增加票数。这里的投票箱,是初始一个法定票数,每得到1票,则法定票数减去1,直到减到0为止,则认为投票表决通过。
投票箱有个grant方法,用于进行法定票数的自减:
当法定票数自减到0后,表示投票数量超过半数,则该投票箱达到投票通过的结果:
public boolean isGranted() { return this.quorum <= 0 && this.oldQuorum <= 0; }
集群中所有主机启动后,都会开启这个定时器,周期触发,时间一到,就发起投票流程,投自己一票,然后给集群其它主机发请求,要求对方回应自己的选举请求。
所以选举计时器是集群中比较关键的一个计时器,每个主机通过该计时器都有可能自荐为leader,防止集群无主的情况出现。作为follower,计时器的重置是收到leader的心跳请求触发的,直观的可以看看第一部分提到的raft算法演示动画:http://thesecretlivesofdata.com/raft/
选举计时器在NodeImpl初始化的时候定义,并在随后开启。
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) { @Override protected void onTrigger() { handleElectionTimeout(); } @Override protected int adjustTimeout(final int timeoutMs) { return randomTimeout(timeoutMs); } };
// Learner node will not trigger the election timer. if (!isLearner()) { this.electionTimer.restart(); } else { LOG.info("Node {} is a learner, election timer is not started.", this.nodeId); }
其中,handleElectionTimeout方法用于处理计时器到期后的动作;randomTimeout用于产生随机时间,这样集群中不同主机的计时器到期时间随机,可以防止所有主机在同一时间开始选举自己,然后都拿不到过半投票,又重新开始在同一时间发起选举,陷入死循环...,但是如果有的主机先开始发起选举,则很大概率可以获取大多数的选票,成为leader。
至于这个RepeatedTimer是个什么东西,我们看看其内容:
public Timer createTimer(final String name) { return new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048); }
可以看到,这个重复计数器是一个HashedWheelTimer,基于哈希轮算法,具体原理可参见:
netty中的定时机制HashedWheelTimer - 简书
21 技巧篇:延迟任务处理神器之时间轮 HashedWheelTimer.md
总结:集群中每个主机启动时都会开启一个electionTimer,成为leader后会暂停自己的electionTimer,follower会通过接收leader的heartbeat请求,持续保持electionTimer计时,一旦leader挂掉或者网络故障,收不到leader的heartbeat请求的follower都会因为electionTimer的计时到期,触发选举自荐流程,准备选举自己为新的leader。
前面还留了个问题,为什么先发起投票,就能大概率成为leader呢?下面看集群中一个主机收到预投票/投票请求后的处理逻辑:
在开始源码之前,有两个比较重要的概念需要了解
1. term:leader任期,集群中一个新leader产生后,term值会自增1
2. logIndex:日志索引,集群中的日志索引也是自增的,leader每次发起appendEntries(追加日志记录)请求的时候,日志内容会追加,日志索引会自增
当主机收到预投票请求后:
1. 请求的term比自己的term小,则返回reject,这种情况显然对方的数据没有自己的新,对方的领导任期都少了几轮。
2. 请求的logIndex比自己的小,也返回reject,这种情况也是对方的数据没自己的新。
3. 其它情况,返回granted,认可对方的选举自荐。
当主机收到投票请求后,同样会执行上面的逻辑,但是额外的:
如果发现请求的term比自己大,则更新自己保存的term值,向请求者看齐,这样整个集群的term值很快会保持最新值。
上面的流程可以看到,选举流程分为preVote和vote的好处是,在preVote,集群中的其它主机并不更新自己的term值,只有当候选者真的获取到了集群过半选票之后,再在正式的vote中大家才更新自己term值。可以防止有的主机,和在任的leader通信中断,于是它不断发起自荐请求,每次term都加一,但是最终又不会获选,但是拉高了集群整体的term值,并导致在任的正常leader因为term值过低而stepDown。
有了preVote环节,即使有主机带着自增的term号发起自荐,但是因其logIndex不是最新的,有可能不会获得过半票数,自荐失败退出,于是集群的term号不会更新,保留原值。
发起投票和响应投票相关的代码:
com.alipay.sofa.jraft.core.NodeImpl#preVote //预投票 com.alipay.sofa.jraft.core.NodeImpl#electSelf //投票 com.alipay.sofa.jraft.core.NodeImpl#handlePreVoteRequest //处理预投票请求 com.alipay.sofa.jraft.core.NodeImpl#handleRequestVoteRequest //处理投票请求
preVote流程:
当收到预投票数量过半时,开始投票流程
投票流程:
当收到投票数量过半时,开始becomeLeader流程
从上面的流程图还可以发现两个Timer:
一句话解释:
voteTimer:是为了防止正式投票流程超时的一个计时器,在指定的时间内没达成一致,候选者会退出投票流程,重新开始发起新一轮的预投票。
(为什么preVote过程不需要一个timer?因为preVote大家都没有修改自己的term值,都有机会发起preVote,整个过程不会因为少量主机的故障而hang住)
stepDownTimer:防止集群过半主机挂掉以后,leader仍然在这种不健康的状态继续提供服务。
leader发送心跳请求,其实底层是复用的发送日志追加请求,只是日志内容是空的而已:
public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure<AppendEntriesResponse> closure) { final Replicator r = (Replicator) id.lock(); if (r == null) { RpcUtils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id)); return; } //id unlock in send empty entries. r.sendEmptyEntries(true, closure); }
注意SOFA_JRaft里面的RPC调用,都是采取的回调方式,所以可以看到心跳请求发送的时候,有注册一个回调函数:
RpcResponseClosure<AppendEntriesResponse> heartbeatDone; // Prefer passed-in closure. if (heartBeatClosure != null) { heartbeatDone = heartBeatClosure; } else { heartbeatDone = new RpcResponseClosureAdapter<AppendEntriesResponse>() { @Override public void run(final Status status) { onHeartbeatReturned(Replicator.this.id, status, request, getResponse(), monotonicSendTimeMs); } }; } this.heartbeatInFly = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request, this.options.getElectionTimeoutMs() / 2, heartbeatDone);
在回调函数onHeartbeatReturned里面,发起新一轮的心跳计时:
r.startHeartbeatTimer(startTimeMs);
注意:可以看到这里的心跳计时器不是前面那种RepeatedTimer,能反复自动执行,而是一个延迟执行的计时器,开启后只执行一次,然后在收到对方的成功响应后,再开启下一轮执行。
作为follower,在收到日志追加请求以后,在handleAppendEntriesRequest里面,对本地的lastLeaderTimestamp字段更新为当前最新时间,这样electionTimer就不会处理超时逻辑。
updateLastLeaderTimestamp(Utils.monotonicMs());
private void updateLastLeaderTimestamp(final long lastLeaderTimestamp) { this.lastLeaderTimestamp = lastLeaderTimestamp; }
其中,electionTimer使用该时间参数的地方为:
private boolean isCurrentLeaderValid() { return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs(); }
可以看到,只要lastLeaderTimestamp是最新的,即被leader刚刚更新过,则认为leader是活着的,则electionTimer不会触发自荐选举流程,等待下一轮的计时到期。