Java教程

实现Raft协议:Part 1 - 选主

本文主要是介绍实现Raft协议:Part 1 - 选主,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

翻译自Eli Bendersky的系列博客,已获得原作者授权。

本文是系列文章中的第一部分,本系列文章旨在介绍Raft分布式一致性协议及其Go语言实现。文章的完整列表如下:

  • 序言:介绍
  • 第一部分:选主(本文)
  • 第二部分:指令和日志复制
  • 第三部分:持久性和优化(待完成)

在这一部分,我会介绍我们的Raft实现代码的结构,并重点介绍算法的选主部分。本文的代码包括一个全功能的测试工具和一些您可以用来测试系统的案例。但是它不会响应客户端的请求,也不好维护日志,这些功能会在第2部分添加。

代码结构

简单介绍一下Raft实现的代码结构,本系列的所有部分都是通用的。

通常来说,Raft都被实现为一个可嵌入某些服务的对象。因为我们不会真正地开发一个服务,只是研究Raft协议本身,所以创建了一个简单的Server类型,其中包装了一个ConsensusModule类型,以期尽可能隔离出代码中最有趣的部分:

Architecture of a consensus module embedded into a server

一致性模块(CM)实现了Raft算法的核心,在raft.go文件中。该模块从网络细节以及与集群中其它副本的连接中完成抽象出来,ConsensusModule中与网络相关的唯一字段就是:

// id 是一致性模块中的服务器ID
id int

// peerIds 是集群中所有同伴的ID列表
peerIds []int

// server 是包含该CM的服务器. 该字段用于向其它同伴发起RPC调用
server *Server
复制代码

在实现过程中,每个Raft副本将集群中的其它副本称为“同伴”。集群中的每个同伴都有一个唯一的数值ID,以及记录其同伴ID的列表。server字段是指向模块所在Server*(在server.go中实现)的指针,后者可以允许ConsensusModule将消息发送给同伴。稍后我们将看到这是如何完成的。

这样设计的目的就是要将所有的网络细节排除在外,从而专注于Raft算法本身。总之,要将Raft论文对照到本实现的话。你只需要ConsensusModule类及其方法。Server代码是一个非常简单的Go语言网络框架,有一些细微的复杂之处来应对严格的测试。本系列文章中,我不会花时间讨论它。但是如果有什么不清楚的地方,请随意提问。

Raft服务器状态

总的来说,Raft CM就是一个具有3个状态的状态机[1]

Raft high level state machine

因为在序言中 花费了很多篇幅解释Raft如何帮助实现状态机,所以对这里可能会有一点困惑,但是必须说明一下,这里的术语*状态含义是不同的。Raft是一个实现任意复制状态机的算法,但是Raft内部也包含一个小的状态机。后面的章节中,某个地方的状态*是什么含义都可以结合上下弄清楚——如果不能的话,我肯定是指出来的。

在一个典型的稳态场景中,集群中有一个服务器是领导者,而其它副本都是追随者。尽管我们很希望系统可以一直这样运行下去,但是Raft协议的目标就是容错。因此,我们会花费大部分时间来讨论一下非典型的故障场景,如某些服务器崩溃,其它服务器断开连接,等等。

之前提到过,Raft使用的是强领导模型。领导者响应客户端请求,向日志中添加新条目并将其复制给其它追随者。每个追随者随时准备接管领导权,以防领导者故障或停止通信。这也就是上图中从追随者候选人(Candidate)的转变(“等待超时,开始选举”)。

任期(Terms)

就是正常的选举一样,Raft中也有任期。任期指的就是某一服务器作为领导者的一段时间。新的选举会触发新的任期,而且Raft算法保证在给定的任期中只有一个领导者。

但是这个比喻就到此为止吧,因为Raft中的选主跟真正的选举区别还是很大的。在Raft中,选举是更具协同性的,候选者的目标不是要不惜一切代价赢得选举——所有候选人有一个共同的目标,那就是在任意给定的任期都有合适的服务器赢得选举。什么稍后会详细讨论”合适“的含义。

选举定时器

Raft算法中的一个关键组成部分就是选举定时器。这是每个追随者都会持续运行的定时器,每次接收到当前领导者的消息时就重新启动它。领导者会发送周期性的心跳,因此当追随者接收不到这些心跳信号时,他会认为当前领导者出现故障或者断开连接,并开始新一轮选举(切换为候选者状态)。

:所有的追随者不会同时变成候选人吗?

:选举定时器是随机的,这也是Raft协议保持简单性的关键之一。Raft通过这种随机化来降低多个追随者同时进行选举的可能性。但是即便它们在同一时刻变成候选人,在任何一个任期内只有一个服务器会被选为领导者。在极少数情况下,会出现投票分裂导致没有候选人获胜,此时将进行新一轮的选举(使用新的任期)。虽然在理论上有可能会永远在重新选举,但是每多一轮选举,发生这种情况的概率会大大降低。

:如果追随者从集群中断开连接(分区)怎么办?它不会因为没有收到领导者的消息而开始选举吗?

:这就是网络分区问题的隐蔽性,因为追随者无法区分谁被分割了。确实,这个追随者会开启新一轮选举。但是,如果这个追随者被断开连接,那么这次选举也会无果而终——因为它无法联系到其它同伴,也就不会获得任何选票。它可能会在候选者状态一直自旋(每隔一段时间就开启新一轮的选举)直到重新接入集群中。稍后我们会详细讨论这种情况。

同伴间RPC

Raft协议中,同伴间会发送两类RPC请求。详细的参数和规则可以参考论文中的Figure 2,或者本文的附录。这里简单说明一下两种请求:

  • RequestVote(RV):只有候选人状态下会使用。在一轮选举中,候选人通过该接口向同伴请求选票。返回值中包含是否同意投票的标志。
  • AppendEntries(AE):只有领导者状态下使用。领导者通过该RPC将日志条目复制给追随者,也用来发送心跳。即使没有要复制的日志条目,也会定期向追随者发送该RPC请求。

明眼人可能看出来追随者不会发送任何的RPC请求。这是对的,追随者不会向同伴发起RPC请求,但是它们在后台会运行一个选举定时器。如果定时器结束之前没有接收到当前领导者的信息,追随者就变成候选人并开始发送RV请求。

实现选举定时器

是时候开始研究代码了。除非特别说明,否则下面展示的所有代码示例都出自这个文件。我不会把ConsensusModule结构体的所有字段——你可以在代码文件中去查看。

我们的CM模块通过在goroutime中执行以下函数来实现选举定时器:

func (cm *ConsensusModule) runElectionTimer() {
    timeoutDuration := cm.electionTimeout()
    cm.mu.Lock()
    termStarted := cm.currentTerm
	cm.mu.Unlock()
	cm.dlog("election timer started (%v), term=%d", timeoutDuration, termStarted)

    /*
	  循环会在以下条件结束:
	  1 - 发现不再需要选举定时器
	  2 - 选举定时器超时,CM变为候选人
	  对于追随者而言,定时器通常会在CM的整个生命周期中一直在后台运行。
	*/
    ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		<-ticker.C

		cm.mu.Lock()
        // CM不再需要定时器
		if cm.state != Candidate && cm.state != Follower {
			cm.dlog("in election timer state=%s, bailing out", cm.state)
			cm.mu.Unlock()
			return
		}
		
        // 任期变化
		if termStarted != cm.currentTerm {
			cm.dlog("in election timer term changed from %d to %d, bailing out", termStarted, cm.currentTerm)
			cm.mu.Unlock()
			return
		}

		// 如果在超时之前没有收到领导者的信息或者给其它候选人投票,就开始新一轮选举
		if elapsed := time.Since(cm.electionResetEvent); elapsed >= timeoutDuration {
			cm.startElection()
			cm.mu.Unlock()
			return
		}
		cm.mu.Unlock()
	}
}
复制代码

首先通过调用cm.electionTimeout()选择一个(伪)随机的选举超时时间,我们这里根据论文的建议将范围设置为150ms到300ms。像ConsensusModule中的大多数方法一样,runElectionTimer在访问属性时会先锁定结构体对象。这一步是必不可少的,因为我们要尽可能地支持并发,而这也是Go的优势之一。这也意味着代码需要顺序执行,而不能分散到多个事件处理程序中。不过,RPC请求同时也在发生,所以我们必须保护共享数据结构。我们后面会介绍RPC处理器。

