feat: delete Progress.PendingSnapshot and improve snapshot handling
Signed-off-by: soma00333 <[email protected]>
soma00333 committed Nov 27, 2024
1 parent d155efc commit 48b6b6d
Showing 16 changed files with 215 additions and 188 deletions.
15 changes: 11 additions & 4 deletions node.go
@@ -228,7 +228,9 @@ type Node interface {
// ReportUnreachable reports the given node is not reachable for the last send.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
// who is meant to receive the snapshot, the status is SnapshotFinish or SnapshotFailure,
// and appliedSnapshotIndex represents the index of the snapshot that was successfully applied
// on the follower.
// Calling ReportSnapshot with SnapshotFinish lets the leader resume replication to the follower
// from the applied snapshot index. Any failure in applying a snapshot (e.g., while streaming it
// from leader to follower) should be reported to the
// leader with SnapshotFailure. When the leader sends a snapshot to a follower, it pauses any raft
@@ -237,7 +239,7 @@ type Node interface {
// updates from the leader. Therefore, it is crucial that the application ensures that any
// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
// log probing in the follower.
ReportSnapshot(id uint64, status SnapshotStatus)
ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64)
// Stop performs any necessary termination of the Node.
Stop()
}
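
For context, a minimal sketch of how a leader-side application might drive the extended signature when it streams a snapshot to a follower. Only Node.ReportSnapshot, SnapshotFinish, and SnapshotFailure come from the interface above; the package/import scaffolding, the send callback, and the applied index it returns are assumptions about the application's transport, not part of this commit.

package transport // illustrative sketch only

import (
	"go.etcd.io/raft/v3"
	pb "go.etcd.io/raft/v3/raftpb"
)

// reportAfterSnapshotSend streams a snapshot via an application-defined
// transport call and reports the outcome back to raft.
func reportAfterSnapshotSend(
	n raft.Node,
	to uint64,
	snap pb.Snapshot,
	send func(to uint64, snap pb.Snapshot) (appliedIndex uint64, err error), // hypothetical transport call
) {
	appliedIndex, err := send(to, snap)
	if err != nil {
		// Failures must be reported so the leader resumes probing the
		// follower's log; the index is assumed to be ignored on failure.
		n.ReportSnapshot(to, raft.SnapshotFailure, 0)
		return
	}
	// Report the index of the snapshot the follower actually applied so the
	// leader can resume replication from appliedIndex+1.
	n.ReportSnapshot(to, raft.SnapshotFinish, appliedIndex)
}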
@@ -583,11 +585,16 @@ func (n *node) ReportUnreachable(id uint64) {
}
}

func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
func (n *node) ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64) {
rej := status == SnapshotFailure

select {
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
case n.recvc <- pb.Message{
Type: pb.MsgSnapStatus,
From: id,
Reject: rej,
AppliedSnapshotIndex: appliedSnapshotIndex,
}:
case <-n.done:
}
}
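
On the receiving side, the value the leader needs is simply the metadata index of the snapshot the follower processed from a Ready. A rough sketch of that fragment follows, using the same imports as the sketch above; the apply and ackLeader callbacks stand in for the application's state machine and transport and are assumptions, not library API.

// handleSnapshotFromReady applies a snapshot delivered in a Ready and
// acknowledges the applied index so the leader's transport can pass it to
// ReportSnapshot as appliedSnapshotIndex.
func handleSnapshotFromReady(
	n raft.Node,
	rd raft.Ready,
	apply func(pb.Snapshot) error,       // application-defined state machine hook
	ackLeader func(appliedIndex uint64), // application-defined acknowledgement to the leader
) error {
	if !raft.IsEmptySnap(rd.Snapshot) {
		if err := apply(rd.Snapshot); err != nil {
			return err
		}
		ackLeader(rd.Snapshot.Metadata.Index)
	}
	// The rest of the Ready handling (entries, messages, HardState) is omitted here.
	n.Advance()
	return nil
}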
32 changes: 20 additions & 12 deletions raft.go
@@ -1514,19 +1514,16 @@ func stepLeader(r *raft, m pb.Message) error {
// for an example of the latter case).
// NB: the same does not make sense for StateSnapshot - if `m.Index`
// equals pr.Match we know we don't have m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
// back to replicating state is not useful.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
// Note that we don't take into account PendingSnapshot to
// enter this branch. No matter at which index a snapshot
// was actually applied, as long as this allows catching up
// the follower from the log, we will accept it. This gives
// systems more flexibility in how they implement snapshots;
// see the comments on PendingSnapshot.
case pr.State == tracker.StateSnapshot && pr.Next >= r.raftLog.firstIndex():
// Transition from StateSnapshot to StateProbe and then to StateReplicate
// when the follower's `Next` index is greater than or equal to the leader's
// first log index. This ensures that the follower can catch up using the
// leader's log entries, resuming regular replication.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
@@ -1608,12 +1605,23 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}
if !m.Reject {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.OnSnapshotApplied(m.AppliedSnapshotIndex)

if pr.Match+1 >= r.raftLog.lastIndex() {
// Check if the follower is fully caught up with the leader's log.
// pr.Match + 1 represents the next expected log index for the follower.
// The leader appends an empty entry when it assumes leadership, which ensures
// that the lastIndex is always at least pr.Match + 1 after a snapshot is applied.
// If the follower is caught up, transition to StateReplicate.
pr.BecomeReplicate()
r.logger.Debugf("%x snapshot succeeded, transitioned to StateReplicate for %x [%s]", r.id, m.From, pr)
} else {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed probing for %x [%s]", r.id, m.From, pr)
}
} else {
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
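
The pr.OnSnapshotApplied call above is defined in tracker/progress.go, one of the 16 changed files not shown in this excerpt. Judging from the stepLeader logic and the test expectations below, it advances the follower's tracked progress to the reported snapshot index; the following is a rough sketch of that behavior under those assumptions, not the actual diff.

// OnSnapshotApplied (sketch): move the follower's tracked progress up to the
// index of the snapshot it reports having applied, so Next points at the
// first entry the leader still has to replicate. The real method lives in
// tracker/progress.go and may differ in detail.
func (pr *Progress) OnSnapshotApplied(snapshotIndex uint64) {
	if snapshotIndex > pr.Match {
		pr.Match = snapshotIndex
	}
	pr.Next = pr.Match + 1
}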
51 changes: 29 additions & 22 deletions raft_snap_test.go
@@ -19,6 +19,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/raft/v3/tracker"

pb "go.etcd.io/raft/v3/raftpb"
)
@@ -33,38 +34,42 @@ var (
}
)

func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1))
func TestSnapshotPauseReplication(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeCandidate()
sm.becomeLeader()

// force set the next of node 2, so that
// node 2 needs a snapshot
sm.trk.Progress[2].Next = sm.raftLog.firstIndex()
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true})
require.Equal(t, uint64(11), sm.trk.Progress[2].PendingSnapshot)
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
require.Empty(t, msgs)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)
}

func TestPendingSnapshotPauseReplication(t *testing.T) {
func TestSnapshotFailure(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
require.Empty(t, msgs)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
require.Equal(t, uint64(1), sm.trk.Progress[2].Next)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State)
}

func TestSnapshotFailure(t *testing.T) {
func TestSnapshotSucceedToReplicate(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)
@@ -74,14 +79,15 @@ func TestSnapshotFailure(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
require.Equal(t, uint64(1), sm.trk.Progress[2].Next)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 11})
require.Equal(t, uint64(12), sm.trk.Progress[2].Next)
require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
}

func TestSnapshotSucceed(t *testing.T) {
func TestSnapshotSucceedToProbe(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)
@@ -91,10 +97,12 @@ func TestSnapshotSucceed(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
require.Equal(t, uint64(12), sm.trk.Progress[2].Next)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 10})

require.Equal(t, uint64(11), sm.trk.Progress[2].Next)
require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
}

@@ -108,15 +116,14 @@ func TestSnapshotAbort(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

// A successful MsgAppResp with an index at or above the pending
// snapshot's index should abort the snapshot state.
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
// The follower entered StateReplicate and the leader sent an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
require.Equal(t, uint64(13), sm.trk.Progress[2].Next)
require.Equal(t, 1, sm.trk.Progress[2].Inflights.Count())
require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State)
}