MySQL的after_sync半同步与raft 保证一致性的方式有些类似。
after_sync是master在sync binlog后等待从库把事务写到relay log后的ack,拿到ack后,在commit,然后在返回给客户端提交成功的信息。
raft中的日志有commit和applied 两个列表,commited 代表日志写入了文件,applied代表日志应用到了状态机。
raft也是leader先写日志,然后在等待大部分的节点信息都写入了日志,提交了,然后leader在提交,提交日志后,leader在应用到状态机,然后在返回给客户端提交成功的信息, 给其他节点提交信息,其他节点应用日志到状态机,其他节点网络慢的情况下,leader会不停重试传输。
针对leader1宕机的几种状态下的故障。参考
https://www.cnblogs.com/mindwind/p/5231986.html
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
https://mit-public-courses-cn-translatio.gitbook.io/mit6-824/lecture-06-raft1/6.6-ying-yong-ceng-jie-kou
针对上面的场景有几个疑问
一:场景3中,新主会间接提交之前的日志,客户端在重试,不是重复执行了吗?状态机执行了2次命令。
这个状态机不会重复执行,因为之前的leader已经挂了,还没有apply log,所以不会发送apply 的请求出去。
二:如果leader apply log,返回给客户端确认,但是follower没有收到apply的信号,leader就挂了,虽然新主上有commit的日志,但是不会apply,怎么办?
这个是会应用的,新的leader,在接受用户请求之前会执行
r.dispatchLogs([]*logFuture{noop})
分发一个noop日志
func (r *Raft) processLogs(index uint64, future *logFuture) { // Reject logs we've applied already lastApplied := r.getLastApplied() if index <= lastApplied { r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index) return } // Apply all the preceding logs for idx := r.getLastApplied() + 1; idx <= index; idx++ { // Get the log, either from the future or from our log store if future != nil && future.log.Index == idx { r.processLog(&future.log, future, false) } else { l := new(Log) if err := r.logs.GetLog(idx, l); err != nil { r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err) panic(err) } r.processLog(l, nil, false) } // Update the lastApplied index and term r.setLastApplied(idx) }
在进行for idx := r.getLastApplied() + 1; idx <= index; idx++ 这个判断的时候
lastapplied一定比index小,index就是lastindex,已提交的日志在lastindex前,这样之前leader提交状态的日志会被applied。
raft实现了cap中的cp ,没有保证a ,a代表的是用户的请求一定有响应,在出现脑裂的情况下,如果一个leader的请求没有被大多数节点接受,那么就没有办法提交,没法给客户响应,如果3个节点中有2个节点挂掉,就剩一个节点,其实这个时候请求过来后,判断不是主,不会执行,也会返回给客户端not leader,所以这里的响应,是指的不会处理请求,进行业务逻辑处理。
在raft代码中,我们可以看到是在fsm apply log后才向客户端发送的响应
case commitEntry := <-r.fsmCommitCh: // Apply the log if a command var resp interface{} if commitEntry.log.Type == LogCommand { start := time.Now() resp = r.fsm.Apply(commitEntry.log) metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) } // Update the indexes lastIndex = commitEntry.log.Index lastTerm = commitEntry.log.Term // Invoke the future if given if commitEntry.future != nil { commitEntry.future.response = resp commitEntry.future.respond(nil) }
follower的幂等实现,就是判断entries的第一个index,是否小于等于当前follower的最后一个logindex,如果是,把这之间的删除掉
// Process any new entries if n := len(a.Entries); n > 0 { start := time.Now() first := a.Entries[0] last := a.Entries[n-1] // Delete any conflicting entries lastLogIdx, _ := r.getLastLog() if first.Index <= lastLogIdx { r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx) if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil { r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err) return } }