主循环中运行了一个周期为10ms的ticker。还有更有效的方法可以实现等待事件,但是使用这种写法的代码是最简单的。每过10ms都会执行一次循环,理论上说定时器可以在整个等待过程中sleep,但是这样会导致服务响应速度下降,而且日志中的调试/跟踪操作会更困难。我们会检查状态是否跟预期一致[2],以及任期有没有改变,如果有任何问题,我们就停止选举定时器。

如果距离上一次”选举重置事件“时间过长,服务器会开始新一轮选举并变成候选人。什么是选举重置事件?可以是任何能够终止选举的事件——比如,收到了有效的心跳信息,为其它候选人投票。我们很快会看到这部分代码。

成为候选者

前面提到,如果追随者在一段时间内没有收到领导者或其它候选人的信息,它就会开始新一轮的选举。在查看代码之前,我们先思考一下进行选举需要做哪些事情:

  1. 将状态切换为候选人并增加任期,因为这是算法对每次选举的要求。
  2. 发送RV请求给其它同伴,请他们在本轮选举中为自己投票。
  3. 等待RPC请求的返回值,并统计我们是否获得足够多的票数成为领导者。

在Go语言中,这个逻辑可以在一个函数中完成:

func (cm *ConsensusModule) startElection() {
  cm.state = Candidate
  cm.currentTerm += 1
  savedCurrentTerm := cm.currentTerm
  cm.electionResetEvent = time.Now()
  cm.votedFor = cm.id
  cm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)

  var votesReceived int32 = 1

  // 向其它所有服务器发送RV请求
  for _, peerId := range cm.peerIds {
    go func(peerId int) {
      args := RequestVoteArgs{
        Term:        savedCurrentTerm,
        CandidateId: cm.id,
      }
      var reply RequestVoteReply

      cm.dlog("sending RequestVote to %d: %+v", peerId, args)
      if err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        cm.dlog("received RequestVoteReply %+v", reply)

        // 状态不是候选人,退出选举(可能退化为追随者,也可能已经胜选成为领导者)
        if cm.state != Candidate {
          cm.dlog("while waiting for reply, state = %v", cm.state)
          return
        }

        // 存在更高任期(新领导者),转换为追随者
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in RequestVoteReply")
          cm.becomeFollower(reply.Term)
          return
        } else if reply.Term == savedCurrentTerm {
          if reply.VoteGranted {
            votes := int(atomic.AddInt32(&votesReceived, 1))
            if votes*2 > len(cm.peerIds)+1 {
              // 获得票数超过一半,选举获胜,成为最新的领导者
              cm.dlog("wins election with %d votes", votes)
              cm.startLeader()
              return
            }
          }
        }
      }
    }(peerId)
  }

  // 另行启动一个选举定时器,以防本次选举不成功
  go cm.runElectionTimer()
}
复制代码

候选人首先给自己投票——将votesReceived初始化为1,并赋值cm.votedFor = cm.id

然后并行地向所有的同伴发送RPC请求。每个RPC都是在各自的goroutine中完成的,因为我们的RPC调用的同步的——程序会阻塞至收到响应为止,这可能需要一段时间。

这里正好可以演示一下RPC是如何实现的:

cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply);
复制代码

我们使用ConsensusModule.server中保存的Server指针来发起远程调用,并指定ConsensusModule.RequestVotes作为请求的方法名,最终会调用第一个参数指定的同伴服务器中的RequestVote方法。

如果RPC调用成功,因为已经过去了一段时间,我们必须检查服务器状态来决定下一步操作。如果我们的状态不是候选人,退出。什么时候会出现这种情况呢?举例来说,我们可能因为其它RPC请求返回了足够多的票数而胜选成为领导者,或者某个RPC请求从其它服务器收到了更高的任期,因此我们退化为跟随者。一定要要记住,在网络不稳定的情况下,RPC请求可能需要很长时间才能到达——当我们收到回复时,可能其它代码已经继续执行了,在这种情况下优雅地放弃非常重要。

如果收到回复时我们还是候选人状态,先检查回复信息中的任期并与我们发送请求时的任期进行比较。如果返回信息中的任期更高,我们就恢复到追随者状态。例如,我们在收集选票时其它服务器胜选就会出现该情况。

如果返回的任期与我们发送时一致,检查是否赞成投票。我们使用原子变量votes从多个goroutine中安全地收集选票,如果服务器收到了大多数的赞成票(包括自己的赞成票),就变成领导者。

注意这里的startElection方法是非阻塞的。方法中会更新一些状态,启动一批goroutine并返回。因此,还应该在goroutine中启动新的选举定时器——也就是最后一行代码所做的事。这样可以保证,如果本轮选举没有任何结果,在定时结束后会开始新一轮的选举。这也解释了runElectionTimer中的状态检查:如果本轮选举确实将该服务器变成了领导者,那么并发运行的runElectionTimer在观察到服务器状态与期望值不同时会直接返回。

成为领导者

我们已经看到,当投票结果显示当前服务器胜选时,startElection中会调用startLeader方法,其代码如下:

func (cm *ConsensusModule) startLeader() {
  cm.state = Leader
  cm.dlog("becomes Leader; term=%d, log=%v", cm.currentTerm, cm.log)

  go func() {
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    // 只要当前服务器是领导者,就要周期性发送心跳
    for {
      cm.leaderSendHeartbeats()
      <-ticker.C

      cm.mu.Lock()
      if cm.state != Leader {
        cm.mu.Unlock()
        return
      }
      cm.mu.Unlock()
    }
  }()
}
复制代码

这实际上是一个相当简单的方法:所有的内容就是心跳定时器——只要当前的CM是领导者,这个goroutine就会每隔50ms调用一次leaderSendHeartbeats。下面是leaderSendHeartbeats对应的代码:

func (cm *ConsensusModule) leaderSendHeartbeats() {
  cm.mu.Lock()
  savedCurrentTerm := cm.currentTerm
  cm.mu.Unlock()

  // 向所有追随者发送AE请求
  for _, peerId := range cm.peerIds {
    args := AppendEntriesArgs{
      Term:     savedCurrentTerm,
      LeaderId: cm.id,
    }
    go func(peerId int) {
      cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, 0, args)
      var reply AppendEntriesReply
      if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
        cm.mu.Lock()
        defer cm.mu.Unlock()
        // 如果响应消息中的任期大于当前任期,则表明集群有新的领导者,转换为追随者
        if reply.Term > savedCurrentTerm {
          cm.dlog("term out of date in heartbeat reply")
          cm.becomeFollower(reply.Term)
          return
        }
      }
    }(peerId)
  }
}
复制代码

这里的逻辑有点类似于startElection,为每个同伴启动一个goroutine来发送RPC请求。这里的RPC请求是没有日志内容的AppendEntries(AE),在Raft中扮演心跳的角色。

与处理RV的响应时相同,如果RPC返回的任期高于我们自己的任期值,则当前服务器变为追随者。这里正好查看一下becomeFollower方法:

func (cm *ConsensusModule) becomeFollower(term int) {
  cm.dlog("becomes Follower with term=%d; log=%v", term, cm.log)
  cm.state = Follower
  cm.currentTerm = term
  cm.votedFor = -1
  cm.electionResetEvent = time.Now()

  // 启动选举定时器
  go cm.runElectionTimer()
}
复制代码

该方法中首先将CM的状态变为追随者,并重置其任期和其它重要的状态属性。这里还启动了一个新的选举定时器,因为这是每个追随者都要在后台运行的任务。

应答RPC请求

到目前为止,我们已经看到了实现代码中的主动部分——启动RPC、计时器以及状态转换的部分。但是在我们看到服务器方法(其它同伴远程调用的过程)之前,演示的代码都是不完整的。我们先从RequestVote开始:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d]", args, cm.currentTerm, cm.votedFor)

  // 请求中的任期大于本地任期,转换为追随者状态
  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in RequestVote")
    cm.becomeFollower(args.Term)
  }

  // 任期相同,且未投票或已投票给当前请求同伴,则返回赞成投票;否则,返回反对投票。
  if cm.currentTerm == args.Term &&
    (cm.votedFor == -1 || cm.votedFor == args.CandidateId) {
    reply.VoteGranted = true
    cm.votedFor = args.CandidateId
    cm.electionResetEvent = time.Now()
  } else {
    reply.VoteGranted = false
  }
  reply.Term = cm.currentTerm
  cm.dlog("... RequestVote reply: %+v", reply)
  return nil
}
复制代码

注意这里检查了“dead”状态,稍后会讨论这一点。

首先是一段熟悉的逻辑,检查任期是否过时并转换为追随者。如果它已经是一个追随者,状态不会改变但是其它状态属性会重置。

