diff --git a/raft.go b/raft.go index 5a150562..8e6174a5 100644 --- a/raft.go +++ b/raft.go @@ -375,6 +375,12 @@ type raft struct { // the leader id lead uint64 + // logSynced is true if this node's log is guaranteed to be a prefix of the + // leader's log at this term. Always true for the leader. Always false for a + // candidate. For a follower, this is true if the last entry term matches the + // leader term, otherwise becomes true when the first MsgApp append from the + // leader succeeds. + logSynced bool // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 @@ -763,6 +769,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -866,6 +873,10 @@ func (r *raft) becomeFollower(term uint64, lead uint64) { r.reset(term) r.tick = r.tickElection r.lead = lead + // If the last entry term matches the leader term, the log is guaranteed to be + // a prefix of the leader's log. Otherwise, we will establish this guarantee + // later, on the first successful MsgApp. + r.logSynced = r.raftLog.lastTerm() == term r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } @@ -908,6 +919,7 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id + r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1735,6 +1747,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.logSynced = true // from now on, the log is a prefix of the leader's log r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1783,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // It is only safe to advance the commit index if our log is a prefix of the + // leader's log. Otherwise, entries at this index may mismatch. + if r.logSynced { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..16e40608 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + lastTerm uint64 + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit}, + + // Do not increase the commit index if the log is not guaranteed to be a + // prefix of the leader's log. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit}, + // Do not increase the commit index beyond our log size. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1}, } for i, tt := range tests { storage := newTestMemoryStorage(withPeers(1, 2)) - storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) + storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit)