每个 shardkv 服务器都作为副本组的一部分运行。每个副本组为某些键空间分片提供 Get、Put 和 Append 操作。在 client.go 中使用 key2shard() 来查找 key 属于哪个 shard。多个副本组合作为完整的分片集提供服务。 shardctrler 服务的单个实例将分片分配给副本组;当这个分配发生变化时,副本组必须相互传递分片,同时确保客户端不会看到不一致的响应。
我们的存储系统必须为使用其客户端接口的应用程序提供可线性化的接口。也就是说,对 shardkv/client.go 中的 Clerk.Get()、Clerk.Put() 和 Clerk.Append() 方法的完整应用程序调用必须以相同的顺序影响所有副本。 Clerk.Get() 应该看到最近的 Put/Append 写入同一个键的值。即使 Gets 和 Puts 与配置更改几乎同时到达,也必须如此。
这一部分我觉的是整个MIT6.824中最难的一部分,lab2虽然也很难,但是只要按照论文写基本不会出现很大的问题,而这个部分如果没有一些分布式数据库源码和设计基础的,我觉得很难上手和理解。
这里shardKV实现有很多种,我按我的理解和实现简述的任务:
shardKV与shardctrler之间的通信是要通过每一个shardKV内部的client来进行;不同group的shardKV通过自建RPC来进行通信;同一个group的shardKV通过start来进行通信并达到一致;
每一个shardkv不应调用分片控制器的 Join() 处理程序。测试代码将在适当的时候调用 Join();
Shardkv将需要定期调用Query()轮询 shardctrler 以了解新配置。测试预计我们的代码大约每 100 毫秒轮询一次;更频繁是可以的,但少得多可能会导致问题;
ShardKV定期从 shardctrler 获取最新配置,并添加代码以在接收组不负责客户端key的分片时拒绝客户端请求;
在配置更改期间,一对组可能需要在它们之间双向移动分片。如果我们看到死锁,这是一个可能的来源。
配置更改可能会涉及部分shard的重新分配,在新配置中,目标节点会请求源节点获取该shard的数据,而在请求并完成数据的导入这个过程中,就涉及到了数据的一致性问题,一个简单的方法就是:在这个过程中,目标结果不会处理涉及尚未迁移完成的新shard的命令。虽然概念上很简单,但这种方法在生产级系统中是不可行的,每当机器被带入或取出时,它都会导致所有客户端长时间停顿,最好继续为不受正在进行的配置更改影响的分片提供服务,这种方法其实我也有一个想法:请求目标节点,如果发现该shard还没有迁移完成,就返回源节点group+一个重定向Move标识,使得客户端可以带上标识请求源节点,源节点如果该shard还没有进行迁移,则进行运行并保存,等目标节点进行迁移时,就会获取已经执行好命令的数据,这样子的话实现就会很复杂了;
configNum其实类似于Redis中的configEpoch,因为网络中因为一致性的问题,不可能在同一个时间点所有的节点的configNum都是一致的,因为不同的configNum,同一个shard可能会分配给不同得节点,因此以configNum为依据,可以找到指定的shard处理节点;
这里遇到一个很坑的点,在这个lab中,每当调用readPersister读取快照时,因为要对Raft的快照状态进行修改,因此就必须调用CondInstallSnapshot函数,但是这样就会导致测试一直阻塞,就算该函数为空也没办法通过,只能不调用该函数,将CondInstallSnapshot的逻辑写到InstallSnapshot RPC的处理代码中才能通过测试,不知道什么问题,太离谱了。(我在写kvraft这个lab的时候,readPersister的实现基本相同,但是那个调用CondInstallSnapshot可以,在这个lab中就是不行)
而且在本lab中,我发现频繁的上锁和解锁不仅会导致运行变慢,还会出现很多奇怪的现象,但绝不是死锁的问题。(待解决)
Config中存储的Groups是每一个gid下的所有servername,通过servername可以获取对应的endpoint从而进行不同group间的通信:
func(servername string) *labrpc.ClientEnd {
name := randstring(20)
end := cfg.net.MakeEnd(name)
cfg.net.Connect(name, servername)
cfg.net.Enable(name, true)
return end
}
在进行数据迁移时,源节点可以向目标节点的任何一个节点获取要迁移的shard数据,因为这部分数据都是一致的,而不用特定的从leader获取,减少网络中的请求包。
这部分主要代码分为如下部分:
基本就这些吧,在下面一些简单的功能性函数我就不贴出来了。
const (
PullConfigInterval = time.Millisecond * 100
PullShardsInterval = time.Millisecond * 200
WaitCmdTimeOut = time.Millisecond * 500
CallPeerFetchShardDataTimeOut = time.Millisecond * 500
CallPeerCleanShardDataTimeOut = time.Millisecond * 500
MaxLockTime = time.Millisecond * 10 // debug
)
type ShardKV struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
make_end func(string) *labrpc.ClientEnd
gid int
ctrlers []*labrpc.ClientEnd
maxraftstate int // snapshot if log grows this big
// Your definitions here.
stopCh chan struct{}
commandNotifyCh map[int64]chan CommandResult //用于命令apply后的唤醒
lastApplies [shardctrler.NShards]map[int64]int64 //k-v:ClientId-CommandId
config shardctrler.Config //记录当前的config
oldConfig shardctrler.Config //保存上一个config,进行shard迁移时,目标节点根据这个config来获取源节点,从而获取shard数据和请求清除shard数据
meShards map[int]bool //记录自己分配到的shard
data [shardctrler.NShards]map[string]string
inputShards map[int]bool //当前这个config相较于上一个config新指派的shard,只有input为空了才能更新下一个config
outputShards map[int]map[int]MergeShardData // configNum -> shard -> data。当某一个config,当前节点的shard移除,则记录当前config的所有移除shard的mergeShardData
//cleanOutputDataNotifyCh map[string]chan struct{} //用来通知等待协程clean完成
scc *shardctrler.Clerk //保存一个shardctrler的客户端,因为要向shardctrler发送query获取配置信息
//持久化
persister *raft.Persister
//定时任务计时器
pullConfigTimer *time.Timer //定期获取config
pullShardsTimer *time.Timer //定期检查inputShard并请求数据
//用于互斥锁
lockStartTime time.Time
lockEndTime time.Time
lockMsg string
}
没什么好介绍的。。。
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
// call labgob.Register on structures you want
// Go's RPC library to marshall/unmarshall.
labgob.Register(Op{})
kv := new(ShardKV)
kv.me = me
kv.maxraftstate = maxraftstate
kv.make_end = make_end
kv.gid = gid
kv.ctrlers = ctrlers
// Your initialization code here.
kv.persister = persister
kv.scc = shardctrler.MakeClerk(kv.ctrlers)
// Use something like this to talk to the shardctrler:
// kv.mck = shardctrler.MakeClerk(kv.ctrlers)
kv.applyCh = make(chan raft.ApplyMsg)
kv.stopCh = make(chan struct{})
kv.rf = raft.Make(servers, me, persister, kv.applyCh)
//初始化自身数据
kv.data = [shardctrler.NShards]map[string]string{}
for i, _ := range kv.data {
kv.data[i] = make(map[string]string)
}
kv.lastApplies = [shardctrler.NShards]map[int64]int64{}
for i, _ := range kv.lastApplies {
kv.lastApplies[i] = make(map[int64]int64)
}
kv.inputShards = make(map[int]bool)
kv.outputShards = make(map[int]map[int]MergeShardData)
//kv.cleanOutputDataNotifyCh = make(map[string]chan struct{})
config := shardctrler.Config{
Num: 0,
Shards: [shardctrler.NShards]int{},
Groups: map[int][]string{},
}
kv.config = config
kv.oldConfig = config
//读取快照内容
kv.readPersist(true, 0, 0, kv.persister.ReadSnapshot())
kv.commandNotifyCh = make(map[int64]chan CommandResult)
//设置定时器
kv.pullConfigTimer = time.NewTimer(PullConfigInterval)
kv.pullShardsTimer = time.NewTimer(PullShardsInterval)
kv.ticker()
return kv
}
主要逻辑就是三个协程的处理,后面会依次介绍:
func (kv *ShardKV) ticker() {
//处理applyCh
go kv.handleApplyCh()
//定时获取config信息
go kv.pullConfig()
//定时获取input shard(如果有的话)
go kv.fetchShards()
}
func (kv *ShardKV) pullConfig() {
for {
select {
case <-kv.stopCh:
return
case <-kv.pullConfigTimer.C:
//只有leader才能获取
_, isLeader := kv.rf.GetState()
if !isLeader {
kv.pullConfigTimer.Reset(PullConfigInterval)
break
}
kv.lock("pullconfig")
lastNum := kv.config.Num
kv.log("pull config,last: %d", lastNum)
kv.unlock("pullconfig")
config := kv.scc.Query(lastNum + 1)
if config.Num == lastNum+1 {
//找到新的config
kv.log("pull config,new config:%+v", config)
kv.lock("pullconfig")
//这一个判断很关键,必须当前shard全部迁移完成才能获取下一个config
if len(kv.inputShards) == 0 && kv.config.Num+1 == config.Num {
kv.log("pull config,start config:%+v", config)
kv.unlock("pullconfig")
//请求该命令
kv.rf.Start(config.Copy())
} else {
kv.unlock("pullconfig")
}
}
kv.pullConfigTimer.Reset(PullConfigInterval)
}
}
}
这一部分就是定时从shardctrler获取config的逻辑,注意几点:
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ReqId int64 //用来标识commandNotify
CommandId int64
ClientId int64
Key string
Value string
Method string
ConfigNum int
}
type CommandResult struct {
Err Err
Value string
}
Op和CommandResult和之前lab中的变化不是很大。
Op中的ConfigNum是用来记录客户端请求时args中的configNum,当Op从applyCh中获取并应用时,只有configNum和当前节点的configNum相同,才能执行。
这一部分和kvraft区别不大:
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
// Your code here.
res := kv.waitCommand(args.ClientId, args.CommandId, "Get", args.Key, "", args.ConfigNum)
reply.Err = res.Err
reply.Value = res.Value
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
res := kv.waitCommand(args.ClientId, args.CommandId, args.Op, args.Key, args.Value, args.ConfigNum)
reply.Err = res.Err
}
func (kv *ShardKV) waitCommand(clientId int64, commandId int64, method, key, value string, configNum int) (res CommandResult) {
kv.log("wait cmd start,clientId:%d,commandId: %d,method: %s,key-value:%s %s,configNum %d", clientId, commandId, method, key, value, configNum)
op := Op{
ReqId: nrand(),
ClientId: clientId,
CommandId: commandId,
Method: method,
Key: key,
ConfigNum: configNum,
Value: value,
}
index, term, isLeader := kv.rf.Start(op)
if !isLeader {
res.Err = ErrWrongLeader
kv.log("wait cmd NOT LEADER.")
return
}
kv.lock("waitCommand")
ch := make(chan CommandResult, 1)
kv.commandNotifyCh[op.ReqId] = ch
kv.unlock("waitCommand")
kv.log("wait cmd notify,index: %v,term: %v,op: %+v", index, term, op)
t := time.NewTimer(WaitCmdTimeOut)
defer t.Stop()
select {
case <-t.C:
res.Err = ErrTimeOut
case res = <-ch:
case <-kv.stopCh:
res.Err = ErrServer
}
kv.removeCh(op.ReqId)
kv.log("wait cmd end,Op: %+v.res:%+v", op, res)
return
}
waitCommand主要的处理步骤:
//定时获取shard
func (kv *ShardKV) fetchShards() {
for {
select {
case <-kv.stopCh:
return
case <-kv.pullShardsTimer.C:
//判断是否有要input的shard
_, isLeader := kv.rf.GetState()
if isLeader {
kv.lock("pullshards")
for shardId, _ := range kv.inputShards {
//注意要从上一个config中请求shard的源节点
go kv.fetchShard(shardId, kv.oldConfig)
}
kv.unlock("pullshards")
}
kv.pullShardsTimer.Reset(PullShardsInterval)
}
}
}
主要就是主节点检查inputShards,如果有还没有迁移完成的shard,就调用fetchShard函数进行迁移:
//获取指定的shard
func (kv *ShardKV) fetchShard(shardId int, config shardctrler.Config) {
args := FetchShardDataArgs{
ConfigNum: config.Num,
ShardNum: shardId,
}
t := time.NewTimer(CallPeerFetchShardDataTimeOut)
defer t.Stop()
for {
//依次请求group中的每个节点,但只要获取一个就好了
for _, s := range config.Groups[config.Shards[shardId]] {
reply := FetchShardDataReply{}
srv := kv.make_end(s)
done := make(chan bool, 1)
go func(args *FetchShardDataArgs, reply *FetchShardDataReply) {
done <- srv.Call("ShardKV.FetchShardData", args, reply)
}(&args, &reply)
t.Reset(CallPeerFetchShardDataTimeOut)
select {
case <-kv.stopCh:
return
case <-t.C:
case isDone := <-done:
if isDone && reply.Success == true {
kv.lock("pullShard")
if _, ok := kv.inputShards[shardId]; ok && kv.config.Num == config.Num+1 {
replyCopy := reply.Copy()
mergeShardData := MergeShardData{
ConfigNum: args.ConfigNum,
ShardNum: args.ShardNum,
Data: replyCopy.Data,
CommandIndexes: replyCopy.CommandIndexes,
}
kv.log("pullShard get data:%+v", mergeShardData)
kv.unlock("pullShard")
kv.rf.Start(mergeShardData)
//不管是不是leader都返回
return
} else {
kv.unlock("pullshard")
}
}
}
}
}
}
因为新shard所在的源group每一个节点都有相同的数据,因此这里可以依次请求每一个节点,只要从一个节点得到了数据(不一定是leader节点),就可以结束请求了。
而获取新shard的数据也需要调用Start通过raft来进行同步,这里并不需要阻塞等待Start的结果,因为没必要。
调用的RPC是ShardKV.FetchShardData,后面会介绍。
当进行shard迁移时,目标节点如果已经导入了该shard的数据,就可以调用该函数请求源节点删除该shard的数据了,因为迁移完成后,源节点没必要保存旧的shard数据了。
//发送给shard源节点,可以删除shard数据了
//一般在apply command中处理好input的shard,发送给源节点删除保存的shard数据
func (kv *ShardKV) callPeerCleanShardData(config shardctrler.Config, shardId int) {
args := CleanShardDataArgs{
ConfigNum: config.Num,
ShardNum: shardId,
}
t := time.NewTimer(CallPeerCleanShardDataTimeOut)
defer t.Stop()
for {
//因为并不知道哪一个节点是leader,因此群发吧
for _, group := range config.Groups[config.Shards[shardId]] {
reply := CleanShardDataReply{}
srv := kv.make_end(group)
done := make(chan bool, 1)
go func(args *CleanShardDataArgs, reply *CleanShardDataReply) {
done <- srv.Call("ShardKV.CleanShardData", args, reply)
}(&args, &reply)
t.Reset(CallPeerCleanShardDataTimeOut)
select {
case <-kv.stopCh:
return
case <-t.C:
case isDone := <-done:
if isDone && reply.Success == true {
return
}
}
}
kv.lock("callPeerCleanShardData")
if kv.config.Num != config.Num+1 || len(kv.inputShards) == 0 {
kv.unlock("callPeerCleanShardData")
break
}
kv.unlock("callPeerCleanShardData")
}
}
会轮询源group的所有节点发送RPC请求,因为并不知道哪一个节点时主节点,只要某一个节点返回成功,就可以了。
调用的RPC是ShardKV.CleanShardData,后面会介绍。
主要是两种RPC的处理:
//请求获取shard
func (kv *ShardKV) FetchShardData(args *FetchShardDataArgs, reply *FetchShardDataReply) {
kv.log("get req fetchsharddata:args:%+v, reply:%+v", args, reply)
defer kv.log("resp fetchsharddata:args:%+v, reply:%+v", args, reply)
kv.lock("fetchShardData")
defer kv.unlock("fetchShardData")
//必须是过去的config
if args.ConfigNum >= kv.config.Num {
return
}
reply.Success = false
if configData, ok := kv.outputShards[args.ConfigNum]; ok {
if shardData, ok := configData[args.ShardNum]; ok {
reply.Success = true
reply.Data = make(map[string]string)
reply.CommandIndexes = make(map[int64]int64)
for k, v := range shardData.Data {
reply.Data[k] = v
}
for k, v := range shardData.CommandIndexes {
reply.CommandIndexes[k] = v
}
}
}
return
}
//请求清除shard
func (kv *ShardKV) CleanShardData(args *CleanShardDataArgs, reply *CleanShardDataReply) {
kv.log("get req CleanShardData:args:%+v, reply:%+v", args, reply)
defer kv.log("resp CleanShardData:args:%+v, reply:%+v", args, reply)
kv.lock("cleanShardData")
//必须是过去的config
if args.ConfigNum >= kv.config.Num {
kv.unlock("cleanShardData")
return
}
kv.unlock("cleanShardData")
_, _, isLeader := kv.rf.Start(*args)
if !isLeader {
return
}
// 简单处理下。。。
for i := 0; i < 10; i++ {
kv.lock("cleanShardData")
exist := kv.OutputDataExist(args.ConfigNum, args.ShardNum)
kv.unlock("cleanShardData")
if !exist {
reply.Success = true
return
}
time.Sleep(time.Millisecond * 20)
}
//采用下面这种方式获取start结果,其实会慢一些,还会出现锁的问题
//kv.lock("CleanShardData")
//ch := make(chan struct{}, 1)
//kv.cleanOutputDataNotifyCh[fmt.Sprintf("%d%d", args.ConfigNum, args.ShardNum)] = ch
//kv.unlock("CleanShardData")
//t := time.NewTimer(WaitCmdTimeOut)
//defer t.Stop()
//
//select {
//case <-t.C:
//case <-ch:
//case <-kv.stopCh:
//}
//
//kv.lock("removeCh")
删除ch
//if _, ok := kv.cleanOutputDataNotifyCh[fmt.Sprintf("%d%d", args.ConfigNum, args.ShardNum)]; ok {
// delete(kv.cleanOutputDataNotifyCh, fmt.Sprintf("%d%d", args.ConfigNum, args.ShardNum))
//}
判断是否还存在
//exist := kv.OutputDataExist(args.ConfigNum, args.ShardNum)
//kv.unlock("removeCh")
//if !exist {
// reply.Success = true
//}
return
}
func (kv *ShardKV) handleApplyCh() {
for {
select {
case <-kv.stopCh:
kv.log("get from stopCh,server-%v stop!", kv.me)
return
case cmd := <-kv.applyCh:
//处理快照命令,读取快照的内容
if cmd.SnapshotValid {
kv.log("%v get install sn,%v %v", kv.me, cmd.SnapshotIndex, cmd.SnapshotTerm)
kv.lock("waitApplyCh_sn")
kv.readPersist(false, cmd.SnapshotTerm, cmd.SnapshotIndex, cmd.Snapshot)
kv.unlock("waitApplyCh_sn")
continue
}
//处理普通命令
if !cmd.CommandValid {
continue
}
cmdIdx := cmd.CommandIndex
//处理不同的命令
if op, ok := cmd.Command.(Op); ok {
kv.handleOpCommand(cmdIdx, op)
} else if config, ok := cmd.Command.(shardctrler.Config); ok {
kv.handleConfigCommand(cmdIdx, config)
} else if mergeData, ok := cmd.Command.(MergeShardData); ok {
kv.handleMergeShardDataCommand(cmdIdx, mergeData)
} else if cleanData, ok := cmd.Command.(CleanShardDataArgs); ok {
kv.handleCleanShardDataCommand(cmdIdx, cleanData)
} else {
panic("apply command,NOT FOUND COMMDN!")
}
}
}
}
这个函数和kvraft、shardctrler并没有多大的不同,处理逻辑是在一个for循环中,从applyCh中会获取五种命令:①快照命令;②客户端普通命令;③更新配置命令;④迁入的shard数据保存命令;⑤迁出的shard数据清除命令。
其实也就分别是调用五种函数来进行处理,会依次进行介绍。
//处理get、put、append命令
func (kv *ShardKV) handleOpCommand(cmdIdx int, op Op) {
kv.log("start apply command %v:%+v", cmdIdx, op)
kv.lock("handleApplyCh")
defer kv.unlock("handleApplyCh")
shardId := key2shard(op.Key)
//判断能够执行该命令
if err := kv.ProcessKeyReady(op.ConfigNum, op.Key); err != OK {
kv.notifyWaitCommand(op.ReqId, err, "")
return
}
if op.Method == "Get" {
//处理读
e, v := kv.getValueByKey(op.Key)
kv.notifyWaitCommand(op.ReqId, e, v)
} else if op.Method == "Put" || op.Method == "Append" {
//处理写
//判断命令是否重复
isRepeated := false
if v, ok := kv.lastApplies[shardId][op.ClientId]; ok {
if v == op.CommandId {
isRepeated = true
}
}
if !isRepeated {
switch op.Method {
case "Put":
kv.data[shardId][op.Key] = op.Value
kv.lastApplies[shardId][op.ClientId] = op.CommandId
case "Append":
e, v := kv.getValueByKey(op.Key)
if e == ErrNoKey {
//按put处理
kv.data[shardId][op.Key] = op.Value
kv.lastApplies[shardId][op.ClientId] = op.CommandId
} else {
//追加
kv.data[shardId][op.Key] = v + op.Value
kv.lastApplies[shardId][op.ClientId] = op.CommandId
}
default:
panic("unknown method " + op.Method)
}
}
//命令处理成功
kv.notifyWaitCommand(op.ReqId, OK, "")
} else {
panic("unknown method " + op.Method)
}
kv.log("apply op: cmdId:%d, op: %+v, data:%v", cmdIdx, op, kv.data[shardId][op.Key])
//每应用一条命令,就判断是否进行持久化
kv.saveSnapshot(cmdIdx)
}
这里主要下ProcessKeyReady函数:
//判断能否执行客户端发来的命令
func (kv *ShardKV) ProcessKeyReady(configNum int, key string) Err {
//config不对
if configNum == 0 || configNum != kv.config.Num {
kv.log("process key ready err config.")
return ErrWrongGroup
}
shardId := key2shard(key)
//没有分配该shard
if _, ok := kv.meShards[shardId]; !ok {
kv.log("process key ready err shard.")
return ErrWrongGroup
}
//正在迁移,这里有优化的空间,如果没有迁移完成,可以直接请求目标节点完成操作并返回,但是这样就太复杂了,这里简略了
if _, ok := kv.inputShards[shardId]; ok {
kv.log("process key ready err waitShard.")
return ErrWrongGroup
}
return OK
}
只要不满足任意一个条件,都不能执行客户端发来的普通命令。
//处理config命令,即更新config
//主要是处理meshard、inputshard、outputshard
func (kv *ShardKV) handleConfigCommand(cmdIdx int, config shardctrler.Config) {
kv.log("start handle config %v:%+v", cmdIdx, config)
kv.lock("handleApplyCh")
defer kv.unlock("handleApplyCh")
if config.Num <= kv.config.Num {
kv.saveSnapshot(cmdIdx)
return
}
if config.Num != kv.config.Num+1 {
panic("applyConfig err")
}
oldConfig := kv.config.Copy()
outputShards := make([]int, 0, shardctrler.NShards)
inputShards := make([]int, 0, shardctrler.NShards)
meShards := make([]int, 0, shardctrler.NShards)
for i := 0; i < shardctrler.NShards; i++ {
if config.Shards[i] == kv.gid {
meShards = append(meShards, i)
if oldConfig.Shards[i] != kv.gid {
inputShards = append(inputShards, i)
}
} else {
if oldConfig.Shards[i] == kv.gid {
outputShards = append(outputShards, i)
}
}
}
//处理当前的shard
kv.meShards = make(map[int]bool)
for _, shardId := range meShards {
kv.meShards[shardId] = true
}
//处理移出的shard
//保存当前所处配置的所有移除的shard数据
d := make(map[int]MergeShardData)
for _, shardId := range outputShards {
mergeShardData := MergeShardData{
ConfigNum: oldConfig.Num,
ShardNum: shardId,
Data: kv.data[shardId],
CommandIndexes: kv.lastApplies[shardId],
}
d[shardId] = mergeShardData
//初始化数据
kv.data[shardId] = make(map[string]string)
kv.lastApplies[shardId] = make(map[int64]int64)
}
kv.outputShards[oldConfig.Num] = d
//处理移入的shard
kv.inputShards = make(map[int]bool)
if oldConfig.Num != 0 {
for _, shardId := range inputShards {
kv.inputShards[shardId] = true
}
}
kv.config = config
kv.oldConfig = oldConfig
kv.log("apply op: cmdId:%d, config:%+v", cmdIdx, config)
kv.saveSnapshot(cmdIdx)
}
主要逻辑:
如果configNum小于等于当前配置的configNum,直接返回;
如果configNum不是当前配置的configNum+1,报错,理论上是不会出现报错的,因为在leader query到新的config后会进行判断的;
根据新的config的shard信息更新三个数据结构:
①meshard:当前config负责的shard;
②outputShard:当前config相比于上一个config不再负责的shard信息,主要用于供迁移的目标节点获取shard数据;
③inputShard:当前config相比于上一个config新负责的shard。
后两个数据结构主要用于shard数据迁移。
保存olgConfig,并尝试进行一次快照。
//处理新的shard数据,即input shard
func (kv *ShardKV) handleMergeShardDataCommand(cmdIdx int, data MergeShardData) {
kv.log("start merge Shard Data %v:%+v", cmdIdx, data)
kv.lock("handleApplyCh")
defer kv.unlock("handleApplyCh")
if kv.config.Num != data.ConfigNum+1 {
return
}
if _, ok := kv.inputShards[data.ShardNum]; !ok {
return
}
kv.data[data.ShardNum] = make(map[string]string)
kv.lastApplies[data.ShardNum] = make(map[int64]int64)
for k, v := range data.Data {
kv.data[data.ShardNum][k] = v
}
for k, v := range data.CommandIndexes {
kv.lastApplies[data.ShardNum][k] = v
}
delete(kv.inputShards, data.ShardNum)
kv.log("apply op: cmdId:%d, mergeShardData:%+v", cmdIdx, data)
kv.saveSnapshot(cmdIdx)
go kv.callPeerCleanShardData(kv.oldConfig, data.ShardNum)
}
//处理已经迁移走的shard,即output shard
func (kv *ShardKV) handleCleanShardDataCommand(cmdIdx int, data CleanShardDataArgs) {
kv.log("start clean shard data %v:%+v", cmdIdx, data)
kv.lock("handleApplyCh")
defer kv.unlock("handleApplyCh")
//如果要清除的shard确实是在outputShard中,且没有被清除,则需要清除
if kv.OutputDataExist(data.ConfigNum, data.ShardNum) {
delete(kv.outputShards[data.ConfigNum], data.ShardNum)
}
//通知等待协程
//if ch, ok := kv.cleanOutputDataNotifyCh[fmt.Sprintf("%d%d", data.ConfigNum, data.ShardNum)]; ok {
// ch <- struct{}{}
//}
kv.saveSnapshot(cmdIdx)
}
当shard数据迁移中,目标节点在handleMergeShardDataCommand中完成了数据迁移,会调用callPeerCleanShardData向源节点发送RPC请求清除该shard的数据,源节点接收到该RPC,就会提交同步一条命令,而这条命令的处理就是handleCleanShardDataCommand。
主要就是从outputShards中清除某个config-shard的数据。
//读取快照
//两处调用:初始化阶段;收到Snapshot命令,即接收了leader的Snapshot
func (kv *ShardKV) readPersist(isInit bool, snapshotTerm, snapshotIndex int, data []byte) {
if data == nil || len(data) < 1 {
return
}
//只要不是初始化调用,即如果收到一个Snapshot命令,就要执行该函数
//不知道为什么,只要在ShardKV中调用该函数,就会导致测试一直阻塞,就算该函数为空也没办法通过,只能注释掉,将CondInstallSnapshot的逻辑写到Raft中的InstallSnapshot RPC的处理代码中
//if !isInit {
// res := kv.rf.CondInstallSnapshot(snapshotTerm, snapshotIndex, data)
// if !res {
// log.Panicln("kv read persist err in CondInstallSnapshot!")
// return
// }
//}
//对数据进行同步
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var kvData [shardctrler.NShards]map[string]string
var lastApplies [shardctrler.NShards]map[int64]int64
var inputShards map[int]bool
var outputShards map[int]map[int]MergeShardData
var config shardctrler.Config
var oldConfig shardctrler.Config
var meShards map[int]bool
if d.Decode(&kvData) != nil ||
d.Decode(&lastApplies) != nil ||
d.Decode(&inputShards) != nil ||
d.Decode(&outputShards) != nil ||
d.Decode(&config) != nil ||
d.Decode(&oldConfig) != nil ||
d.Decode(&meShards) != nil {
log.Fatal("kv read persist err")
} else {
kv.data = kvData
kv.lastApplies = lastApplies
kv.inputShards = inputShards
kv.outputShards = outputShards
kv.config = config
kv.oldConfig = oldConfig
kv.meShards = meShards
}
}
没什么好说的,只是有一点很离谱,只要我调用了CondInstallSnapshot函数,TestConcurrent3就会导致测试一直阻塞,就算该函数为空也没办法通过,只能注释掉,将CondInstallSnapshot的逻辑写到Raft中的InstallSnapshot RPC的处理代码中。
//保存快照
func (kv *ShardKV) saveSnapshot(logIndex int) {
//判断条件,满足一定的日志量才能进行持久化
if kv.maxraftstate == -1 || kv.persister.RaftStateSize() < kv.maxraftstate {
return
}
//生成快照数据
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if e.Encode(kv.data) != nil ||
e.Encode(kv.lastApplies) != nil ||
e.Encode(kv.inputShards) != nil ||
e.Encode(kv.outputShards) != nil ||
e.Encode(kv.config) != nil ||
e.Encode(kv.oldConfig) != nil ||
e.Encode(kv.meShards) != nil {
panic("gen snapshot data encode err")
}
data := w.Bytes()
kv.rf.Snapshot(logIndex, data)
}
这部分代码没什么好说的,主要是一些RPC参数和回复结构定义和回复状态码定义,
//回复状态码
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
ErrWrongGroup = "ErrWrongGroup"
ErrWrongLeader = "ErrWrongLeader"
ErrTimeOut = "ErrTimeOut"
ErrServer = "ErrServer"
)
type Err string
//主要是applyCh的处理中,ApplyMsg的Command是一个interface,因此要向labgob注册具体实现才能进行编解码
func init() {
//labgob.Register(PutAppendArgs{})
//labgob.Register(PutAppendReply{})
//labgob.Register(GetArgs{})
//labgob.Register(GetReply{})
//labgob.Register(FetchShardDataArgs{})
//labgob.Register(FetchShardDataReply{})
labgob.Register(CleanShardDataArgs{})
//labgob.Register(CleanShardDataReply{})
labgob.Register(MergeShardData{})
}
// Put or Append
type PutAppendArgs struct {
// You'll have to add definitions here.
Key string
Value string
Op string // "Put" or "Append"
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ClientId int64
CommandId int64
ConfigNum int
}
type PutAppendReply struct {
Err Err
}
func (c *PutAppendArgs) copy() PutAppendArgs {
r := PutAppendArgs{
Key: c.Key,
Value: c.Value,
Op: c.Op,
ClientId: c.ClientId,
CommandId: c.CommandId,
ConfigNum: c.ConfigNum,
}
return r
}
type GetArgs struct {
Key string
// You'll have to add definitions here.
ClientId int64
CommandId int64
ConfigNum int
}
type GetReply struct {
Err Err
Value string
}
func (c *GetArgs) copy() GetArgs {
r := GetArgs{
Key: c.Key,
ClientId: c.ClientId,
CommandId: c.CommandId,
ConfigNum: c.ConfigNum,
}
return r
}
//用于向目标节点获取input shard
type FetchShardDataArgs struct {
ConfigNum int
ShardNum int
}
type FetchShardDataReply struct {
Success bool
CommandIndexes map[int64]int64
Data map[string]string
}
func (reply *FetchShardDataReply) Copy() FetchShardDataReply {
res := FetchShardDataReply{
Success: reply.Success,
Data: make(map[string]string),
CommandIndexes: make(map[int64]int64),
}
for k, v := range reply.Data {
res.Data[k] = v
}
for k, v := range reply.CommandIndexes {
res.CommandIndexes[k] = v
}
return res
}
//用于请求目标节点清除指定的output shard
type CleanShardDataArgs struct {
ConfigNum int
ShardNum int
}
type CleanShardDataReply struct {
Success bool
}
//用于存储output shard的数据,以及充当input shard在apply的命令
type MergeShardData struct {
ConfigNum int
ShardNum int
CommandIndexes map[int64]int64 //当前shard的所有客户端的最后一条命令id
Data map[string]string
}