否则,如果调用者的任期与我们一致,而且我们尚未给其它候选人投票,那我们就赞成该选票。我们决不会向旧任期发起的RPC请求投票。

下面是AppendEntries的代码:

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
  cm.mu.Lock()
  defer cm.mu.Unlock()
  if cm.state == Dead {
    return nil
  }
  cm.dlog("AppendEntries: %+v", args)

  // 请求中的任期大于本地任期,转换为追随者状态
  if args.Term > cm.currentTerm {
    cm.dlog("... term out of date in AppendEntries")
    cm.becomeFollower(args.Term)
  }

  reply.Success = false
  if args.Term == cm.currentTerm {
    // 如果当前状态不是追随者,则变为追随者
    if cm.state != Follower {
      cm.becomeFollower(args.Term)
    }
    cm.electionResetEvent = time.Now()
    reply.Success = true
  }

  reply.Term = cm.currentTerm
  cm.dlog("AppendEntries reply: %+v", *reply)
  return nil
}
复制代码

这里的逻辑也跟论文的图2中的选主部分一致,需要理解的一个复杂点在于:

if cm.state != Follower {
  cm.becomeFollower(args.Term)
}
复制代码

Q:如果服务器是领导者呢——为什么要变成其它领导者的追随者?

A:Raft协议保证了在任一给定的任期都只有唯一的领导者。如果你自己研究RequestVote的逻辑,以及startElection中发送RV请求的代码,你会发现在集群中不会有两个使用相同任期的领导者存在。这个条件对于那些发现其它同伴赢得本轮选举的候选人很重要。

状态和goroutine

我们有必要回顾一下CM中可能存在的所有状态,以及其对应运行的不同goroutine:

追随者:当CM被初始化为追随者,或者每次执行becomeFollower方法时,都会启动新的goroutine运行runElectionTimer,这是追随者的附属操作。请注意,在短时间内可能同时运行多个选举定时器。假设一个追随者收到了领导者发出的带有更高任期的RV请求,这将触发一次becomeFollower调用并启动一个新的定时器goroutine。但是旧的goroutine只要注意到任期的变化就会自然退出。

候选人:候选人也有一个并行运行的选举定时器goroutine,但是除此之外,它还有一些发送RPC请求的goroutine。它与追随者具有相同的保护措施,可以在新选举开始时停止”旧“的选举goroutine。一定要记住,RPC goroutine可能需要很长时间才能完成,因此,如果RPC调用返回时,它们发现自身的任期已经过时,那么它们必须安静地退出。

领导者:领导者没有选举定时goroutine,但是它肯定有一个每隔50ms执行一次的心跳goroutine。

代码中还有一个附加的状态——Dead状态。这纯粹是为了有序关闭CM。调用”Stop“方法会将状态置为Dead,所有的goroutine在观察到该状态后会立即退出。

这些goroutine的运行可能会让人担忧——如果其中一些goroutine滞留在后台,该怎么办?或者出现更糟的情况,这些goroutine不断泄漏而且数量无限制地增长,怎么办?这也就是泄漏检查的目的,而且一些测试案例中也启用了泄漏检查。这些测试中会执行非常规的一系列Raft选举操作,并保证在测试结束后没有任何游离的goroutine在运行(在调用stop方法之后,给这些goroutine一些时间去退出)。

服务器失控和增加任期

作为这一部分的总结,我们来研究一个可能出现的复杂场景以及Raft如何应对。我觉得这个例子很有趣,也很有启发性。这里我试图以故事的方式来呈现,但是你最好有一张纸来记录各服务器的状态。如果你没法理解这个示例——请发邮件告知我,我很乐意将它改得更清楚一些。

想象一个有三台服务器A,B和C的集群。假设A是领导者,起始任期是1,并且集群正在完美运行着。A每隔50ms都想B、C发一次心跳AE请求,并在几毫秒内得到及时响应。每一次的AE请求都会重置B、C中的electionResetEvent属性,因此它们也都很愿意继续做追随者。

在某个时刻,由于网络路由器的临时故障,服务器B与A、C之间出现了网络分区。A仍然每隔50ms发一次AE请求,但是这些AE要求要么立即失败,或者是由于底层RPC引擎的超时导致失败。A对此无能为力,但是问题也不大。我们目前还没有涉及到日志复制,但是因为3台服务器中的2台都是正常的,集群仍然可以提交客户端指令。

