该项目是Mit6.824的开源lab lab-kvraft 的实现,主语言为go
该项目是一个键值对存储服务器,其服务端代码可以部署在多台机器上(2n+1台机器可以容忍n台机器宕机而不影响服务的可用性).当有多个客户端在网络不可靠,并且服务可能崩溃的情况下同时请求该服务器时,依旧可以保证返回结果的线性一致性:线性一致性是一个实用的特性,它可以保证你看到的视图就好像客户端发送的请求只被一台单独的服务器顺序处理.比如,如果一个客户端发送的请求得到了正确的响应,那么线性一致性将保证随后的来自任何客户端的请求都将可以看到这次更新造成的影响.线性一致性在单机上是很容易去实现的,但是在分布式系统上却十分困难,因为每台机器必须保证相同的客户端请求执行次序,并且不能给客户端返回过期的结果,与此同时还要保证能从故障中恢复到最新状态
整个客户端提供的接口有:
1.Get(key string),返回指定key存储的value
2.Put(key string, value string) 建立key-value的映射关系,如果key已存在则更新
3.Append(key string, value string) 向指定key的后面追加value
该项目底层使用的分布式一致性算法为Raft,由Raft的实现可知,Raft的leader节点会通过追加日志的方式来达成日志一致性的共识,而一旦半数以上节点都达成了共识,就会提交日志,日志一经提交就说明日志上的操作最终会应用到服务器上.因此,KVServer启动了一个线程,不断的监听Raft中提交的日志,具体操作为
func (kv *KVServer) listenToRaft() {
for {
command := <-kv.applyCh
if command.CommandValid {
kv.mu.Lock()
kv.ServerPrint("Get a Commited Command from raft")
kv.ServerPrint("CommandIndex: " + ToString(command.CommandIndex))
op := command.Command.(Op)
kv.applyIndex = command.CommandIndex
kv.indexToVersion[command.CommandIndex] = op.Version
delete(kv.versionToIndex,op.LastVersion)
_, ok := kv.versionToIndex[op.Version]
if ok {
kv.ServerPrint("Get a Commited Command that has been appllied,discard")
kv.mu.Unlock()
continue
} else {
kv.versionToIndex[op.Version] = 1
}
kv.ServerPrintf("Op: %v", op)
if op.Meth == "Get" {
} else if op.Meth == "Put" {
kv.table[op.Key] = op.Value
} else if op.Meth == "Append" {
origin := kv.table[op.Key]
kv.table[op.Key] = origin + op.Value
}
kv.mu.Unlock()
}else{
kv.mu.Lock()
kv.ServerPrint("Get a Snapshot Command from raft")
kv.applyIndex = command.SnapshotIndex
kv.readPersist(command.Snapshot)
kv.mu.Unlock()
}
time.Sleep(time.Millisecond*20)
}
}
KVServer在创建时,会传递给Raft一个管道applyCh,KVServer使用这个管道和Raft进行通信,KVServerf接受到的消息类型有两种,一种是普通的提交消息,另一种是snapshot的安装消息.对于普通的提交消息,KVServer首先会更新applyIndex,因为一旦接收到这个消息说明对应索引处的日志将要被我们应用了,另外,还要根据消息的版本来确认这个消息是否是冗余的消息.因为Raft虽然能保证日志的一致性,但是对于某些客户端由于丢包重发,不可靠的网络等问题产生的多余的日志提交却束手无策,因此我们需要在应用层解决这一问题,为每个操作打上编号(indexToVersion),这样当遇到相同编号的日志时不必执行对应操作.
在Get和PutAppend的客户端实现中,我们会将传递来的参数封装,随后向随机一个raft发起rpc,如果该raft返回OK,则说明得到了结果,返回.如果返回ErrWrongLeader,则更换服务器继续发起rpc.如果rpc调用失败(由于网络问题或服务器宕机),则更换raft服务器继续发起rpc
以Get为例
for true {
ok = ck.sendGetRpc(ck.preLeader, &args, &reply)
if !ok {
ck.CkPrint("fail rpc and change server")
ck.preLeader++
if ck.preLeader == len(ck.servers) {
ck.preLeader = 0
}
time.Sleep(10*time.Millisecond)
continue
}
if reply.Err == OK {
ck.lastVersion = args.Version
return reply.Value
}
if reply.Err == ErrNoKey {
ck.lastVersion = args.Version
return ""
}
if reply.Err == ErrWrongLeader {
ck.preLeader++
if ck.preLeader == len(ck.servers) {
ck.preLeader = 0
}
}
}
在Get和PutAppend的服务端实现中,我们会将对应操作传递给Raft让其开始共识,
index, term, isLeader := kv.rf.Start(op)
如果接收到请求的Raft不是leader,它会直接返回
if !isLeader {
kv.ServerPrint("I am not a leader, reply ErrWrongLeader")
reply.Err = ErrWrongLeader
return
}
如果是leader,会返回提交成功后期望的日志Index,以及leader当前的Term
当我们得到了期望日志出现的Index之后我们就可以轮询KVServer的applyIndex,一旦applyIndex比期望Index大,就说明当前操作已被应用到系统上,可以返回结果了
for{
if index <= kv.applyIndex {
kv.ServerPrint("ApplyIndex = " + ToString(kv.applyIndex) + ",bigger than commandIndex")
reply.Err = OK
reply.Value = kv.table[op.Key]
kv.ServerPrint("===Get Key:" + args.Key + " Result: " + reply.Value)
kv.mu.Unlock()
return
} else {
kv.ServerPrint("ApplyIndex = " + ToString(kv.applyIndex) + ",smaller than commandIndex, sleep and try again")
kv.mu.Unlock()
time.Sleep(50 * time.Millisecond)
}
}
在不断轮询的过程中,有这样一种情况,我们原本期望的位置已经被其他日志占领,或者是leader的Term发生了变化,这两种情况都说明了此时的leader已经由于网络分区等问题失效,因此我们需要再次请求其他服务器试图寻找leader,对应代码如下
for {
kv.mu.Lock()
//new add code
if term != kv.rf.GetCurrentTerm(){
kv.ServerPrint("Leader changed, reply ErrWrongLeader")
reply.Err = ErrWrongLeader
kv.mu.Unlock()
return
}
if index <= kv.applyIndex {
//new add code
if kv.indexToVersion[index] != op.Version {
kv.ServerPrint("Leader changed, reply ErrWrongLeader")
reply.Err = ErrWrongLeader
kv.mu.Unlock()
return
}
delete(kv.indexToVersion,index)
kv.ServerPrint("ApplyIndex = " + ToString(kv.applyIndex) + ",bigger than commandIndex")
reply.Err = OK
reply.Value = kv.table[op.Key]
kv.ServerPrint("===Get Key:" + args.Key + " Result: " + reply.Value)
kv.mu.Unlock()
return
} else {
kv.ServerPrint("ApplyIndex = " + ToString(kv.applyIndex) + ",smaller than commandIndex, sleep and try again")
kv.mu.Unlock()
time.Sleep(50 * time.Millisecond)
}
}
该项目中,我们通过定期生成快照来保证故障恢复.快照包含两种类型:
1.上层服务(键值对存储服务)的快照
2.Raft状态的快照
在每次崩溃时,我们单单恢复Raft或者仅仅恢复上层服务的状态都是远远不够的,只有二者保持同步才算是正确的故障恢复.同时,快照的生成和传递是双向的,Raft可以向他的上层服务发送快照,上层服务也可以通知Raft生成快照
首先来看前者,当Raft收到leader的InstallSnapshotRpc时,说明此时由于网络分区等问题已经落后太多,而我们的上层服务也是如此,因此我们需要将这份快照同步给上层服务
msg := ApplyMsg{}
msg.SnapshotValid = true
msg.SnapshotIndex = args.LastIncludedIndex
msg.SnapshotTerm = args.LastIncludedTerm
msg.Snapshot = args.SnapShot
if rf.commitIndex < args.LastIncludedIndex {
rf.commitIndex = args.LastIncludedIndex
}
if rf.lastApplied < args.LastIncludedIndex {
rf.lastApplied = args.LastIncludedIndex
}
if rf.getLastLogIndex() >= args.LastIncludedIndex {
if rf.getLogTerm(args.LastIncludedIndex) == args.LastIncludedTerm {
rf.cutLogTo(args.LastIncludedIndex + 1)
rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
rf.applyCh <- msg
rf.persist()
return
}
}
rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
//index-rf.startIndex
rf.log = []LogEntry{}
rf.applyCh <- msg
上层服务收到Raft发来的快照后,会对快照内容进行分析
kv.mu.Lock()
kv.ServerPrint("Get a Snapshot Command from raft")
kv.applyIndex = command.SnapshotIndex
kv.readPersist(command.Snapshot)
kv.mu.Unlock()
readPersist会将快照中记录的状态应用到自身
上层服务需要持久化的状态有:
var table map[string]string
var versionToIndex map[string]int
var indexToVersion map[int]string
var applyIndex int
再来看后者,当上层服务检测到Raft的日志太长,会通知Raft生成快照
func (kv *KVServer) testSnapshot() {
if(kv.maxraftstate == -1){
return
}
for {
time.Sleep(100 * time.Millisecond)
kv.mu.Lock()
//raft的日志太长
if kv.maxraftstate < kv.rf.GetPersister().RaftStateSize() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.table)
e.Encode(kv.versionToIndex)
e.Encode(kv.indexToVersion)
e.Encode(kv.applyIndex)
kv.ServerPrint2("Do a snapshot!! maxraft = "+ToString(kv.maxraftstate)+" and raft = "+ToString(kv.rf.GetPersister().RaftStateSize()))
kv.rf.Snapshot(kv.applyIndex,w.Bytes())
}
kv.mu.Unlock()
}
}
此时上层服务会把自己的状态转化为byte数组,然后传递给Raft,Raft收到后会切除日志中已经包含在byte数组里的部分,同时将该byte数组连同自己的状态一起持久化
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
rf.RaftPrint3B("snap index :" + RaftToString(index) + " ,discard log from " + RaftToString(rf.lastIncludedIndex) + " to it")
if(index <= rf.lastIncludedIndex){
rf.RaftPrint3B("Attention!! index less than lastIncludedIndex,discard")
rf.mu.Unlock()
return
}
rf.lastIncludedTerm = rf.getLogTerm(index)
rf.RaftPrint3B("BeforeLenLOg:"+RaftToString(len(rf.log)))
rf.RaftPrint3B("Before snapshot : "+RaftToString(rf.persister.RaftStateSize()))
for i := rf.lastIncludedIndex; i < index; i++ {
rf.log = rf.log[1:]
}
rf.RaftPrint3B("AfterLenLOg:"+RaftToString(len(rf.log)))
rf.snapShot = snapshot
rf.lastIncludedIndex = index
rf.persist()
rf.RaftPrint3B("After snapshot : "+RaftToString(rf.persister.RaftStateSize()))
rf.mu.Unlock()
}
persist方法会将自己的状态和上层服务传来的快照一起持久化
func (rf *Raft) persist() {
// Your code here (2C).
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
e.Encode(rf.lastIncludedIndex)
e.Encode(rf.lastIncludedTerm)
data := w.Bytes()
rf.persister.SaveStateAndSnapshot(data, rf.snapShot)
//rf.persister.SaveRaftState(data)
}
关于Raft的介绍以及实现敬请参考 分布式一致性算法:Raft