• MIT6.824-lab2A-leader选举(leader election)


    2A(leader选举)

    前言

    当进入第二部分,就是正式的开始raft的设计了,相信要写这个项目的人应该对raft算法不算陌生,如果在编写raft的代码的时候,可以仔细琢磨一下raft的论文。

    👉 https://pdos.csail.mit.edu/6.824/papers/raft-extended.pdf

    而在第二部分,一共分为四个小任务:leader选举、日志复制、持久化、日志压缩。

    代码写在raft文件夹中,除了进行对应的任务的完成,我强烈建议阅读test_test.goconfig.go这两个文件,以及labrpc.go文件。

    任务

    实现 Raft 领导者选举和心跳(AppendEntries RPCs 暂时不用考虑日志条目)。 第 2A 部分的目标是选举单个领导者,如果没有失败,领导者仍然是领导者,如果旧领导者失败或数据包发送到,则新领导者接管。

    • 要实现自己的raft结构定义和初始化(Make),设置相关的定时器,并完成ticker函数;
    • 填写 RequestVoteArgsRequestVoteReply 结构。 修改 Make() 以创建一个后台 goroutine,该 goroutine 将通过在一段时间没有收到其他对等方的消息时发送 RequestVote RPC 来定期启动领导者选举。 这样,对等点将了解谁是领导者,如果已经有领导者,或者自己成为领导者。 实现 RequestVote() RPC 处理程序,以便服务器相互投票;
    • 要实现心跳,填写AppendEntriesArgsAppendEntriesReply结构。 这里并不需要完全实现该RPC的作用,仅仅充当心跳作用。 编写一个 AppendEntries RPC 处理程序方法来重置选举超时,以便其他服务器在已经被选举时不会作为领导者继续前进。 确保不同节点的选举超时不会总是同时触发,否则所有节点只会为自己投票,没有人会成为领导者;
    • 测试代码要求leader每秒发送心跳RPC不超过十次;
    • 测试代码要求我们的 Raft 在旧领导者失败后的 5 秒内选举新领导者(如果大多数节点仍然可以通信)。 但是请记住,在分裂投票的情况下,领导者选举可能需要多轮(如果数据包丢失或候选人不幸选择了相同的随机退避时间,则可能发生这种情况)。 我们必须选择足够短的选举超时(以及心跳间隔),即使选举需要多轮,也很可能在不到 5 秒的时间内完成;
    • 测试代码在永久关闭实例时调用 Raft 的 rf.Kill()。 我们可以使用 rf.killed() 检查是否已调用 Kill()。 我们需要在所有循环中都这样做,以避免死掉的 Raft 实例打印出令人困惑的消息。

    代码

    raft基本结构

    type LogEntry struct {
        Term    int
        Command interface{}
    }
    
    type Raft struct {
        mu        sync.Mutex          // Lock to protect shared access to this peer's state
        peers     []*labrpc.ClientEnd // RPC end points of all peers,每一个clientEnd对应了向该peer通信的端点
        persister *Persister          // Object to hold this peer's persisted state
        me        int                 // this peer's index into peers[]
        dead      int32               // set by Kill()
    
        // Your data here (2A, 2B, 2C).
        // Look at the paper's Figure 2 for a description of what
        // state a Raft server must maintain.
        role        Role
        currentTerm int
        votedFor    int
        logs        []LogEntry
        commitIndex int
        lastApplied int
        nextIndex   []int
        matchIndex  []int
    
        electionTimer       *time.Timer
        appendEntriesTimers []*time.Timer
        applyTimer          *time.Timer
        applyCh             chan ApplyMsg //这个chan是用来提交应用的日志,具体的处理在config.go文件中
        notifyApplyCh       chan struct{}
        stopCh              chan struct{}
    
        lastSnapshotIndex int // 快照中最后一条日志的index,是真正的index,不是存储在logs中的index
        lastSnapshotTerm  int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    上面是整个第二部分所设计出来的数据结构,并不会都在2A中进行使用,有些不懂的部分可以暂时不用考虑。

    const (
        Role_Follower  = 0
        Role_Candidate = 1
        Role_Leader    = 2
    )
    
    const (
        ElectionTimeout   = time.Millisecond * 300 // 选举超时时间/心跳超时时间
        HeartBeatInterval = time.Millisecond * 150 // leader 发送心跳
        ApplyInterval     = time.Millisecond * 100 // apply log
        RPCTimeout        = time.Millisecond * 100
        MaxLockTime       = time.Millisecond * 10 // debug
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    针对每一个节点的状态和时间定义了如上的一些常量。

    raft基本函数

    先来看一下Make函数,没创建一个raft节点,都需要调用Make函数进行初始化:

    //传入了当前节点对其它节点的通信ClientEnd、用于持久化恢复的Persister、提交应用日志的applyCh
    func Make(peers []*labrpc.ClientEnd, me int,
              persister *Persister, applyCh chan ApplyMsg) *Raft {
        DPrintf("make a raft,me: %v", me)
        rf := &Raft{}
        rf.peers = peers
        rf.persister = persister
        rf.me = me
    
        // Your initialization code here (2A, 2B, 2C).
        rf.role = Role_Follower
        rf.currentTerm = 0
        rf.votedFor = -1
        rf.logs = make([]LogEntry, 1) //下标为0存储快照
        // initialize from state persisted before a crash
        rf.commitIndex = 0
        rf.lastApplied = 0
        rf.nextIndex = make([]int, len(rf.peers))
        rf.matchIndex = make([]int, len(rf.peers))
        //读取持久化数据,暂时可以不用实现
        rf.readPersist(persister.ReadRaftState())
    
        rf.electionTimer = time.NewTimer(rf.getElectionTimeout())
        rf.appendEntriesTimers = make([]*time.Timer, len(rf.peers))
        for i := 0; i < len(rf.peers); i++ {
            rf.appendEntriesTimers[i] = time.NewTimer(HeartBeatInterval)
        }
        rf.applyTimer = time.NewTimer(ApplyInterval)
        rf.applyCh = applyCh
        rf.notifyApplyCh = make(chan struct{}, 100)
        rf.stopCh = make(chan struct{})
    
        // start ticker goroutine to start elections
        rf.ticker()
    
        return rf
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    其实Make函数主要就做了三件事情:

    • 初始化Raft的基本结构;
    • 根据传入的Persister,读入持久化数据;(这一步暂时可以不用实现)
    • 定义多个定时器,并调用ticker()函数进行处理。

    ticker这个函数很重要,它是Raft节点在初始化后,真正要运行的部分:

    // The ticker go routine starts a new election if this peer hasn't received
    // heartsbeats recently.
    func (rf *Raft) ticker() {
    	go func() {
    		for {
    			select {
    			case <-rf.stopCh:
    				return
    			case <-rf.applyTimer.C:
    				rf.notifyApplyCh <- struct{}{}
    			case <-rf.notifyApplyCh: //当有日志记录提交了,要进行应用
    				rf.startApplyLogs()
    			}
    		}
    	}()
    
    	//选举定时
    	go func() {
    		for rf.killed() == false {
    
    			// Your code here to check if a leader election should
    			// be started and to randomize sleeping time using
    			// time.Sleep().
    			select {
    			case <-rf.stopCh:
    				return
    			case <-rf.electionTimer.C:
    				rf.startElection()
    			}
    
    		}
    	}()
    
    	//leader发送日志定时
    	for i, _ := range rf.peers {
    		if i == rf.me {
    			continue
    		}
    		go func(cur int) {
    			for rf.killed() == false {
    				select {
    				case <-rf.stopCh:
    					return
    				case <-rf.appendEntriesTimers[cur].C:
    					rf.sendAppendEntriesToPeer(cur)
    				}
    			}
    		}(i)
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    主要是创建三个goroutine来分别进行三种处理:

    • 第一部分:当有日志进行提交了,需要调用startApplyLogs来进行处理;(不是2A部分的内容,暂时不讲)
    • 第二部分:超过了心跳时间,就调用startElection发起选举;
    • 第三部分:每隔一段时间发调用sendAppendEntriesToPeer送append entries rpc给其他节点,只有leader才能真正发送。

    以上两个部分是raft.go文件在2A部分的主要逻辑了,以下就是一些功能函数的编写:

    func (rf *Raft) Kill() {
        atomic.StoreInt32(&rf.dead, 1)
        // Your code here, if desired.
    }
    
    func (rf *Raft) killed() bool {
        z := atomic.LoadInt32(&rf.dead)
        return z == 1
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    上面两个函数是杀死raft节点和判断是否被杀死。

    func (rf *Raft) changeRole(newRole Role) {
    	if newRole < 0 || newRole > 3 {
    		panic("unknown role")
    	}
    	rf.role = newRole
    	switch newRole {
    	case Role_Follower:
    	case Role_Candidate:
    		rf.currentTerm++
    		rf.votedFor = rf.me
    		rf.resetElectionTimer()
    	case Role_Leader:
    		//leader只有两个特殊的数据结构:nextIndex,matchIndex
    		_, lastLogIndex := rf.getLastLogTermAndIndex()
    		for i := 0; i < len(rf.peers); i++ {
    			rf.nextIndex[i] = lastLogIndex + 1
    			rf.matchIndex[i] = lastLogIndex
    		}
    		rf.resetElectionTimer()
    	default:
    		panic("unknown role")
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    以上函数是每一个raft节点要改变角色时需要做出的变化。并不一定要编写的完全一样,有些在改变角色是一同处理也是可以的。

    func (rf *Raft) GetState() (int, bool) {
        // Your code here (2A).
        flag := false
        if rf.role == Role_Leader {
            flag = true
        }
        return rf.currentTerm, flag
    }
    
    func (rf *Raft) getElectionTimeout() time.Duration {
        t := ElectionTimeout + time.Duration(rand.Int63())%ElectionTimeout
        return t
    }
    
    //返回当前状态机的最后一条日志的任期和索引
    //索引是一直会增大的,但是我们的日志队列却不可能无限增大,在队列中下标0存储快照
    func (rf *Raft) getLastLogTermAndIndex() (int, int) {
        return rf.logs[len(rf.logs)-1].Term, rf.lastSnapshotIndex + len(rf.logs) - 1
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    上面四个函数主要完成三个作用:

    • GetState:获取当前节点的term和leader标记;
    • getElectionTimeout:获取一个心跳过期时间,每一个节点的心跳过期时间大部分情况下是要不一样的;
    • getLastLogTermAndIndex:获取最后一条日志的term和index信息。

    vote rpc

    接下来要实现的函数就是跟进行选举相关的。

    先看下rpc的参数和回复结构体:

    type RequestVoteArgs struct {
    	// Your data here (2A, 2B).
    	Term         int
    	CandidateId  int
    	LastLogIndex int
    	LastLogTerm  int
    }
    
    type RequestVoteReply struct {
    	// Your data here (2A).
    	Term        int
    	VoteGranted bool
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这个没什么好说的,基本都是根据论文来实现的。

    当收到心跳,或者当前选举结束,都要重置心跳时间,也就是选举时间:

    func (rf *Raft) resetElectionTimer() {
        rf.electionTimer.Stop()
        rf.electionTimer.Reset(rf.getElectionTimeout())
    }
    
    • 1
    • 2
    • 3
    • 4

    在每一次心跳过期了,当前节点就会调用startElection()进行发起选举:

    func (rf *Raft) startElection() {
        rf.mu.Lock()
        rf.resetElectionTimer()
        if rf.role == Role_Leader {
            rf.mu.Unlock()
            return
        }
    
        rf.changeRole(Role_Candidate)
        DPrintf("%v role %v,start election,term: %v", rf.me, rf.role, rf.currentTerm)
    
        lastLogTerm, lastLogIndex := rf.getLastLogTermAndIndex()
        args := RequestVoteArgs{
            CandidateId:  rf.me,
            Term:         rf.currentTerm,
            LastLogTerm:  lastLogTerm,
            LastLogIndex: lastLogIndex,
        }
        rf.persist()
        rf.mu.Unlock()
    
        allCount := len(rf.peers)
        grantedCount := 1
        resCount := 1
        grantedChan := make(chan bool, len(rf.peers)-1)
        for i := 0; i < allCount; i++ {
            if i == rf.me {
                continue
            }
            //对每一个其他节点都要发送rpc
            go func(gch chan bool, index int) {
                reply := RequestVoteReply{}
                rf.sendRequestVote(index, &args, &reply)
                gch <- reply.VoteGranted
                if reply.Term > args.Term {
                    rf.mu.Lock()
                    if reply.Term > rf.currentTerm {
                        //放弃选举
                        rf.currentTerm = reply.Term
                        rf.changeRole(Role_Follower)
                        rf.votedFor = -1
                        rf.resetElectionTimer()
                        rf.persist()
                    }
                    rf.mu.Unlock()
                }
            }(grantedChan, i)
    
        }
    
        for rf.role == Role_Candidate {
            flag := <-grantedChan
            resCount++
            if flag {
                grantedCount++
            }
            DPrintf("vote: %v, allCount: %v, resCount: %v, grantedCount: %v", flag, allCount, resCount, grantedCount)
    
            if grantedCount > allCount/2 {
                //竞选成功
                rf.mu.Lock()
                DPrintf("before try change to leader,count:%d, args:%+v, currentTerm: %v, argsTerm: %v", grantedCount, args, rf.currentTerm, args.Term)
                if rf.role == Role_Candidate && rf.currentTerm == args.Term {
                    rf.changeRole(Role_Leader)
                }
                if rf.role == Role_Leader {
                    rf.resetAppendEntriesTimersZero()
                }
                rf.persist()
                rf.mu.Unlock()
                DPrintf("%v current role: %v", rf.me, rf.role)
            } else if resCount == allCount || resCount-grantedCount > allCount/2 {
                DPrintf("grant fail! grantedCount <= len/2:count:%d", grantedCount)
                return
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    上诉的代码的主要步骤:

    1. 判断是否是leader,因为leader是不会收到心跳的,因此如果是leader就没必要再此发起选举;
    2. 修改为候选人角色,并根据当前状态创建一个rpc参数;
    3. 分别创建一个goroutine调用sendRequestVote函数发送给每一个其它节点,结果由grantedChan进行接收;
    4. 当前goroutine就进入一个循环,主要分为三种情况:
      • 在候选人角色的情况下,如果当前收到的投票超过了半数,就选举成功,成为leader;
      • 如果在收到某个节点的回复时,变成了follower,就暂停选举;
      • 当前获取所有节点的回复,但是还没有超过半数的投票,就结束选举。

    看下sendRequestVote的实现:

    func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) {
    	if server < 0 || server > len(rf.peers) || server == rf.me {
    		panic("server invalid in sendRequestVote!")
    	}
    
    	rpcTimer := time.NewTimer(RPCTimeout)
    	defer rpcTimer.Stop()
    
    	ch := make(chan bool, 1)
    	go func() {
    		for i := 0; i < 10 && !rf.killed(); i++ {
    			ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
    			if !ok {
    				continue
    			} else {
    				ch <- ok
    				return
    			}
    		}
    	}()
    
    	select {
    	case <-rpcTimer.C:
    		DPrintf("%v role: %v, send request vote to peer %v TIME OUT!!!", rf.me, rf.role, server)
    		return
    	case <-ch:
    		return
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    会尝试10次向节点发送rpc,直到成功或者超过了此次rpc超时时间。

    而每一个节点在收到vote rpc是要进行相应的处理:

    func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
        // Your code here (2A, 2B).
        rf.mu.Lock()
        defer rf.mu.Unlock()
    
        //默认失败,返回
        lastLogTerm, lastLogIndex := rf.getLastLogTermAndIndex()
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
    
        if rf.currentTerm > args.Term {
            return
        } else if rf.currentTerm == args.Term {
            if rf.role == Role_Leader {
                return
            }
    
            if args.CandidateId == rf.votedFor {
                reply.Term = args.Term
                reply.VoteGranted = true
                return
            }
            if rf.votedFor != -1 && args.CandidateId != rf.votedFor {
                return
            }
    
            //还有一种情况,没有投过票
        }
    
        if rf.currentTerm < args.Term {
            rf.currentTerm = args.Term
            rf.changeRole(Role_Follower)
            rf.votedFor = -1
            reply.Term = rf.currentTerm
            rf.persist()
        }
    
        //判断日志完整性
        if lastLogTerm > args.LastLogTerm || (lastLogTerm == args.LastLogTerm && lastLogIndex > args.LastLogIndex) {
            return
        }
    
        rf.votedFor = args.CandidateId
        reply.VoteGranted = true
        rf.changeRole(Role_Follower)
        rf.resetElectionTimer()
        rf.persist()
        DPrintf("%v, role:%v,voteFor: %v", rf.me, rf.role, rf.votedFor)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    主要就是进行如下判断:

    1. 如果currentTerm>term,返回 false;
    2. 如果term = currentTerm,如果该节点为leader,返回false;如果 votedFor 不为空且不为 candidateId,返回false;如果 votedFor 不为空且为 candidateId,返回true;
    3. 如果currentTerm
    4. 判断日志完整性,如果当前节点的日志完整性大于等于候选人日志完整性(lastLogTerm>args.lastLogTerm 或者 lastLogTerm=args.lastLogTerm &&lastLogIndex > args.LastLogIndex),那么就拒绝投票给它;
    5. 否者,就投票给它。

    心跳的实现

    因为我整个第二部分已经完成,所有这部分的部分代码会涉及2B部分,因此只要大概有个思路就行了。

    先看下rpc的参数和回复结构体:

    type AppendEntriesArgs struct {
    	Term         int
    	LeaderId     int
    	PrevLogIndex int
    	PervLogTerm  int
    	Entries      []LogEntry
    	LeaderCommit int
    }
    
    type AppendEntriesReply struct {
    	Term      int
    	Success   bool
    	NextIndex int
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这个没什么好说的,基本都是根据论文来实现的。

    当向某个节点发送心跳时,会调用sendAppendEntriesToPeer向指定的节点发送rpc:

    func (rf *Raft) sendAppendEntriesToPeer(peerId int) {
        if rf.killed() {
            return
        }
    
        rf.mu.Lock()
        if rf.role != Role_Leader {
            rf.resetAppendEntriesTimer(peerId)
            rf.mu.Unlock()
            return
        }
        DPrintf("%v send append entries to peer %v", rf.me, peerId)
    
        //获取要发送的日志信息,在2A部分可以暂时不用实现
        prevLogIndex, prevLogTerm, logEntries := rf.getAppendLogs(peerId)
        args := AppendEntriesArgs{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: prevLogIndex,
            PrevLogTerm:  prevLogTerm,
            Entries:      logEntries,
            LeaderCommit: rf.commitIndex,
        }
        reply := AppendEntriesReply{}
        rf.resetAppendEntriesTimer(peerId)
        rf.mu.Unlock()
    
        //发送rpc
        rf.sendAppendEntries(peerId, &args, &reply)
    
        DPrintf("%v role: %v, send append entries to peer finish,%v,args = %+v,reply = %+v", rf.me, rf.role, peerId, args, reply)
    
        rf.mu.Lock()
        if reply.Term > rf.currentTerm {
            rf.changeRole(Role_Follower)
            rf.currentTerm = reply.Term
            rf.resetElectionTimer()
            rf.persist()
            rf.mu.Unlock()
            return
        }
    
        if rf.role != Role_Leader || rf.currentTerm != args.Term {
            rf.mu.Unlock()
            return
        }
        
        //如果有日志的话,接下来要做的是对日志进行处理,这里暂时可以不用写
    
        rf.mu.Unlock()
        return
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    在2A部分中,当前要做的处理很简单,就是创建一个AppendEntriesArgs,并调用sendAppendEntries函数进行发送,而对于发送的结果我们并不需要管,因此要实现心跳,并不需要日志,因此也就不需要进行判断了。

    看下sendAppendEntries的实现:

    //发送端发送数据
    func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) {
    	rpcTimer := time.NewTimer(RPCTimeout)
    	defer rpcTimer.Stop()
    
    	ch := make(chan bool, 1)
    	go func() {
    		//尝试10次
    		for i := 0; i < 10 && !rf.killed(); i++ {
    			ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
    			if !ok {
    				time.Sleep(time.Millisecond * 10)
    				continue
    			} else {
    				ch <- ok
    				return
    			}
    		}
    	}()
    
    	select {
    	case <-rpcTimer.C:
    		DPrintf("%v role: %v, send append entries to peer %v TIME OUT!!!", rf.me, rf.role, server)
    		return
    	case <-ch:
    		return
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    会尝试10次向节点发送rpc,直到成功或者超过了此次rpc超时时间。

    而每一个节点在收到AppendEntries rpc是要进行相应的处理:

    func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
        rf.mu.Lock()
        DPrintf("%v receive a appendEntries: %+v", rf.me, args)
        reply.Term = rf.currentTerm
        if args.Term < rf.currentTerm {
            rf.mu.Unlock()
            return
        }
        rf.currentTerm = args.Term
        rf.changeRole(Role_Follower)
        rf.resetElectionTimer()
        rf.Success = true
        
        
        rf.persist()
        DPrintf("%v role: %v, get appendentries finish,args = %v,reply = %+v", rf.me, rf.role, *args, *reply)
        rf.mu.Unlock()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    仅仅是心跳的话,接收方实现很简单,只需要判断一下term其实就可以了,并重置选举定时器。

    测试结果

    在这里插入图片描述

  • 相关阅读:
    vue2升vue3报错,warning Replace `xxx` with `“xxx“` prettier/prettier
    蓝桥等考C++组别八级003
    MySQL日志管理、备份与恢复
    Docker 容器监控之CAdvisor+InfluxDB+Granfana
    windbg查看GDT表的基址和长度 段描述符查分实验 段选择子拆分实验
    物联网ARM开发-4STM32串口通信USART应用
    GIT 工具使用
    【leetcode】【剑指offer Ⅱ】019. 最多删除一个字符得到回文
    解决sass问题:npm ERR! node-sass@9.0.0 postinstall: `node scripts/build.js`
    《Effective Objective-C 2.0》读书笔记——对象、消息、运行期
  • 原文地址:https://blog.csdn.net/qq_44766883/article/details/126255117