Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delete Progress.PendingSnapshot and improve snapshot handling #243

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ type Node interface {
// ReportUnreachable reports the given node is not reachable for the last send.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
// who is meant to receive the snapshot, the status is SnapshotFinish or SnapshotFailure,
// and appliedSnapshotIndex represents the index of the snapshot that was successfully applied
// on the follower.
// Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
// snapshot (e.g., while streaming it from leader to follower), should be reported to the
// leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
Expand All @@ -237,7 +239,7 @@ type Node interface {
// updates from the leader. Therefore, it is crucial that the application ensures that any
// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
// log probing in the follower.
ReportSnapshot(id uint64, status SnapshotStatus)
ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64)
// Stop performs any necessary termination of the Node.
Stop()
}
Expand Down Expand Up @@ -583,11 +585,16 @@ func (n *node) ReportUnreachable(id uint64) {
}
}

func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
func (n *node) ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64) {
rej := status == SnapshotFailure

select {
case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
case n.recvc <- pb.Message{
Type: pb.MsgSnapStatus,
From: id,
Reject: rej,
AppliedSnapshotIndex: appliedSnapshotIndex,
}:
case <-n.done:
}
}
Expand Down
32 changes: 20 additions & 12 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,19 +1514,16 @@ func stepLeader(r *raft, m pb.Message) error {
// for an example of the latter case).
// NB: the same does not make sense for StateSnapshot - if `m.Index`
// equals pr.Match we know we don't have m.Index+1 in our log, so moving
// back to replicating state is not useful; besides pr.PendingSnapshot
// would prevent it.
// back to replicating state is not useful.
if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) {
switch {
case pr.State == tracker.StateProbe:
pr.BecomeReplicate()
case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex():
// Note that we don't take into account PendingSnapshot to
// enter this branch. No matter at which index a snapshot
// was actually applied, as long as this allows catching up
// the follower from the log, we will accept it. This gives
// systems more flexibility in how they implement snapshots;
// see the comments on PendingSnapshot.
case pr.State == tracker.StateSnapshot && pr.Next >= r.raftLog.firstIndex():
// Transition from StateSnapshot to StateProbe and then to StateReplicate
// when the follower's `Next` index is greater than or equal to the leader's
// first log index. This ensures that the follower can catch up using the
// leader's log entries, resuming regular replication.
r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
Expand Down Expand Up @@ -1608,12 +1605,23 @@ func stepLeader(r *raft, m pb.Message) error {
return nil
}
if !m.Reject {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.OnSnapshotApplied(m.AppliedSnapshotIndex)

if pr.Match+1 >= r.raftLog.lastIndex() {
// Check if the follower is fully caught up with the leader's log.
// pr.Match + 1 represents the next expected log index for the follower.
// The leader appends an empty entry when it assumes leadership, which ensures
// that the lastIndex is always at least pr.Match + 1 after a snapshot is applied.
// If the follower is caught up, transition to StateReplicate.
pr.BecomeReplicate()
r.logger.Debugf("%x snapshot succeeded, transitioned to StateReplicate for %x [%s]", r.id, m.From, pr)
} else {
pr.BecomeProbe()
r.logger.Debugf("%x snapshot succeeded, resumed probing for %x [%s]", r.id, m.From, pr)
}
} else {
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot was never applied.
pr.PendingSnapshot = 0
pr.BecomeProbe()
r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
Expand Down
51 changes: 29 additions & 22 deletions raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/raft/v3/tracker"

pb "go.etcd.io/raft/v3/raftpb"
)
Expand All @@ -33,38 +34,42 @@ var (
}
)

func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1))
func TestSnapshotPauseReplication(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeCandidate()
sm.becomeLeader()

// force set the next of node 2, so that
// node 2 needs a snapshot
sm.trk.Progress[2].Next = sm.raftLog.firstIndex()
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true})
require.Equal(t, uint64(11), sm.trk.Progress[2].PendingSnapshot)
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
require.Empty(t, msgs)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)
}

func TestPendingSnapshotPauseReplication(t *testing.T) {
func TestSnapshotFailure(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)

sm.becomeCandidate()
sm.becomeLeader()

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
require.Empty(t, msgs)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
require.Equal(t, uint64(1), sm.trk.Progress[2].Next)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State)
}

func TestSnapshotFailure(t *testing.T) {
func TestSnapshotSucceedToReplicate(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)
Expand All @@ -74,14 +79,15 @@ func TestSnapshotFailure(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
require.Equal(t, uint64(1), sm.trk.Progress[2].Next)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 11})
require.Equal(t, uint64(12), sm.trk.Progress[2].Next)
require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
}

func TestSnapshotSucceed(t *testing.T) {
func TestSnapshotSucceedToProbe(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
sm.restore(testingSnap)
Expand All @@ -91,10 +97,12 @@ func TestSnapshotSucceed(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
require.Equal(t, uint64(12), sm.trk.Progress[2].Next)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 10})

require.Equal(t, uint64(11), sm.trk.Progress[2].Next)
require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State)
assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused)
}

Expand All @@ -108,15 +116,14 @@ func TestSnapshotAbort(t *testing.T) {

sm.trk.Progress[2].Next = 1
sm.trk.Progress[2].BecomeSnapshot(11)
require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State)

// A successful msgAppResp that has a higher/equal index than the
// pending snapshot should abort the pending snapshot.
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
require.Zero(t, sm.trk.Progress[2].PendingSnapshot)
// The follower entered StateReplicate and the leader send an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
require.Equal(t, uint64(13), sm.trk.Progress[2].Next)
require.Equal(t, 1, sm.trk.Progress[2].Inflights.Count())
require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State)
}
Loading
Loading