那么B呢?假设在断开连接的时候,它的选举超时设置为了200ms。在断开连接大约200ms后,B的runElectionTimer会意识到在选举等待时间内没有收到领导者的信息,B无法区分是谁出了错,所以它就变为了候选者并开启一轮选举。

因此B的任期将变为2(而A和C的任期仍然是1)。B会向A和C发送RV请求,要求他们为自己投票;当然,这些请求会丢失在网络中。不要惊慌!B中的startElection方法也启动了另一个goroutine执行runElectionTimer任务,假设这个goroutine会等待250ms(要记住,我们的超时时间是在150ms-300ms之间随机选择的),以查看上一轮选举是否出现实质性的结果。因为B仍然被完全隔离,也就不会发生什么,因此runElectionTimer会发起另一轮选举,并将任期增加到3。

如此这般,B的服务器在几秒钟之后自我重置并重新上线,与此同时,B由于每隔一段时间都发起选举,它的任期已经变为8。

这时网络分区问题已经修复,B重新连接到了A和C。

不久之后,A发送的AE请求到了。回想一下,A每隔50ms都会发送心跳信息,即使B一直没有回复。

B的AppendEntries被调用,并且回复信息中携带的任务为8.

A在leaderSendHeartbeats方法中收到此回复,检查回复信息中的任期后发起比自己的任期更高。A将自身的任期改为8并变成追随者。集群暂时失去了领导者。

接下来根据定时的不同,可能会出现多种情况。B是候选者,但是它可能在网络恢复之前已经发送了RV请求;C是追随者,但是由于在选举超时内没有收到A的AE请求,也会变成候选人;A变成了追随者,也可能因为选举超时变成候选人。

所以其中任何一个服务器都可能在下一轮的选举中胜选,注意,这只是因为我们在这里并没有复制任何日志。我们将在下一部分看到,实际情况下,A和C可能会在B离线的时候添加了一些写的客户端指令,因此它们的日志是最新的。因此,B不会变成新的领导者——会出现新的一轮选举,而且A或C会胜选。在下一部分我们会再次讨论这个场景。

假如在B断开连接之后没有新增任何指令,则重新连接之后更换领导者也是完全可以的。

看起来可能有些效率低下——确实如此。这里更换领导者是不必要的,因为A在整个场景中都是非常健康的。但是,以牺牲特殊情况下的效率为代价,保证算法逻辑的简单性,这也是Raft做出的选择之一。算法在一般情形(没有任何异常)下的效率更重要,因为集群99.9%的时间都处于该状态。

下一步

为了确保你对实现的理解不仅仅局限在理论,我强烈建议你运行一下代码。

代码库中的README文件对于代码交互、运行测试用例、观察结果提供了详细的说明。代码中附带了很多针对特定场景的测试(包括前面章节中提到的场景),运行一个测试用例并查看Raft日志对于学习很有意义。注意到代码中调用的cm.dlog(...)了吗?仓库中提供了一个工具可以将这些日志在HTML文件中进行可视化——可以在README文件中查看说明。运行代码,查看日志,也可以在代码中随意添加自己的dlog,以便更好地理解代码中的不同部分是在何时运行的。

本系统的第2部分会描述更完整的Raft实现,其中处理了客户端的指令,并在集群中复制这些日志。敬请关注!

附:

Raft论文中的图2如下所示,这里对其做简要的翻译及说明。其中有部分关于日志复制和提交的,可以在看完下一篇之后重新对照理解。

Raft-RPC参数

状态 State

服务器中的状态字段有三类,分别进行介绍。

所有服务器中都需要持久化保存的状态(在响应RPC请求之前需要更新到稳定的存储介质中)

字段 说明
currentTerm 服务器接收到的最新任期(启动时初始化为0,单调递增)
votedFor 在当前任期内收到赞同票的候选人ID(如果没有就是null)
log[] 日志条目;每个条目中 包含输入状态机的指令,以及领导者接收条目时的任期(第一个索引是1)

所有服务器中经常修改的状态字段:

字段 说明
commitIndex 确认已提交的日志条目的最大索引值(初始化为0,单调递增)
lastApplied 应用于状态机的日志条目的最大索引值(初始化为0,单调递增)

