前一篇文章介绍了一下自己在实现Raft领导选举与日志复制的细节,这一篇文章继续介绍一下关于Raft节点的State持久化与日志压缩的细节。
往期文章:
Raft算法实现之领导选举与日志复制(MIT6.824 Lab2A、B | Golang)
为了简洁起见,本文出现的代码均删去了打印日志的相关代码,文中AppendEntries RPC简称为AE RPC。
我们知道,Raft的一大优势就是Fault Tolerance,即能够在部分节点宕机、失联或者出现网络分区的情况下依旧让系统正常运行。而为了保证这一点,除了领导选举与日志复制外,我们还需要定期将论文Figure 2中的非易失性State持久化到磁盘中。这样,便于某个server宕机重启后能够从磁盘恢复这些State,这也是Lab2C的主要内容。
在实现时课程组提供了一个structurePersister
来扮演磁盘的角色,我们对磁盘的读写操作均使用该structure进行。具体地,我们需要两个函数:persist()
与readPersist(data []byte)
。前者在任何非易失性State被改变时均需要调用,表示这些State的改变被写入了磁盘;后者只需要在启动/重启时调用,即Make()
函数里,表示从磁盘读取。具体的编码解码过程课程组在Start Code的注释里已经给我们写了例子,照着写就可以了,思路很清晰。
func (rf *Raft) persist() {
w := new(bytes.Buffer)
enc := labgob.NewEncoder(w)
if enc.Encode(rf.currentTerm) != nil ||
enc.Encode(rf.votedFor) != nil ||
enc.Encode(rf.logs) != nil {
DPrintf(dError, "S%v store persist error!", rf.me, rf.currentTerm)
}
raftSt := w.Bytes()
rf.persister.Save(raftSt, nil)
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 {
// bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
dec := labgob.NewDecoder(r)
var currenT int
var voteF int
var logs []LogEntry
if dec.Decode(¤T) != nil ||
dec.Decode(&voteF) != nil || dec.Decode(&logs) != nil {
DPrintf(dError, "S%v read persist error!", rf.me, rf.currentTerm)
} else {
rf.mu.Lock()
rf.currentTerm = currenT
rf.votedFor = voteF
rf.logs = make([]LogEntry, len(logs))
copy(rf.logs, logs)
rf.mu.Unlock()
}
}
完成了以上两个函数之后,在currentTerm
、votedFor
、logs
被修改过的地方调用persist()
就好了。Lab2C的主要代码就这些,不是很多。但Lab2C里个人认为最主要的地方是关于nextIndex
回退的优化。
当Leader的AE RPC的reply结果为false且返回的Term与Leader一样大时,说明Follower i的日志一致性检查没有通过,Leader需要减小nextIndex[i]
的值并在下一次AE RPC中重新尝试发送。论文里每次会回退一个nextIndex
,然而这样的回退方法在某些情况下会比较慢。在Lab2C的测试里,有些测试会在某一个Follower宕机之后,Start
大量的command给Leader。这样当这个Follower重启后,会落后Leader巨量的LogEntry
。如果依旧使用每次回退一个nextIndex
的策略,会造成测试超时。
论文里第七页的最后一部分提出了一种策略,即当出现需要回退nextIndex
的情况时,直接回退整个ConflictTerm
的LogEntry
。这样就从每次只回退一个LogEntry
变为了每次回退一整个Term的LogEntry
。这个ConflictTerm
为AE RPC reply参数中新增的一个成员,表示发生了冲突的那条LogEntry
对应的Term。这里课程组在论文的基础上,进一步改进了这个优化策略。
我自己在做的时候在reply中加了两个成员:ConflictIndex
和ConflictTerm
。分三种情况:
logs
比args中的PrevLogIndex
短,那么令ConflictTerm
为-1,ConflictIndex
为Follower最后一个LogEntry
的索引。ConflictTerm
设置为该LogEntry的Term,ConflictIndex
设置为上一个Term的最后一个LogEntry的Index。对与Leader的reply处理,做如下操作:
log[]
中包含了ConflictTerm
的LogEntry
,设置nextIndex[i]
为leader的log[]
中ConflictTerm
下一个Term的第一个LogEntry的Indexlog[]
中不包含了ConflictTerm
的LogEntry,设置nextIndex[i]
为ConflictIndex + 1
更具体的细节可以看前面浅析Raft的文章中的“优化”部分,里面包含详细的图解分析。这里放一下主要的伪代码:
// follower
func (rf *Raft) CheckIfPLIMatch(args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
...
if LenLogs <= args.PrevLogIndex || (rf.logs[args.PrevLogIndex].LogTerm != args.PreLogTerm) {
reply.ConflictIndex = args.PrevLogIndex
if LenLogs <= args.PrevLogIndex {
reply.ConflictIndex = Last index of Follower's LogEntry
reply.ConflictTerm = -1
} else {
reply.ConflictTerm = rf.logs[reply.ConflictIndex].LogTerm
}
reply.Success = false
for ; reply.ConflictIndex >= 0; reply.ConflictIndex-- {
if rf.logs[reply.ConflictIndex].LogTerm != reply.ConflictTerm {
break
}
}
rf.ResetTimer()
return false
}
return true
}
// leader
func (rf *Raft) UpdateNIAndMI(reply *AppendEntriesReply, peerId int) {
leaderHasConflictT := false
leaderLastEntryWithConfT := 0
// find if leader has conflict term
...
if !leaderHasConflictT || reply.ConflictTerm == -1 {
// decrement nextIndex, conflict index is another term, we need to set + 1
rf.nextIndex[peerId] = reply.ConflictIndex + 1
} else {
rf.nextIndex[peerId] = leaderLastEntryWithConfT + 1
}
}
除了以上方法,在课程Lecture6的视频中Robert Morris教授(教授的笑容真的非常有感染力,这里是教授的MIT主页。关于教授的其他故事可以了解一下hhh)在回答同学的提问时也讲到,nextIndex
回退的方法有很多种,包括但不限于课程组的方法、二分等,大家可以自由发挥。
这里顺便说一下,课程组的Lab2C的Handout关于回退逻辑的部分的建议与课上讲得有一点点不一样:
Case 2: leader has XTerm:
nextIndex = leader’s last entry for XTerm
这里课程上讲的是
Case 2: leader has XTerm:
nextIndex = leader’s last entry for XTerm + 1
即下一个Term的第一个LogEntry
的index。当然,实际运行起来,差一个LogEntry
,速度上的差异可以忽略不计。正确性上两者都是正确的。
完成以上优化,如果Lab2B的代码没有大问题,基本就能通过Lab2C的测试了。
Lab2C的Test比Lab2B的要严格很多,虽然2C的代码量不大,但如果Lab2B的代码有问题的话,Lab2C的debug会非常头疼。我自己在2C的Tests中遇到了这个几个主要问题。这几个问题在上一篇写Lab2B的文章中也提到过:
nextIndex
与matchIndex
的更新。在上一篇文章中,我曾在AE RPC返回success时,简单地将nextIndex
更新为Leader此时的len(logs) + 1
,matchIndex
更新为len(logs)
。这样会在2C的Tests,尤其是Figure8Test中发生apply error
的报错。原因在于Leader在初始化完AE参数,发送RPC时,会Unlock mutex。此时可能会收到新的Start
请求,使得Leader的logs
发生了改变。而在后面处理reply时,如果简单地根据logs
改变nextIndex
和matchIndex
,显然会使得这两个值大于实际我们要更新的值。解决方法就是根据我们发送的参数来更新:rf.nextIndex[peerId] = args.PrevLogIndex + len(args.Entries) + 1
rf.matchIndex[peerId] = args.PrevLogIndex + len(args.Entries)
如果不采取一些措施,当某个server从宕机中恢复时,其State Machine需要从第一条LogEntry开始恢复状态。随着日志的不断增加,server的恢复时间也会变得越来越长。因此,我们需要一种措施,将部分的logs
和State Machine的状态定期保存至磁盘,以减小server的时间,即日志压缩。
论文的第七节对这一部分做了阐述,在浅析Raft算法的那篇文章中也做了对应的介绍。显然,当某个server完成了一次日志压缩后,从宕机中恢复时,对与已经“压缩”的LogEntry
不需要进行恢复。因此,我们需要一个index,用于区分已压缩的LogEntry
与未压缩的LogEntry
,即lastIncludedIndex
。此外,对与这个index上的LogEntry
,还要用一个变量保存这条LogEntry
的Term,我们称之为lastIncludeTerm
,用于日志一致性检查。两者在server第一次Start时均为0,且均需要持久化。
type Raft struct {
...
// for snapshot
lastIncludedTerm int
lastIncludedIndex int
}
Handout里首先要求我们实现SnapShot()
函数,还是借助这幅图,我们理解一下这个函数的意思:
在每个server的key/value层,当apply的command到达一定规模时(这个“规模”在Lab3里会实现,Lab2可以暂时放一放),会将自己的当前状态存至磁盘,并通知Raft层将对应的LogEntry
也存至磁盘。这个通知使用的就是SnapShot()
函数。
因此,SnapShot()
的逻辑为首先截断参数index之前(包括index)的logs
。之后,更新lastIncludeIndex
和lastIncludeTerm
,并与第二个参数snapshot
一起调用rf.persister.Save
存储至“磁盘”。注意,如果更新后的lastIncludeIndex
大于commitIndex
或lastApplied
,那么这两者也需要更新为lastIncludeIndex
。原因是这两者对应的LogEntry
以及应用后的State Machine状态已被存储,不需要再次commit或者apply。
最后,调用persistWithSnapShot
进行持久化(下面会讲这个函数)。注意这里的参数index
不能大于logs
的最后一条LogEntry
的index,因为SnapShot()
是由上层State Machine调用的,参数index
表示这个索引之前的所有LogEntry
都已经被apply到了State Machine上了,不存在的LogEntry
显然不能被apply。
为了支持以上逻辑,我们需要修改2C中的persist()
以及readPersist()
,添加对于lastIncludeIndex
和lastIncludeTerm
的持久化支持。此外,我们还需存储SnapShot()
的第二个参数snapshot []byte
(这个参数是State Machine相关的状态,Lab2并不会实际使用。只需知道如何存取即可)。最简单的做法就是给persist()
加一个参数。为了不破坏之前的结构,我自己重新写了一个函数persistWithSnapShot
,并修改了persist()
,防止SnapShot被nil
覆盖:
func (rf *Raft) PersistWithSnapShot(snapshot []byte) {
// get raftSt here
...
rf.persister.Save(raftSt, snapshot)
}
func (rf *Raft) persist() {
// get raftSt here
...
// rf.persister.Save(raftSt, nil) change to below:
rf.persister.mu.Lock()
rf.persister.raftstate = func(orig []byte) []byte {
x := make([]byte, len(orig))
copy(x, orig)
return x
}(raftSt)
// here can alos be:
// rf.persister.Save(raftSt, rf.persister.ReadSnapshot())
rf.persister.mu.Unlock()
}
这里其实有一点代码冗余hhh
注意这里因为lastIncludeIndex
的存在,我们任何之前对logs
的访问,都需要进行一定的改动。举个例子,原来的logs
长度有5,SnapShot()
后截断了三个LogEntry
,此时切片logs
的长度只剩2。显然我们访问index = 4
的LogEntry
时,不能直接logs[4]
,会造成越界。正确的访问方式为logs[4 - rf.lastIncludeIndex]
。因此,我们需要对已有代码里任何涉及到rf.logs[]
访问的地方对索引进行减去lastIncludeIndex
的操作。这一点是Lab2D最耗时的地方,稍微有一点遗漏就可能造成Bug。
当然,另一种思路是将所有的索引都换成“相对索引”。上面我们之所以要将涉及到的索引减去lastIncludeIndex
,是因为我们依旧将这些索引看为全局索引,即相对于整个系统启动时的第一条LogEntry
的索引。而相对索引则指相对于本地rf.logs[]
的索引,这样不用每次访问rf.logs[]
时都需要进行减去lastIncludeIndex
的操作。但需要修改LogEntry
结构体,加一个index
成员。我个人没有使用这种方法。
实际上,按照前一种方法修改起来并没有想象中那么繁琐。我甚至在修改过程中重构了一下代码,把原来动辄上百行的函数进行了拆分,保证每一个方法不超过三十行。重构完成后的代码结构对debug也有很大的帮助。
当Leader的AE reply中,某个Follower返回的需要更新的nextIndex
小于Leader的lastIncludeIndex
时,Leader就需要发送InstallSnapshot RPC
来让Follower“安装”对应的快照,以保持LogEntry
的一致性(显然此时Leader在nextIndex
前的Entry都已经落盘的,无法发送AE rpc至Follower)。
Follower收到InstallSnapshot RPC
后,首先检查参数中Term与自身currentTerm
的大小关系进行检查,之后对比参数的lastIncludeIndex
与自身的lastIncludeIndex
,如果后者更大,说明这个快照已经存储了,直接返回。否则根据参数的lastIncludeIndex
进行logs[]
的截断:如果参数的lastIncludeIndex
大于logs
最后一条LogEntry
的index(记得加上自身的lastIncludeIndex
),那么截断所有的logs
;否则,截断包括参数的lastIncludeIndex
在内的之前所有LogEntry
。
截断完成后,更新自身的lastIncludeIndex
与lastIncludeTerm
。如果更新后的lastIncludeIndex
大于commitIndex
或lastApplied
,那么这两者也需要更新为lastIncludeIndex
。
但其实这里还有一个小问题,这个问题藏得很“不起眼”,Lab2D的的测试甚至覆盖不到,直到做了Lab3才发现。首先看一下InstallSnapshot RPC
Handler中更新lastApplied
的代码:
func (rf *Raft) DiscordLogs(args *InstallSnapShotArgs) {
...
if rf.lastApplied < rf.lastIncludedIndex {
rf.lastApplied = rf.lastIncludedIndex
}
}
逻辑与SnapShot()
里的一致,似乎没什么问题。接下来我们考虑这样一个情况:有五条Append指令,封装为LogEntry
后index从1开始,均对key为0的键进行操作。如下所示:
append{
key:0, value:1}
append{
key:0, value:2}
append{
key:0, value:3}
append{
key:0, value:4}
append{
key:0, value:5}
若正常执行,那么key/value数据库最后key为0的value为12345
(key、value均为string)。假设Leader为S1,Follower为S2。Leader发送AE时的PrevLogIndex
初始化为0,并将所有五条LogEntry
发送给S2。S2成功接收并复制到自身logs
中,但返回reply由于网络原因没有发送至Leader,所以Leader没有更新S2的nextIndex
。但因为收到其他Follower的success reply,所以更新自身的commitIndex
,并发送新一轮的AE RPC。
S2接收到新AE RPC后,发现LeaderCommit与上次不同,更新自身RPC并提交五条命令。此时S2的状态机状态为key:0, value:12345
,lastApplied = 5
。但因为网络问题,S2的reply还是没有被Leader收到。同时,Leader apply到第三条,即index为3的LogEntry
后,State Machine发起了SnapShot()
,将此时的key/value状态机的状态key:0, value:123
,lastIncludeIndex = 3
存储至磁盘。接着。Leader发送新一轮AE RPC,发现S2的PrevLogIndex = 0
小于自己的lastIncludeIndex = 3
,所以发送InstallSnapshot RPC
给S2,将S2的key/value状态机的状态由key:0, value:12345
覆盖为了key:0, value:123
。
按照上文InstallSnapshot RPC
Handler的逻辑,此时InstallSnapshot RPC
参数的lastIncludeIndex = 3
,小于lastApplied = 5
,所以S2的lastApplied
没有改变,导致S2的key/value状态机的状态中永远缺失了append{key:0, value:4}
与append{key:0, value:5}
两条命令。
正确的做法应该覆盖S2的状态机状态时,将lastApplied
也一并改变,以便S2能够重新apply第4、5条指令。也就是说,InstallSnapshot RPC
允许lastApplied
的回退!
这一点可能与论文里强调的lastApplied
递增的特性不太符合。其实,lastApplied
在某些情况下是允许回退的。除了上面的情况外,最简单的情况就是重启后lastApplied
回退为了0,重新apply没有持久化到状态机的command,两者其实一个道理。从另一个角度想,这样做其实也是Raft“强领导人特性”的一个体现。
// 修改后代码:
func (rf *Raft) DiscordLogs(args *InstallSnapShotArgs) {
...
if rf.lastApplied != rf.lastIncludedIndex {
rf.lastApplied = rf.lastIncludedIndex
}
}
最后还有一个问题,为什么SnapShot()
函数里不需要这样实现?其实原因很简单:SnapShot()
是由上层State Machine发起的,由State Machine调用,不会覆盖State Machine的状态,由上及下。而InstallSnapshot
则是由Raft层发起的,由下至上,有可能覆盖State Machine的状态。
我自己在写的时候,Lab2D本身遇到的bug其实不多,基本都是因为一些赋值顺序或者lastIncludeIndex
的遗漏造成的Bug。Lab2D的bug更多地暴露在Lab3的测试中:一个就是上面提到的Bug。这个Bug因为涉及到State Machine的内容,所以Lab2D里面确实测试不出来。
除此以外,还有一个Lab2D测试中没有覆盖到的Bug:在不可靠网络下,Leader可能无法收到Follower的reply,所以没有更新PrevLogIndex
。如果Follower的状态机调用了SnapShot
,此时Follower的lastIncludeIndex
就会增大,使得Follower在对Leader发送的AE RPC进行一致性检查时出现了PrevLogIndex - lastIncludeIndex < 0
的情况,造成访问越界。解决办法就是在RPC Handler里面加一个if分支进行判断就好。
由于手头没有可用的服务器,在自己电脑上跑又太占用资源,所以目前自己是在树莓派上跑的千次测试。(没想到几年前买的小玩具居然派上了用场hhh)
目前已经跑完了6666次的测试,2B的测试出现几次失败,其余的Lab暂时没出现问题。看了下日志,发现失败的Lab2B测试中会有两三百毫秒左右的时间所有server都没有动静,也没有上锁,仿佛整个程序被暂停了一样。这也使得Follower超时发起新的选举,导致当时Term的command无法成功提交而测试失败(某些测试中command只会Start一次,并不会retry)。初步推测可能跟Go的垃圾回收有关系,还需进一步深入(说到底还是Raft的代码逻辑结构还需要优化)。
此外,在Lab3 client、server层的测试中,SpeedTest目前暂时过不了,问题主要在于Raft层的指令提交速度不够快,(SpeedTest要求33ms之内能够apply一条command)估计这个跟Lab2B的问题之间存在一定的关系。
后续打算再优化一下Raft层的逻辑,目前的方向是在Start()
里面接收到一条新的command时立刻发送一个AE RPC,看看能不能优化一下command apply的速度,然后再找找Lab2B的日志中存在的问题。
前前后后,总共花费了四天的时间完成Lab2C与Lab2D。总体感觉上,LabC、D写起来没有前两个Lab那么纠结。究其原因,可能是C、D两个部分本来在论文中就属于锦上添花的优化部分,并不需要像LabA、B一样从0开始写代码。当然了,Debug部分还是很头疼的。写完Lab2后,又用了四天的时间完成Lab3的第一版代码,并通过了除速度测试以外的所有测试一千次。Lab3的速度测试,还是需要从Raft层入手进行优化才行。短期内可能没有足够的时间进行了,先挖一个坑,将来如果能成功优化并通过Lab3的速度测试,会连同Lab3的文章一并发上来。
最后,这篇文章花了五小时,一万一千字,如果能帮到你,那就是有意义的~
文章浏览阅读1k次。通过使用ajax方法跨域请求是浏览器所不允许的,浏览器出于安全考虑是禁止的。警告信息如下:不过jQuery对跨域问题也有解决方案,使用jsonp的方式解决,方法如下:$.ajax({ async:false, url: 'http://www.mysite.com/demo.do', // 跨域URL ty..._nginx不停的xhr
文章浏览阅读2k次。关于在 Oracle 中配置 extproc 以访问 ST_Geometry,也就是我们所说的 使用空间SQL 的方法,官方文档链接如下。http://desktop.arcgis.com/zh-cn/arcmap/latest/manage-data/gdbs-in-oracle/configure-oracle-extproc.htm其实简单总结一下,主要就分为以下几个步骤。..._extproc
文章浏览阅读1.5w次。linux下没有上面的两个函数,需要使用函数 mbstowcs和wcstombsmbstowcs将多字节编码转换为宽字节编码wcstombs将宽字节编码转换为多字节编码这两个函数,转换过程中受到系统编码类型的影响,需要通过设置来设定转换前和转换后的编码类型。通过函数setlocale进行系统编码的设置。linux下输入命名locale -a查看系统支持的编码_linux c++ gbk->utf8
文章浏览阅读750次。今天准备从生产库向测试库进行数据导入,结果在imp导入的时候遇到“ IMP-00009:导出文件异常结束” 错误,google一下,发现可能有如下原因导致imp的数据太大,没有写buffer和commit两个数据库字符集不同从低版本exp的dmp文件,向高版本imp导出的dmp文件出错传输dmp文件时,文件损坏解决办法:imp时指定..._imp-00009导出文件异常结束
文章浏览阅读143次。当下是一个大数据的时代,各个行业都离不开数据的支持。因此,网络爬虫就应运而生。网络爬虫当下最为火热的是Python,Python开发爬虫相对简单,而且功能库相当完善,力压众多开发语言。本次教程我们爬取前程无忧的招聘信息来分析Python程序员需要掌握那些编程技术。首先在谷歌浏览器打开前程无忧的首页,按F12打开浏览器的开发者工具。浏览器开发者工具是用于捕捉网站的请求信息,通过分析请求信息可以了解请..._初级python程序员能力要求
文章浏览阅读7.6k次,点赞2次,收藏6次。@Service标注的bean,类名:ABDemoService查看源码后发现,原来是经过一个特殊处理:当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致public class AnnotationBeanNameGenerator implements BeanNameGenerator { private static final String C..._@service beanname
文章浏览阅读6.9w次,点赞73次,收藏463次。1.前序创建#include<stdio.h>#include<string.h>#include<stdlib.h>#include<malloc.h>#include<iostream>#include<stack>#include<queue>using namespace std;typed_二叉树的建立
文章浏览阅读7.1k次。在Asp.net上使用Excel导出功能,如果文件名出现中文,便会以乱码视之。 解决方法: fileName = HttpUtility.UrlEncode(fileName, System.Text.Encoding.UTF8);_asp.net utf8 导出中文字符乱码
文章浏览阅读2.1k次,点赞4次,收藏23次。第一次实验 词法分析实验报告设计思想词法分析的主要任务是根据文法的词汇表以及对应约定的编码进行一定的识别,找出文件中所有的合法的单词,并给出一定的信息作为最后的结果,用于后续语法分析程序的使用;本实验针对 PL/0 语言 的文法、词汇表编写一个词法分析程序,对于每个单词根据词汇表输出: (单词种类, 单词的值) 二元对。词汇表:种别编码单词符号助记符0beginb..._对pl/0作以下修改扩充。增加单词
文章浏览阅读773次。我在使用adb.exe时遇到了麻烦.我想使用与bash相同的adb.exe shell提示符,所以我决定更改默认的bash二进制文件(当然二进制文件是交叉编译的,一切都很完美)更改bash二进制文件遵循以下顺序> adb remount> adb push bash / system / bin /> adb shell> cd / system / bin> chm..._adb shell mv 权限
文章浏览阅读6.8k次,点赞12次,收藏125次。1. 单目相机标定引言相机标定已经研究多年,标定的算法可以分为基于摄影测量的标定和自标定。其中,应用最为广泛的还是张正友标定法。这是一种简单灵活、高鲁棒性、低成本的相机标定算法。仅需要一台相机和一块平面标定板构建相机标定系统,在标定过程中,相机拍摄多个角度下(至少两个角度,推荐10~20个角度)的标定板图像(相机和标定板都可以移动),即可对相机的内外参数进行标定。下面介绍张氏标定法(以下也这么称呼)的原理。原理相机模型和单应矩阵相机标定,就是对相机的内外参数进行计算的过程,从而得到物体到图像的投影_相机-投影仪标定
文章浏览阅读2.2k次。文章目录Wayland 架构Wayland 渲染Wayland的 硬件支持简 述: 翻译一篇关于和 wayland 有关的技术文章, 其英文标题为Wayland Architecture .Wayland 架构若是想要更好的理解 Wayland 架构及其与 X (X11 or X Window System) 结构;一种很好的方法是将事件从输入设备就开始跟踪, 查看期间所有的屏幕上出现的变化。这就是我们现在对 X 的理解。 内核是从一个输入设备中获取一个事件,并通过 evdev 输入_wayland