• MIT6.824-lab3A-Key/value service without snapshots(基本的KV服务)


    3A(基本的KV服务)

    前言

    lab3 的内容是要在 lab2 的基础上实现一个高可用的 KV 存储服务,算是要将 raft 真正的用起来。

    而在该任务中,要处理一个很重要的事情,就是线性化语义,也可以要求每个请求要具有幂等性。

    考虑这样一个场景,客户端向服务端提交了一条日志,服务端将其在 raft 组中进行了同步并成功 commit,接着在 apply 后返回给客户端执行结果。然而不幸的是,该 rpc 在传输中发生了丢失,客户端并没有收到写入成功的回复。因此,客户端只能进行重试直到明确地写入成功或失败为止,这就可能会导致相同地命令被执行多次,从而违背线性一致性。

    有人可能认为,只要写请求是幂等的,那重复执行多次也是可以满足线性一致性的,实际上则不然。考虑这样一个例子:对于一个仅支持 put 和 get 接口的 raftKV 系统,其每个请求都具有幂等性。设 x 的初始值为 0,此时有两个并发客户端,客户端 1 执行 put(x,1),客户端 2 执行 get(x) 再执行 put(x,2),问(客户端 2 读到的值,x 的最终值)是多少。对于线性一致的系统,答案可以是 (0,1),(0,2) 或 (1,2)。然而,如果客户端 1 执行 put 请求时发生了上段描述的情况,然后客户端 2 读到 x 的值为 1 并将 x 置为了 2,最后客户端 1 超时重试且再次将 x 置为 1。对于这种场景,答案是 (1,1),这就违背了线性一致性。归根究底还是由于幂等的 put(x,1) 请求在状态机上执行了两次,有两个 LZ 点。因此,即使写请求的业务语义能够保证幂等,不进行额外的处理让其重复执行多次也会破坏线性一致性。当然,读请求由于不改变系统的状态,重复执行多次是没问题的。

    基本思路便是:每个 client 都需要一个唯一的标识符,它的每个不同命令需要有一个顺序递增的 commandId,clientId 和这个 commandId可以唯一确定一个不同的命令,从而使得各个 raft 节点可以记录保存各命令是否已应用以及应用以后的结果。

    如果默认一个客户端只能串行执行请求的话,服务端这边只需要记录一个 map,其 key 是 clientId,其 value 是该 clientId 执行的最后一条日志的 commandId 和状态机的输出即可。

    任务

    • 完成client和server基本构造;
    • 客户端方面主要完成Get、Put、PutAppend三个函数的处理。客户端可以向键/值服务发送三种不同的 RPC:Put(key, value)、Append(key, arg) 和 Get(key)。该服务维护一个简单的键/值对数据库。键和值是字符串。 Put(key, value) 替换数据库中特定键的值,Append(key, arg) 将 arg 附加到键的值,Get(key) 获取键的当前值。一个不存在的键的 Get 应该返回一个空字符串。 Append 到不存在的键应该像 Put 一样
    • server中实现 PutAppend() 和 Get() RPC 处理程序。这些处理程序应该使用 Start() 在 Raft 日志中输入一个 Op,在 server完成 Op 结构定义,以便它描述 Put/Append/Get 操作;
    • server要完成applyCh的处理,对于每一个要引用的命令,要进行处理。每个服务器都应该在 Raft 提交它们时执行 Op 命令,即当它们出现在 applyCh 上时。当 Raft 提交其 Op 时,RPC 处理程序应该注意到,然后回复 RPC。

    任务须知

    • 我被append坑了,它的意思是追加,我原本以为不存在的key会直接报ErrNoKey,结果TestSpeed3A一直通不过,再仔细看了以下任,发现了这样一句话:

      An Append to a non-existent key should act like Put。也就是说如果此时key不存在,会直接设置一个key;

    • 一定要满足线性化语义,做到exactly once语义,即At Least Once + 幂等性。因此每一个命令都要有唯一标识,即clientId+commandId;客户端针对server发送来的不同的err都要进行不同的处理,直到接收到命令的正确执行结果;

    • 在调用Start()向raft提交命令后,需要进行等待,直到该命令提交并应用完成,或者超时;

    • 当收到Applych中发来的可引用的命令(Op),则根据操作符进行不同的处理,处理完成后,唤醒等待协程;

    • 在server中,当应用get命令前,不需要判断命令是否重复;在应用put、append命令前,需要判断命令是否重复,即幂等性的处理;

    • 客户端并不知道哪一个节点是leader,因此根据返回的错误调整请求的节点。

    代码

    RPC相关(Common)

    const (
    	OK             = "OK"
    	ErrNoKey       = "ErrNoKey"
    	ErrWrongLeader = "ErrWrongLeader"
    	ErrTimeOut     = "ErrTimeOut"
    	ErrServer      = "ErrServer"
    )
    
    type Err string
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    rpc中服务器返回给客户端的五种状态码,字面意思理解就行了。不同的状态码会进行不同的处理,比如:

    • OK表示命令执行成功,可以退出了;
    • ErrNoKey表示没有这个key,直接退出;
    • ErrWrongLeader表示请求的节点并不是leader,因此按照某种规则换一个节点再进行请求,我这里使用的策略是轮询请求;
    • ErrTimeOut表示请求超时,继续请求;
    • ErrServer表示server出现了一些问题crash了,可以换一个节点继续请求。
    type PutAppendArgs struct {
        Key   string
        Value string
        Op    string // "Put" or "Append"
        // You'll have to add definitions here.
        // Field names must start with capital letters,
        // otherwise RPC will break.
        ClientId  int64
        CommandId int64
    }
    
    type PutAppendReply struct {
        Err Err
    }
    
    type GetArgs struct {
        Key       string
        ClientId  int64
        CommandId int64
        // You'll have to add definitions here.
    }
    
    type GetReply struct {
        Err   Err
        Value string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    get和put在rpc通信中的args和reply,字面意思理解。

    client

    //客户端
    type Clerk struct {
    	servers []*labrpc.ClientEnd
    	// You will have to modify this struct.
    	clientId int64
    	leaderId int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    一共三个属性,分别是每一个服务端的ClientEnd、客户端id、当前请求的leaderId(并不一定是leader,仅仅是一个请求标识)

    //生成一个客户端
    func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
        ck := new(Clerk)
        ck.servers = servers
        ck.clientId = nrand()
        // You'll have to add code here.
        return ck
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    创建客户端的代码也很简单,nrand()是自带的函数,返回一个int64的随机数。

    接下来就是三种命令的处理了:

    当发送一个get请求:

    //根据key获取value
    func (ck *Clerk) Get(key string) string {
    	// You will have to modify this function.
    	//DPrintf("%v client get key:%s.", ck.clientId, key)
    	args := GetArgs{
    		Key:       key,
    		ClientId:  ck.clientId,
    		CommandId: nrand(),
    	}
    	leaderId := ck.leaderId
    	for {
    		reply := GetReply{}
    		ok := ck.servers[leaderId].Call("KVServer.Get", &args, &reply)
    		if !ok {
    			//如果请求失败,等一段时间再请求,换一个节点再请求
    			DPrintf("%v client get key %v from server %v,not ok.", ck.clientId, key, leaderId)
    			time.Sleep(ChangeLeaderInterval)
    			leaderId = (leaderId + 1) % len(ck.servers)
    			continue
    		} else if reply.Err != OK {
    			DPrintf("%v client get key %v from server %v,reply err = %v!", ck.clientId, key, leaderId, reply.Err)
    		}
    
    		switch reply.Err {
    		case OK:
    			DPrintf("%v client get key %v from server %v,value: %v,OK.", ck.clientId, key, leaderId, reply.Value, leaderId)
    			ck.leaderId = leaderId
    			return reply.Value
    		case ErrNoKey:
    			DPrintf("%v client get key %v from server %v,NO KEY!", ck.clientId, key, leaderId)
    			ck.leaderId = leaderId
    			return ""
    		case ErrTimeOut:
    			continue
    		default:
    			time.Sleep(ChangeLeaderInterval)
    			leaderId = (leaderId + 1) % len(ck.servers)
    			continue
    		}
    
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    主要就是发送Get RPC,获取指定key的value,以及状态码,根据不同的状态码进行不同的处理,前面对状态码也有介绍。

    而Put和Append请求如下:

    func (ck *Clerk) Put(key string, value string) {
    	ck.PutAppend(key, value, "Put")
    }
    func (ck *Clerk) Append(key string, value string) {
    	ck.PutAppend(key, value, "Append")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这两种命令都是调用同一个函数进行处理的,因为他们的请求参数是相同的:

    func (ck *Clerk) PutAppend(key string, value string, op string) {
    	DPrintf("%v client PutAppend,key:%v,value:%v,op:%v", ck.clientId, key, value, op)
    	// You will have to modify this function.
    	args := PutAppendArgs{
    		Key:       key,
    		Value:     value,
    		Op:        op,
    		ClientId:  ck.clientId,
    		CommandId: nrand(),
    	}
    	leaderId := ck.leaderId
    	for {
    		reply := PutAppendReply{}
    		ok := ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply)
    		if !ok {
    			//可能当前请求的server不是leader,换一个server再访问
    			DPrintf("%v client set key %v to %v to server %v,not ok.", ck.clientId, key, value, leaderId)
    			time.Sleep(ChangeLeaderInterval)
    			leaderId = (leaderId + 1) % len(ck.servers)
    			continue
    		} else if reply.Err != OK {
    			DPrintf("%v client set key %v to %v to server %v,reply err = %v!", ck.clientId, key, value, leaderId, reply.Err)
    		}
    
    		switch reply.Err {
    		case OK:
    			DPrintf("%v client set key %v to %v to server %v,OK.", ck.clientId, key, value, leaderId)
    			ck.leaderId = leaderId
    			return
    		case ErrNoKey:
    			DPrintf("%v client set key %v to %v to server %v,NOKEY!", ck.clientId, key, value, leaderId)
    			return
    		case ErrTimeOut:
    			continue
    		case ErrWrongLeader:
    			//换一个节点继续请求
    			time.Sleep(ChangeLeaderInterval)
    			leaderId = (leaderId + 1) % len(ck.servers)
    			continue
    		case ErrServer:
    			//换一个节点继续请求
    			time.Sleep(ChangeLeaderInterval)
    			leaderId = (leaderId + 1) % len(ck.servers)
    			continue
    		default:
    			log.Fatal("client rev unknown err", reply.Err)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    和Get的处理大体相同,发送PutAppend RPC,获取指定key的value,以及状态码,根据不同的状态码进行不同的处理。

    server

    服务端的代码主要分为:

    • 数据结构定义;
    • 初始化代码;
    • RPC接收处理代码;
    • Applych处理代码,即命令应用代码;
    数据结构
    const WaitCmdTimeOut = time.Millisecond * 500 // cmd执行超过这个时间,就返回timeout
    const MaxLockTime = time.Millisecond * 10     // debug
    
    type Op struct {
        // Your definitions here.
        // Field names must start with capital letters,
        // otherwise RPC will break.
        ReqId     int64 //用来标识commandNotify
        CommandId int64
        ClientId  int64
        Key       string
        Value     string
        Method    string
    }
    
    type CommandResult struct {
        Err   Err
        Value string
    }
    
    type KVServer struct {
        mu      sync.Mutex
        me      int
        rf      *raft.Raft
        applyCh chan raft.ApplyMsg
        dead    int32 // set by Kill()
        stopCh  chan struct{}
    
        maxraftstate int // snapshot if log grows this big
    
        // Your definitions here.
        commandNotifyCh map[int64]chan CommandResult
        lastApplies     map[int64]int64 //k-v:ClientId-CommandId
        data            map[string]string
    
        //持久化
        persister      *raft.Persister
    
        //用于互斥锁
        lockStartTime time.Time
        lockEndTime   time.Time
        lockMsg       string
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • Op是对接收到的命令进行的一个解析封装,发送给raft,便于对Applych的处理;

    • CommandResult是对每一个命令应用结果的封装;

    • KVServer是最重要的,定义了server的结构,简单介绍下几个属性:

      maxraftstate:当raft的日志数量达到这个数量,就进行一次snapshot;

      commandNotifyCh:用于命令应用后对请求协程的唤醒;

      lastApplies:实现线性化语义,k-v = clientid-commandid;

      data:具体数据;

      persister:用于保存初始化server的persister,其实它的用处只有一点:获取raft的日志长度用于snapshot判断。因为raft的属性都是私有的,没法访问,为了保证不在raft中进行修改,因此保存一个persister用于调用。

    初始化代码
    func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
        // call labgob.Register on structures you want
        // Go's RPC library to marshall/unmarshall.
        labgob.Register(Op{})
    
        kv := new(KVServer)
        kv.me = me
        kv.maxraftstate = maxraftstate
        kv.persister = persister
    
        // You may need initialization code here.
        kv.lastApplies = make(map[int64]int64)
        kv.data = make(map[string]string)
    
        kv.stopCh = make(chan struct{})
        //读取快照
        kv.readPersist(true, 0, 0, kv.persister.ReadSnapshot())
    
        kv.commandNotifyCh = make(map[int64]chan CommandResult)
        kv.applyCh = make(chan raft.ApplyMsg)
        kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    
        go kv.handleApplyCh()
    
        return kv
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    初始化代码,读取快照部分可以忽略,用于3B中。

    RPC接收处理代码
    //处理Get rpc
    func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    	// Your code here.
    	DPrintf("server %v in rpc Get,args: %+v", kv.me, args)
    
    	_, isLeader := kv.rf.GetState()
    	if !isLeader {
    		reply.Err = ErrWrongLeader
    		return
    	}
    
    	op := Op{
    		ReqId:     nrand(),
    		ClientId:  args.ClientId,
    		CommandId: args.CommandId,
    		Key:       args.Key,
    		Method:    "Get",
    	}
    	//等待命令执行
    	res := kv.waitCmd(op)
    	reply.Err = res.Err
    	reply.Value = res.Value
    
    	DPrintf("server %v in rpc Get,args:%+v,reply:%+v", kv.me, args, reply)
    }
    
    //处理Put rpc
    func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    	// Your code here.
    	DPrintf("server %v in rpc PutAppend,args: %+v", kv.me, args)
    	_, isLeader := kv.rf.GetState()
    	if !isLeader {
    		reply.Err = ErrWrongLeader
    		return
    	}
    	op := Op{
    		ReqId:     nrand(),
    		ClientId:  args.ClientId,
    		CommandId: args.CommandId,
    		Key:       args.Key,
    		Value:     args.Value,
    		Method:    args.Op,
    	}
    	//等待命令执行
    	res := kv.waitCmd(op)
    	reply.Err = res.Err
    
    	DPrintf("server %v in rpc PutAppend,args:%+v,reply:%+v", kv.me, args, reply)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    Get和PutAppend两个函数主要用于处理客户端发送来的两种RPC,除了部分代码不同,主要是:

    • 判断该节点是否是leader,其实一个kv数据库get应该是不需要指定leader节点的,但是这么做,会出现错误,而且官方的任务书中也说了Get也要调用Start()方法,因此,就只能leader来进行处理了;
    • 封装command,即Op;
    • 调用waitCmd等待处理;
    • 返回;

    waitCmd:

    //调用start向raft请求命令
    func (kv *KVServer) waitCmd(op Op) (res CommandResult) {
    	DPrintf("server %v wait cmd start,Op: %+v.\n", kv.me, op)
    
    	//提交命令,其实这里的start要改,一个kv数据库get命令可以发生在所有节点上
    	index, term, isLeader := kv.rf.Start(op)
    	if !isLeader {
    		res.Err = ErrWrongLeader
    		return
    	}
    
    	kv.lock("waitCmd")
    	ch := make(chan CommandResult, 1)
    	kv.commandNotifyCh[op.ReqId] = ch
    	kv.unlock("waitCmd")
    	DPrintf("start cmd: index:%d, term:%d, op:%+v", index, term, op)
    
    	t := time.NewTimer(WaitCmdTimeOut)
    	defer t.Stop()
    	select {
    	case <-kv.stopCh:
    		DPrintf("stop ch waitCmd")
    		kv.removeCh(op.ReqId)
    		res.Err = ErrServer
    		return
    	case res = <-ch:
    		kv.removeCh(op.ReqId)
    		return
    	case <-t.C:
    		kv.removeCh(op.ReqId)
    		res.Err = ErrTimeOut
    		return
    
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    主要的处理步骤:

    • 调用Start提交该命令;
    • 创建一个用于处理该命令的唤醒ch;
    • 阻塞等待ch的返回,不管是哪个ch返回,都要删除前一步创建的ch,防止内存泄漏。

    具体的关闭函数也很简单:

    func (kv *KVServer) removeCh(reqId int64) {
    	kv.lock("removeCh")
    	defer kv.unlock("removeCh")
    	delete(kv.commandNotifyCh, reqId)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    命令应用代码

    接下来就是整个server中最重要的函数了:

    //应用每一条命令
    func (kv *KVServer) handleApplyCh() {
    	for {
    		select {
    		case <-kv.stopCh:
    			DPrintf("get from stopCh,server-%v stop!", kv.me)
    			return
    		case cmd := <-kv.applyCh:
    			//处理快照命令,读取快照的内容
    			if cmd.SnapshotValid {
    				//等待完成
    			}
    			//处理普通命令
    			if !cmd.CommandValid {
    				continue
    			}
    			cmdIdx := cmd.CommandIndex
    			DPrintf("server %v start apply command %v:%+v", kv.me, cmdIdx, cmd.Command)
    			op := cmd.Command.(Op)
    			kv.lock("handleApplyCh")
    
    			if op.Method == "Get" {
    				//处理读
    				e, v := kv.getValueByKey(op.Key)
    				kv.notifyWaitCommand(op.ReqId, e, v)
    			} else if op.Method == "Put" || op.Method == "Append" {
    				//处理写
    				//判断命令是否重复
    				isRepeated := false
    				if v, ok := kv.lastApplies[op.ClientId]; ok {
    					if v == op.CommandId {
    						isRepeated = true
    					}
    				}
    
    				if !isRepeated {
    					switch op.Method {
    					case "Put":
    						kv.data[op.Key] = op.Value
    						kv.lastApplies[op.ClientId] = op.CommandId
    					case "Append":
    						e, v := kv.getValueByKey(op.Key)
    						if e == ErrNoKey {
    							//按put处理
    							kv.data[op.Key] = op.Value
    							kv.lastApplies[op.ClientId] = op.CommandId
    						} else {
    							//追加
    							kv.data[op.Key] = v + op.Value
    							kv.lastApplies[op.ClientId] = op.CommandId
    						}
    					default:
    						kv.unlock("handleApplyCh")
    						panic("unknown method " + op.Method)
    					}
    
    				}
    				//命令处理成功
    				kv.notifyWaitCommand(op.ReqId, OK, "")
    			} else {
    				kv.unlock("handleApplyCh")
    				panic("unknown method " + op.Method)
    			}
    
    			DPrintf("apply op: cmdId:%d, op: %+v, data:%v", cmdIdx, op, kv.data[op.Key])
    			//每应用一条命令,就判断是否进行持久化
    			kv.saveSnapshot(cmdIdx)
    
    			kv.unlock("handleApplyCh")
    		}
    
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    处理逻辑是在一个for循环中,从applyCh中会获取两种命令:快照命令和普通命令。具体的结构可以看raft。包含的持久化部分可以忽略,只要知道在那个地方要进行什么处理就行。这里仅仅介绍普通命令的执行步骤:

    • 获取applyCh中的数据后,先进行一个转换,转成我们的Op结构;
    • 如果是Get操作,简单的根据key获取value,并唤醒等待的协程;
    • 如果是Put、Append操作,先要判断命令是否重复,如果不重复,表明可以执行,因此根据Put和Append分别进行处理。这里要注意的一点就是Append命令中如果key不存在,就是一个Put操作。最后唤醒等待的协程。

    其中的两个函数:

    func (kv *KVServer) getValueByKey(key string) (err Err, value string) {
        if v, ok := kv.data[key]; ok {
            err = OK
            value = v
        } else {
            err = ErrNoKey
        }
        return
    }
    
    func (kv *KVServer) notifyWaitCommand(reqId int64, err Err, value string) {
        if ch, ok := kv.commandNotifyCh[reqId]; ok {
            ch <- CommandResult{
                Err:   err,
                Value: value,
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    测试结果

    在这里插入图片描述

  • 相关阅读:
    手把手实现 CSS 加载动画(一)
    (5)点云数据处理学习——其它官网例子2
    淘宝/天猫API接口详情介绍(A类标准接口)
    一篇文章扒掉“桥梁Handler”的底裤
    Spring(七)- Spring Bean的生命周期
    Modbus RTU(Remote Terminal Unit)与RS-485协议介绍(主站设备(Master)、从站设备(Slave))
    没有炫光的台灯有哪些?2023五款优秀护眼台灯
    Visual Studio Code(vscode)下载慢 插件安装失败解决方案
    flask 可插拔视图
    Docker设置开启远程访问
  • 原文地址:https://blog.csdn.net/qq_44766883/article/details/126333690