领导者服务器中经常修改的状态字段(选举之后重新初始化):

字段 说明
nextIndex[] 对于每个服务器,存储要发送给该服务器的下一条日志条目的索引(初始化为领导者的最新日志索引+1)
matchIndex[] 对于每个服务器,存储确认复制到该服务器的日志条目的最大索引值(初始化为0,单调递增)

AE请求

AE请求即AppendRntries请求,由领导者发起,用于向追随者复制客户端指令,也用于维护心跳。

请求参数
参数 说明
term 领导者的任期
leaderId 领导者ID,追随者就可以对客户端进行重定向
prevLogIndex 紧接在新日志条目之前的条目的索引
prevLogTerm prevLogIndex对应条目的任期
entries[] 需要报错的日志条目(为空时是心跳请求;为了高效可能会发送多条日志)
leaderCommit 领导者的commitIndex
返回值
参数 说明
term currentTerm,当前任期,回复给领导者。领导者用于自我更新
success 如果追随者保存了prevLogIndexprevLogTerm相匹配的日志条目,则返回true
接收方实现:
  1. 如果term < currentTerm,返回false
  2. 如果日志中prevLogIndex对应条目的任期与prevLogTerm不匹配,返回false
  3. 如果本地已存在的日志条目与新的日志冲突(索引相同,但是任期不同),删除本地已存在的条目及其后所有的条目;
  4. 追加所有log中未保存的新条目;
  5. 如果leaderCommit > commitIndex,就将commitIndex设置为leaderCommit和最新条目的索引中的较小值。

RV请求

候选人执行,用于在发起选举时收集选票。

请求参数
字段 说明
term 候选人的任期
candidateId 请求选票的候选人ID
lastLogIndex 候选人的最新日志条目对应索引
lastLogTerm 候选人的最新日志条目对应任期
返回值
字段 说明
term currentTerm,当前任期,回复给候选人。候选人用于自我更新
voteGranted true表示候选人获得了赞成票
接收方实现:
  1. 如果term < currentTerm返回false
  2. 如果votedFor为空或等于candidateId,并且候选人的日志至少与接收方的日志一样新,投出赞成票。

服务器响应规则

按照服务器当下的状态(所处的角色),分别进行介绍:

所有服务器:
  • 如果commitIndex > lastApplied:增加lastApplied,将log[lastApplied]应用到状态机;
  • 如果RPC请求或者响应中携带的任期T满足T > currentTerm:设置currentTerm = T,转换为追随者。
追随者:
  • 响应候选人和领导者的RPC请求;
  • 如果超时等待时间内没有收到当前领导者的AE请求或者给候选人投出选票:转换为候选人。
候选人:
  • 刚转换为候选人时,启动选举:
    • 增加当前任期,currentTerm
    • 给自己投票
    • 重置选举定时器
    • 向其它所有服务器发送RV请求
  • 如果接收到多数服务器的赞成票:变成领导者
  • 如果接收到新领导者发出的AE请求:转换为追随者
  • 如果选举等待超时:开始新一轮选举
领导者:
  • 当选时:向每个服务器发送初始化的空AE请求(心跳);在空闲时间也重复发送AE请求,防止追随者出现等待超时;
  • 如果从客户端接收到指令:向本地日志中追加条目,在新指令被应用到状态机之后响应客户端;
  • 如果最新的日志索引index与追随者下一条日志索引nextIndex 满足 index ≥ nextIndex:向追随者发送AE请求,携带从nextIndex开始的所有日志条目:
    • 如果成功:更新追随者对应的nextIndexmatchIndex
    • 如果AE由于日志不一致而失败:减小nextIndex并重试;
  • 如果存在N,满足N > commitIndex,多数的matchIndex[i] ≥ N,并且log[N].term == currentTerm:设置commitIndex = N

  1. 这张示意图与Raft论文中的图4是相同的。正好也可以提醒一下,在本系列的文章中。我都假设您已经读过这篇论文了。 ↩︎

  2. 检查状态是否追随者和候选人可能看起来有点奇怪。难道服务器可以不通过runElectionTimer发起的选举而突然成为领导者吗? 继续往后阅读了解候选人是如何重启选举计数器的。 ↩︎

这篇关于实现Raft协议:Part 1 - 选主的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!