diff --git a/node.go b/node.go index e2a261cd..77aed4e7 100644 --- a/node.go +++ b/node.go @@ -228,7 +228,9 @@ type Node interface { // ReportUnreachable reports the given node is not reachable for the last send. ReportUnreachable(id uint64) // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower - // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure. + // who is meant to receive the snapshot, the status is SnapshotFinish or SnapshotFailure, + // and appliedSnapshotIndex represents the index of the snapshot that was successfully applied + // on the follower. // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a // snapshot (for e.g., while streaming it from leader to follower), should be reported to the // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft @@ -237,7 +239,7 @@ type Node interface { // updates from the leader. Therefore, it is crucial that the application ensures that any // failure in snapshot sending is caught and reported back to the leader; so it can resume raft // log probing in the follower. - ReportSnapshot(id uint64, status SnapshotStatus) + ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64) // Stop performs any necessary termination of the Node. Stop() } @@ -583,11 +585,16 @@ func (n *node) ReportUnreachable(id uint64) { } } -func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) { +func (n *node) ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64) { rej := status == SnapshotFailure select { - case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: + case n.recvc <- pb.Message{ + Type: pb.MsgSnapStatus, + From: id, + Reject: rej, + AppliedSnapshotIndex: appliedSnapshotIndex, + }: case <-n.done: } } diff --git a/raft.go b/raft.go index 94c2363d..31bfab76 100644 --- a/raft.go +++ b/raft.go @@ -1514,19 +1514,16 @@ func stepLeader(r *raft, m pb.Message) error { // for an example of the latter case). // NB: the same does not make sense for StateSnapshot - if `m.Index` // equals pr.Match we know we don't m.Index+1 in our log, so moving - // back to replicating state is not useful; besides pr.PendingSnapshot - // would prevent it. + // back to replicating state is not useful. if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) { switch { case pr.State == tracker.StateProbe: pr.BecomeReplicate() - case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex(): - // Note that we don't take into account PendingSnapshot to - // enter this branch. No matter at which index a snapshot - // was actually applied, as long as this allows catching up - // the follower from the log, we will accept it. This gives - // systems more flexibility in how they implement snapshots; - // see the comments on PendingSnapshot. + case pr.State == tracker.StateSnapshot && pr.Next >= r.raftLog.firstIndex(): + // Transition from StateSnapshot to StateProbe and then to StateReplicate + // when the follower's `Next` index is greater than or equal to the leader's + // first log index. This ensures that the follower can catch up using the + // leader's log entries, resuming regular replication. r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // Transition back to replicating state via probing state // (which takes the snapshot into account). If we didn't @@ -1608,12 +1605,23 @@ func stepLeader(r *raft, m pb.Message) error { return nil } if !m.Reject { - pr.BecomeProbe() - r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + pr.OnSnapshotApplied(m.AppliedSnapshotIndex) + + if pr.Match+1 >= r.raftLog.lastIndex() { + // Check if the follower is fully caught up with the leader's log. + // pr.Match + 1 represents the next expected log index for the follower. + // The leader appends an empty entry when it assumes leadership, which ensures + // that the lastIndex is always at least pr.Match + 1 after a snapshot is applied. + // If the follower is caught up, transition to StateReplicate. + pr.BecomeReplicate() + r.logger.Debugf("%x snapshot succeeded, transitioned to StateReplicate for %x [%s]", r.id, m.From, pr) + } else { + pr.BecomeProbe() + r.logger.Debugf("%x snapshot succeeded, resumed probing for %x [%s]", r.id, m.From, pr) + } } else { // NB: the order here matters or we'll be probing erroneously from // the snapshot index, but the snapshot never applied. - pr.PendingSnapshot = 0 pr.BecomeProbe() r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } diff --git a/raft_snap_test.go b/raft_snap_test.go index 1c54b5b8..1bd7bad4 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/tracker" pb "go.etcd.io/raft/v3/raftpb" ) @@ -33,23 +34,24 @@ var ( } ) -func TestSendingSnapshotSetPendingSnapshot(t *testing.T) { - storage := newTestMemoryStorage(withPeers(1)) +func TestSnapshotPauseReplication(t *testing.T) { + storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) sm.restore(testingSnap) sm.becomeCandidate() sm.becomeLeader() - // force set the next of node 2, so that - // node 2 needs a snapshot - sm.trk.Progress[2].Next = sm.raftLog.firstIndex() + sm.trk.Progress[2].BecomeSnapshot(11) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.trk.Progress[2].Next - 1, Reject: true}) - require.Equal(t, uint64(11), sm.trk.Progress[2].PendingSnapshot) + sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) + msgs := sm.readMessages() + require.Empty(t, msgs) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) } -func TestPendingSnapshotPauseReplication(t *testing.T) { +func TestSnapshotFailure(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) sm.restore(testingSnap) @@ -57,14 +59,17 @@ func TestPendingSnapshotPauseReplication(t *testing.T) { sm.becomeCandidate() sm.becomeLeader() + sm.trk.Progress[2].Next = 1 sm.trk.Progress[2].BecomeSnapshot(11) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) - sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}}) - msgs := sm.readMessages() - require.Empty(t, msgs) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) + require.Equal(t, uint64(1), sm.trk.Progress[2].Next) + assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused) + require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State) } -func TestSnapshotFailure(t *testing.T) { +func TestSnapshotSucceedToReplicate(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) sm.restore(testingSnap) @@ -74,14 +79,15 @@ func TestSnapshotFailure(t *testing.T) { sm.trk.Progress[2].Next = 1 sm.trk.Progress[2].BecomeSnapshot(11) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true}) - require.Zero(t, sm.trk.Progress[2].PendingSnapshot) - require.Equal(t, uint64(1), sm.trk.Progress[2].Next) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 11}) + require.Equal(t, uint64(12), sm.trk.Progress[2].Next) + require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State) assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused) } -func TestSnapshotSucceed(t *testing.T) { +func TestSnapshotSucceedToProbe(t *testing.T) { storage := newTestMemoryStorage(withPeers(1, 2)) sm := newTestRaft(1, 10, 1, storage) sm.restore(testingSnap) @@ -91,10 +97,12 @@ func TestSnapshotSucceed(t *testing.T) { sm.trk.Progress[2].Next = 1 sm.trk.Progress[2].BecomeSnapshot(11) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) - sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false}) - require.Zero(t, sm.trk.Progress[2].PendingSnapshot) - require.Equal(t, uint64(12), sm.trk.Progress[2].Next) + sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false, AppliedSnapshotIndex: 10}) + + require.Equal(t, uint64(11), sm.trk.Progress[2].Next) + require.Equal(t, tracker.StateProbe, sm.trk.Progress[2].State) assert.True(t, sm.trk.Progress[2].MsgAppFlowPaused) } @@ -108,15 +116,14 @@ func TestSnapshotAbort(t *testing.T) { sm.trk.Progress[2].Next = 1 sm.trk.Progress[2].BecomeSnapshot(11) + require.Equal(t, tracker.StateSnapshot, sm.trk.Progress[2].State) - // A successful msgAppResp that has a higher/equal index than the - // pending snapshot should abort the pending snapshot. sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11}) - require.Zero(t, sm.trk.Progress[2].PendingSnapshot) // The follower entered StateReplicate and the leader send an append // and optimistically updated the progress (so we see 13 instead of 12). // There is something to append because the leader appended an empty entry // to the log at index 12 when it assumed leadership. require.Equal(t, uint64(13), sm.trk.Progress[2].Next) require.Equal(t, 1, sm.trk.Progress[2].Inflights.Count()) + require.Equal(t, tracker.StateReplicate, sm.trk.Progress[2].State) } diff --git a/raftpb/raft.pb.go b/raftpb/raft.pb.go index 7dcdef0c..47e9b6af 100644 --- a/raftpb/raft.pb.go +++ b/raftpb/raft.pb.go @@ -432,6 +432,8 @@ type Message struct { // to respond and who to respond to when the work associated with a message // is complete. Populated for MsgStorageAppend and MsgStorageApply messages. Responses []Message `protobuf:"bytes,14,rep,name=responses" json:"responses"` + // applied_snapshot_index is used for MsgSnapStatus to report back the applied snapshot index. + AppliedSnapshotIndex uint64 `protobuf:"varint,15,opt,name=applied_snapshot_index,json=appliedSnapshotIndex" json:"applied_snapshot_index"` } func (m *Message) Reset() { *m = Message{} } @@ -729,76 +731,78 @@ func init() { func init() { proto.RegisterFile("raft.proto", fileDescriptor_b042552c306ae59b) } var fileDescriptor_b042552c306ae59b = []byte{ - // 1102 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcb, 0x6e, 0x23, 0x45, - 0x14, 0xed, 0x6e, 0x77, 0xfc, 0xb8, 0x76, 0x9c, 0x4a, 0xc5, 0x33, 0xd3, 0x8a, 0x22, 0x8f, 0xf1, - 0x0c, 0x1a, 0x2b, 0x68, 0x02, 0x32, 0x12, 0x42, 0xec, 0xf2, 0x18, 0x94, 0xa0, 0x38, 0x0c, 0x4e, - 0x26, 0x0b, 0x24, 0x14, 0x55, 0xdc, 0x95, 0x4e, 0x83, 0x5d, 0xd5, 0xaa, 0x2e, 0x87, 0x64, 0x83, - 0x10, 0x5f, 0xc0, 0x92, 0x0d, 0x5b, 0x3e, 0x80, 0x8f, 0x40, 0x59, 0x66, 0xc9, 0x6a, 0xc4, 0x24, - 0x7f, 0xc0, 0x17, 0xa0, 0xaa, 0xae, 0x7e, 0xd8, 0x89, 0x66, 0xc1, 0xae, 0xea, 0xdc, 0x53, 0xf7, - 0x9e, 0x7b, 0x6e, 0x57, 0x35, 0x80, 0x20, 0x67, 0x72, 0x23, 0x12, 0x5c, 0x72, 0x5c, 0x56, 0xeb, - 0xe8, 0x74, 0xb5, 0x15, 0xf0, 0x80, 0x6b, 0xe8, 0x63, 0xb5, 0x4a, 0xa2, 0xdd, 0x9f, 0x60, 0xe1, - 0x15, 0x93, 0xe2, 0x0a, 0x7b, 0xe0, 0x1e, 0x51, 0x31, 0xf1, 0x9c, 0x8e, 0xdd, 0x73, 0xb7, 0xdc, - 0xeb, 0xb7, 0x4f, 0xad, 0xa1, 0x46, 0xf0, 0x2a, 0x2c, 0xec, 0x31, 0x9f, 0x5e, 0x7a, 0xa5, 0x42, - 0x28, 0x81, 0xf0, 0x47, 0xe0, 0x1e, 0x5d, 0x45, 0xd4, 0xb3, 0x3b, 0x76, 0xaf, 0xd9, 0x5f, 0xde, - 0x48, 0x6a, 0x6d, 0xe8, 0x94, 0x2a, 0x90, 0x25, 0xba, 0x8a, 0x28, 0xc6, 0xe0, 0xee, 0x10, 0x49, - 0x3c, 0xb7, 0x63, 0xf7, 0x1a, 0x43, 0xbd, 0xee, 0xfe, 0x6c, 0x03, 0x3a, 0x64, 0x24, 0x8a, 0xcf, - 0xb9, 0x1c, 0x50, 0x49, 0x7c, 0x22, 0x09, 0xfe, 0x0c, 0x60, 0xc4, 0xd9, 0xd9, 0x49, 0x2c, 0x89, - 0x4c, 0x72, 0xd7, 0xf3, 0xdc, 0xdb, 0x9c, 0x9d, 0x1d, 0xaa, 0x80, 0xc9, 0x5d, 0x1b, 0xa5, 0x80, - 0x52, 0x1a, 0x6a, 0xa5, 0xc5, 0x26, 0x12, 0x48, 0xf5, 0x27, 0x55, 0x7f, 0xc5, 0x26, 0x34, 0xd2, - 0xfd, 0x16, 0xaa, 0xa9, 0x02, 0x25, 0x51, 0x29, 0xd0, 0x35, 0x1b, 0x43, 0xbd, 0xc6, 0x5f, 0x40, - 0x75, 0x62, 0x94, 0xe9, 0xc4, 0xf5, 0xbe, 0x97, 0x6a, 0x99, 0x57, 0x6e, 0xf2, 0x66, 0xfc, 0xee, - 0xbf, 0x25, 0xa8, 0x0c, 0x68, 0x1c, 0x93, 0x80, 0xe2, 0x97, 0xe0, 0xca, 0xdc, 0xab, 0x95, 0x34, - 0x87, 0x09, 0x17, 0xdd, 0x52, 0x34, 0xdc, 0x02, 0x47, 0xf2, 0x99, 0x4e, 0x1c, 0xc9, 0x55, 0x1b, - 0x67, 0x82, 0xcf, 0xb5, 0xa1, 0x90, 0xac, 0x41, 0x77, 0xbe, 0x41, 0xdc, 0x86, 0xca, 0x98, 0x07, - 0x7a, 0xba, 0x0b, 0x85, 0x60, 0x0a, 0xe6, 0xb6, 0x95, 0xef, 0xdb, 0xf6, 0x12, 0x2a, 0x94, 0x49, - 0x11, 0xd2, 0xd8, 0xab, 0x74, 0x4a, 0xbd, 0x7a, 0x7f, 0x71, 0x66, 0xc6, 0x69, 0x2a, 0xc3, 0xc1, - 0x6b, 0x50, 0x1e, 0xf1, 0xc9, 0x24, 0x94, 0x5e, 0xb5, 0x90, 0xcb, 0x60, 0x4a, 0xe2, 0x05, 0x97, - 0xd4, 0x5b, 0x2c, 0x4a, 0x54, 0x08, 0xee, 0x43, 0x35, 0x36, 0x5e, 0x7a, 0x35, 0xed, 0x31, 0x9a, - 0xf7, 0x58, 0xf3, 0xed, 0x61, 0xc6, 0x53, 0xb5, 0x04, 0xfd, 0x9e, 0x8e, 0xa4, 0x07, 0x1d, 0xbb, - 0x57, 0x4d, 0x6b, 0x25, 0x18, 0x7e, 0x0e, 0x90, 0xac, 0x76, 0x43, 0x26, 0xbd, 0x7a, 0xa1, 0x62, - 0x01, 0x57, 0xd6, 0x8c, 0x38, 0x93, 0xf4, 0x52, 0x7a, 0x0d, 0x35, 0x72, 0x53, 0x24, 0x05, 0xf1, - 0xa7, 0x50, 0x13, 0x34, 0x8e, 0x38, 0x8b, 0x69, 0xec, 0x35, 0xb5, 0x01, 0x4b, 0x73, 0x83, 0x4b, - 0x3f, 0xc3, 0x8c, 0xd7, 0xfd, 0x0e, 0x6a, 0xbb, 0x44, 0xf8, 0xc9, 0x37, 0x99, 0x8e, 0xc5, 0xbe, - 0x37, 0x96, 0xd4, 0x0d, 0xe7, 0x9e, 0x1b, 0xb9, 0x8b, 0xa5, 0xfb, 0x2e, 0x76, 0xff, 0xb4, 0xa1, - 0x96, 0x5d, 0x02, 0xfc, 0x18, 0xca, 0xea, 0x8c, 0x88, 0x3d, 0xbb, 0x53, 0xea, 0xb9, 0x43, 0xb3, - 0xc3, 0xab, 0x50, 0x1d, 0x53, 0x22, 0x98, 0x8a, 0x38, 0x3a, 0x92, 0xed, 0xf1, 0x0b, 0x58, 0x4a, - 0x58, 0x27, 0x7c, 0x2a, 0x03, 0x1e, 0xb2, 0xc0, 0x2b, 0x69, 0x4a, 0x33, 0x81, 0xbf, 0x36, 0x28, - 0x7e, 0x06, 0x8b, 0xe9, 0xa1, 0x13, 0xa6, 0x4c, 0x72, 0x35, 0xad, 0x91, 0x82, 0x07, 0xca, 0xa3, - 0x67, 0x00, 0x64, 0x2a, 0xf9, 0xc9, 0x98, 0x92, 0x0b, 0xaa, 0xbf, 0xb0, 0x74, 0x16, 0x35, 0x85, - 0xef, 0x2b, 0xb8, 0xfb, 0xbb, 0x0d, 0xa0, 0x44, 0x6f, 0x9f, 0x13, 0x16, 0x50, 0xfc, 0x89, 0xb9, - 0x0b, 0x8e, 0xbe, 0x0b, 0x8f, 0x8b, 0x77, 0x3b, 0x61, 0xdc, 0xbb, 0x0e, 0x2f, 0xa0, 0xc2, 0xb8, - 0x4f, 0x4f, 0x42, 0xdf, 0x98, 0xd2, 0x54, 0xc1, 0xdb, 0xb7, 0x4f, 0xcb, 0x07, 0xdc, 0xa7, 0x7b, - 0x3b, 0xc3, 0xb2, 0x0a, 0xef, 0xf9, 0xd8, 0xcb, 0x47, 0x9a, 0x3c, 0x34, 0xd9, 0x30, 0x57, 0xc1, - 0x09, 0x7d, 0x33, 0x08, 0x30, 0xa7, 0x9d, 0xbd, 0x9d, 0xa1, 0x13, 0xfa, 0xdd, 0x09, 0xa0, 0xbc, - 0xf8, 0x61, 0xc8, 0x82, 0x71, 0x2e, 0xd2, 0xfe, 0x3f, 0x22, 0x9d, 0xf7, 0x89, 0xec, 0xfe, 0x61, - 0x43, 0x23, 0xcf, 0x73, 0xdc, 0xc7, 0x5b, 0x00, 0x52, 0x10, 0x16, 0x87, 0x32, 0xe4, 0xcc, 0x54, - 0x5c, 0x7b, 0xa0, 0x62, 0xc6, 0x49, 0x3f, 0xe6, 0xfc, 0x14, 0xfe, 0x1c, 0x2a, 0x23, 0xcd, 0x4a, - 0x26, 0x5e, 0x78, 0xa7, 0xe6, 0x5b, 0x4b, 0xaf, 0xad, 0xa1, 0x17, 0x3d, 0x2b, 0xcd, 0x78, 0xb6, - 0xbe, 0x0b, 0xb5, 0xec, 0x31, 0xc7, 0x4b, 0x50, 0xd7, 0x9b, 0x03, 0x2e, 0x26, 0x64, 0x8c, 0x2c, - 0xbc, 0x02, 0x4b, 0x1a, 0xc8, 0xf3, 0x23, 0x1b, 0x3f, 0x82, 0xe5, 0x39, 0xf0, 0xb8, 0x8f, 0x9c, - 0xf5, 0xbf, 0x4a, 0x50, 0x2f, 0xbc, 0x75, 0x18, 0xa0, 0x3c, 0x88, 0x83, 0xdd, 0x69, 0x84, 0x2c, - 0x5c, 0x87, 0xca, 0x20, 0x0e, 0xb6, 0x28, 0x91, 0xc8, 0x36, 0x9b, 0xd7, 0x82, 0x47, 0xc8, 0x31, - 0xac, 0xcd, 0x28, 0x42, 0x25, 0xdc, 0x04, 0x48, 0xd6, 0x43, 0x1a, 0x47, 0xc8, 0x35, 0xc4, 0x63, - 0x2e, 0x29, 0x5a, 0x50, 0xda, 0xcc, 0x46, 0x47, 0xcb, 0x26, 0xaa, 0x5e, 0x0f, 0x54, 0xc1, 0x08, - 0x1a, 0xaa, 0x18, 0x25, 0x42, 0x9e, 0xaa, 0x2a, 0x55, 0xdc, 0x02, 0x54, 0x44, 0xf4, 0xa1, 0x1a, - 0xc6, 0xd0, 0x1c, 0xc4, 0xc1, 0x1b, 0x26, 0x28, 0x19, 0x9d, 0x93, 0xd3, 0x31, 0x45, 0x80, 0x97, - 0x61, 0xd1, 0x24, 0x52, 0x37, 0x6e, 0x1a, 0xa3, 0xba, 0xa1, 0x6d, 0x9f, 0xd3, 0xd1, 0x0f, 0xdf, - 0x4c, 0xb9, 0x98, 0x4e, 0x50, 0x43, 0xb5, 0x3d, 0x88, 0x03, 0x3d, 0xa0, 0x33, 0x2a, 0xf6, 0x29, - 0xf1, 0xa9, 0x40, 0x8b, 0xe6, 0xf4, 0x51, 0x38, 0xa1, 0x7c, 0x2a, 0x0f, 0xf8, 0x8f, 0xa8, 0x69, - 0xc4, 0x0c, 0x29, 0xf1, 0xf5, 0x4f, 0x14, 0x2d, 0x19, 0x31, 0x19, 0xa2, 0xc5, 0x20, 0xd3, 0xef, - 0x6b, 0x41, 0x75, 0x8b, 0xcb, 0xa6, 0xaa, 0xd9, 0x6b, 0x0e, 0x36, 0x27, 0x0f, 0x25, 0x17, 0x24, - 0xa0, 0x9b, 0x51, 0x44, 0x99, 0x8f, 0x56, 0xb0, 0x07, 0xad, 0x79, 0x54, 0xf3, 0x5b, 0x6a, 0x62, - 0x33, 0x91, 0xf1, 0x15, 0x7a, 0x84, 0x9f, 0xc0, 0xca, 0x1c, 0xa8, 0xd9, 0x8f, 0x0d, 0xfb, 0x4b, - 0x2e, 0x02, 0x2a, 0x4d, 0x47, 0x4f, 0xd6, 0x7f, 0xb1, 0xa1, 0xf5, 0xd0, 0x17, 0x89, 0xd7, 0xc0, - 0x7b, 0x08, 0xdf, 0x9c, 0x4a, 0x8e, 0x2c, 0xfc, 0x21, 0x7c, 0xf0, 0x50, 0xf4, 0x2b, 0x1e, 0x32, - 0xb9, 0x37, 0x89, 0xc6, 0xe1, 0x28, 0x54, 0xd3, 0x7f, 0x1f, 0xed, 0xd5, 0xa5, 0xa1, 0x39, 0xeb, - 0x57, 0xd0, 0x9c, 0xbd, 0x87, 0xca, 0xff, 0x1c, 0xd9, 0xf4, 0x7d, 0x75, 0xe3, 0x90, 0xa5, 0xac, - 0xc8, 0xe1, 0x21, 0x9d, 0xf0, 0x0b, 0xaa, 0x23, 0xf6, 0x6c, 0xe4, 0x4d, 0xe4, 0x13, 0x99, 0x44, - 0x9c, 0xd9, 0x46, 0x36, 0x7d, 0x7f, 0x3f, 0x79, 0xee, 0x74, 0xb4, 0xb4, 0xf5, 0xfc, 0xfa, 0x5d, - 0xdb, 0xba, 0x79, 0xd7, 0xb6, 0xae, 0x6f, 0xdb, 0xf6, 0xcd, 0x6d, 0xdb, 0xfe, 0xe7, 0xb6, 0x6d, - 0xff, 0x7a, 0xd7, 0xb6, 0x7e, 0xbb, 0x6b, 0x5b, 0x37, 0x77, 0x6d, 0xeb, 0xef, 0xbb, 0xb6, 0xf5, - 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x67, 0x2b, 0x47, 0x0c, 0x83, 0x09, 0x00, 0x00, + // 1125 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4d, 0x6f, 0x23, 0x45, + 0x10, 0x9d, 0x19, 0x4f, 0xfc, 0x51, 0x76, 0xec, 0x4e, 0xc7, 0x9b, 0x1d, 0x45, 0x91, 0xd7, 0x78, + 0x17, 0xad, 0x15, 0xb4, 0x01, 0x19, 0x09, 0xa1, 0xbd, 0xe5, 0x63, 0x51, 0x82, 0xe2, 0xb0, 0x38, + 0xd9, 0x1c, 0x90, 0x90, 0xd5, 0xf1, 0x74, 0x26, 0x03, 0xf6, 0xf4, 0xa8, 0xa7, 0x1d, 0x92, 0x0b, + 0x42, 0xfc, 0x02, 0x8e, 0x5c, 0xb8, 0x72, 0xe4, 0xc0, 0x8f, 0x40, 0x39, 0xe6, 0xc8, 0x69, 0xc5, + 0x26, 0x7f, 0x04, 0x75, 0x4f, 0xcf, 0x87, 0xed, 0x68, 0x0f, 0x7b, 0x9b, 0x7e, 0xf5, 0xba, 0xea, + 0xd5, 0xab, 0xee, 0x1e, 0x00, 0x4e, 0xce, 0xc5, 0x56, 0xc8, 0x99, 0x60, 0xb8, 0x28, 0xbf, 0xc3, + 0xb3, 0xf5, 0xa6, 0xc7, 0x3c, 0xa6, 0xa0, 0x4f, 0xe5, 0x57, 0x1c, 0xed, 0xfc, 0x0c, 0x4b, 0xaf, + 0x02, 0xc1, 0xaf, 0xb1, 0x03, 0xf6, 0x09, 0xe5, 0x13, 0xc7, 0x6a, 0x9b, 0x5d, 0x7b, 0xc7, 0xbe, + 0x79, 0xfb, 0xc4, 0x18, 0x28, 0x04, 0xaf, 0xc3, 0xd2, 0x41, 0xe0, 0xd2, 0x2b, 0xa7, 0x90, 0x0b, + 0xc5, 0x10, 0xfe, 0x04, 0xec, 0x93, 0xeb, 0x90, 0x3a, 0x66, 0xdb, 0xec, 0xd6, 0x7b, 0x2b, 0x5b, + 0x71, 0xad, 0x2d, 0x95, 0x52, 0x06, 0xd2, 0x44, 0xd7, 0x21, 0xc5, 0x18, 0xec, 0x3d, 0x22, 0x88, + 0x63, 0xb7, 0xcd, 0x6e, 0x6d, 0xa0, 0xbe, 0x3b, 0xbf, 0x98, 0x80, 0x8e, 0x03, 0x12, 0x46, 0x17, + 0x4c, 0xf4, 0xa9, 0x20, 0x2e, 0x11, 0x04, 0x7f, 0x01, 0x30, 0x62, 0xc1, 0xf9, 0x30, 0x12, 0x44, + 0xc4, 0xb9, 0xab, 0x59, 0xee, 0x5d, 0x16, 0x9c, 0x1f, 0xcb, 0x80, 0xce, 0x5d, 0x19, 0x25, 0x80, + 0x54, 0xea, 0x2b, 0xa5, 0xf9, 0x26, 0x62, 0x48, 0xf6, 0x27, 0x64, 0x7f, 0xf9, 0x26, 0x14, 0xd2, + 0xf9, 0x0e, 0xca, 0x89, 0x02, 0x29, 0x51, 0x2a, 0x50, 0x35, 0x6b, 0x03, 0xf5, 0x8d, 0x5f, 0x42, + 0x79, 0xa2, 0x95, 0xa9, 0xc4, 0xd5, 0x9e, 0x93, 0x68, 0x99, 0x57, 0xae, 0xf3, 0xa6, 0xfc, 0xce, + 0x5f, 0x36, 0x94, 0xfa, 0x34, 0x8a, 0x88, 0x47, 0xf1, 0x0b, 0xb0, 0x45, 0xe6, 0xd5, 0x6a, 0x92, + 0x43, 0x87, 0xf3, 0x6e, 0x49, 0x1a, 0x6e, 0x82, 0x25, 0xd8, 0x4c, 0x27, 0x96, 0x60, 0xb2, 0x8d, + 0x73, 0xce, 0xe6, 0xda, 0x90, 0x48, 0xda, 0xa0, 0x3d, 0xdf, 0x20, 0x6e, 0x41, 0x69, 0xcc, 0x3c, + 0x35, 0xdd, 0xa5, 0x5c, 0x30, 0x01, 0x33, 0xdb, 0x8a, 0x8b, 0xb6, 0xbd, 0x80, 0x12, 0x0d, 0x04, + 0xf7, 0x69, 0xe4, 0x94, 0xda, 0x85, 0x6e, 0xb5, 0xb7, 0x3c, 0x33, 0xe3, 0x24, 0x95, 0xe6, 0xe0, + 0x0d, 0x28, 0x8e, 0xd8, 0x64, 0xe2, 0x0b, 0xa7, 0x9c, 0xcb, 0xa5, 0x31, 0x29, 0xf1, 0x92, 0x09, + 0xea, 0x2c, 0xe7, 0x25, 0x4a, 0x04, 0xf7, 0xa0, 0x1c, 0x69, 0x2f, 0x9d, 0x8a, 0xf2, 0x18, 0xcd, + 0x7b, 0xac, 0xf8, 0xe6, 0x20, 0xe5, 0xc9, 0x5a, 0x9c, 0xfe, 0x40, 0x47, 0xc2, 0x81, 0xb6, 0xd9, + 0x2d, 0x27, 0xb5, 0x62, 0x0c, 0x3f, 0x03, 0x88, 0xbf, 0xf6, 0xfd, 0x40, 0x38, 0xd5, 0x5c, 0xc5, + 0x1c, 0x2e, 0xad, 0x19, 0xb1, 0x40, 0xd0, 0x2b, 0xe1, 0xd4, 0xe4, 0xc8, 0x75, 0x91, 0x04, 0xc4, + 0x9f, 0x43, 0x85, 0xd3, 0x28, 0x64, 0x41, 0x44, 0x23, 0xa7, 0xae, 0x0c, 0x68, 0xcc, 0x0d, 0x2e, + 0x39, 0x86, 0x29, 0x0f, 0xbf, 0x84, 0x35, 0x12, 0x86, 0x63, 0x9f, 0xba, 0xc3, 0x44, 0xec, 0x30, + 0x36, 0xb8, 0x91, 0x93, 0xd1, 0xd4, 0x9c, 0xa4, 0x3f, 0x75, 0xa1, 0x3a, 0xdf, 0x43, 0x65, 0x9f, + 0x70, 0x37, 0x3e, 0xcf, 0xc9, 0x48, 0xcd, 0x85, 0x91, 0x26, 0x4e, 0x5a, 0x0b, 0x4e, 0x66, 0x13, + 0x28, 0x2c, 0x4e, 0xa0, 0xf3, 0xb7, 0x09, 0x95, 0xf4, 0x02, 0xe1, 0x35, 0x28, 0xca, 0x3d, 0x3c, + 0x72, 0xcc, 0x76, 0xa1, 0x6b, 0x0f, 0xf4, 0x0a, 0xaf, 0x43, 0x79, 0x4c, 0x09, 0x0f, 0x64, 0xc4, + 0x52, 0x91, 0x74, 0x8d, 0x9f, 0x43, 0x23, 0x66, 0x0d, 0xd9, 0x54, 0x78, 0xcc, 0x0f, 0x3c, 0xa7, + 0xa0, 0x28, 0xf5, 0x18, 0xfe, 0x46, 0xa3, 0xf8, 0x29, 0x2c, 0x27, 0x9b, 0x86, 0x81, 0x34, 0xd8, + 0x56, 0xb4, 0x5a, 0x02, 0x1e, 0x49, 0x7f, 0x9f, 0x02, 0x90, 0xa9, 0x60, 0xc3, 0x31, 0x25, 0x97, + 0x54, 0x9d, 0xce, 0x64, 0x8e, 0x15, 0x89, 0x1f, 0x4a, 0xb8, 0xf3, 0x87, 0x09, 0x20, 0x45, 0xef, + 0x5e, 0x90, 0xc0, 0xa3, 0xf8, 0x33, 0x7d, 0x8f, 0x2c, 0x75, 0x8f, 0xd6, 0xf2, 0xef, 0x42, 0xcc, + 0x58, 0xb8, 0x4a, 0xcf, 0xa1, 0x14, 0x30, 0x97, 0x0e, 0x7d, 0x57, 0x9b, 0x52, 0x97, 0xc1, 0xbb, + 0xb7, 0x4f, 0x8a, 0x47, 0xcc, 0xa5, 0x07, 0x7b, 0x83, 0xa2, 0x0c, 0x1f, 0xb8, 0xd8, 0xc9, 0x8e, + 0x43, 0xfc, 0x48, 0xa5, 0x07, 0x61, 0x1d, 0x2c, 0xdf, 0xd5, 0x83, 0x00, 0xbd, 0xdb, 0x3a, 0xd8, + 0x1b, 0x58, 0xbe, 0xdb, 0x99, 0x00, 0xca, 0x8a, 0x1f, 0xfb, 0x81, 0x37, 0xce, 0x44, 0x9a, 0x1f, + 0x22, 0xd2, 0x7a, 0x9f, 0xc8, 0xce, 0x9f, 0x26, 0xd4, 0xb2, 0x3c, 0xa7, 0x3d, 0xbc, 0x03, 0x20, + 0x38, 0x09, 0x22, 0x5f, 0xf8, 0x2c, 0xd0, 0x15, 0x37, 0x1e, 0xa8, 0x98, 0x72, 0x92, 0x8b, 0x90, + 0xed, 0xc2, 0x5f, 0x42, 0x69, 0xa4, 0x58, 0xf1, 0xc4, 0x73, 0x6f, 0xdc, 0x7c, 0x6b, 0xc9, 0x95, + 0xd7, 0xf4, 0xbc, 0x67, 0x85, 0x19, 0xcf, 0x36, 0xf7, 0xa1, 0x92, 0xfe, 0x08, 0x70, 0x03, 0xaa, + 0x6a, 0x71, 0xc4, 0xf8, 0x84, 0x8c, 0x91, 0x81, 0x57, 0xa1, 0xa1, 0x80, 0x2c, 0x3f, 0x32, 0xf1, + 0x23, 0x58, 0x99, 0x03, 0x4f, 0x7b, 0xc8, 0xda, 0xfc, 0xa7, 0x00, 0xd5, 0xdc, 0x3b, 0x89, 0x01, + 0x8a, 0xfd, 0xc8, 0xdb, 0x9f, 0x86, 0xc8, 0xc0, 0x55, 0x28, 0xf5, 0x23, 0x6f, 0x87, 0x12, 0x81, + 0x4c, 0xbd, 0x78, 0xcd, 0x59, 0x88, 0x2c, 0xcd, 0xda, 0x0e, 0x43, 0x54, 0xc0, 0x75, 0x80, 0xf8, + 0x7b, 0x40, 0xa3, 0x10, 0xd9, 0x9a, 0x78, 0xca, 0x04, 0x45, 0x4b, 0x52, 0x9b, 0x5e, 0xa8, 0x68, + 0x51, 0x47, 0xe5, 0xcd, 0x44, 0x25, 0x8c, 0xa0, 0x26, 0x8b, 0x51, 0xc2, 0xc5, 0x99, 0xac, 0x52, + 0xc6, 0x4d, 0x40, 0x79, 0x44, 0x6d, 0xaa, 0x60, 0x0c, 0xf5, 0x7e, 0xe4, 0xbd, 0x09, 0x38, 0x25, + 0xa3, 0x0b, 0x72, 0x36, 0xa6, 0x08, 0xf0, 0x0a, 0x2c, 0xeb, 0x44, 0xf2, 0xc6, 0x4d, 0x23, 0x54, + 0xd5, 0xb4, 0xdd, 0x0b, 0x3a, 0xfa, 0xf1, 0xdb, 0x29, 0xe3, 0xd3, 0x09, 0xaa, 0xc9, 0xb6, 0xfb, + 0x91, 0xa7, 0x06, 0x74, 0x4e, 0xf9, 0x21, 0x25, 0x2e, 0xe5, 0x68, 0x59, 0xef, 0x3e, 0xf1, 0x27, + 0x94, 0x4d, 0xc5, 0x11, 0xfb, 0x09, 0xd5, 0xb5, 0x98, 0x01, 0x25, 0xae, 0x7a, 0x2f, 0x50, 0x43, + 0x8b, 0x49, 0x11, 0x25, 0x06, 0xe9, 0x7e, 0x5f, 0x73, 0xaa, 0x5a, 0x5c, 0xd1, 0x55, 0xf5, 0x5a, + 0x71, 0xb0, 0xde, 0x79, 0x2c, 0x18, 0x27, 0x1e, 0xdd, 0x0e, 0x43, 0x1a, 0xb8, 0x68, 0x15, 0x3b, + 0xd0, 0x9c, 0x47, 0x15, 0xbf, 0x29, 0x27, 0x36, 0x13, 0x19, 0x5f, 0xa3, 0x47, 0xf8, 0x31, 0xac, + 0xce, 0x81, 0x8a, 0xbd, 0xa6, 0xd9, 0x5f, 0x31, 0xee, 0x51, 0xa1, 0x3b, 0x7a, 0xbc, 0xf9, 0xab, + 0x09, 0xcd, 0x87, 0x4e, 0x24, 0xde, 0x00, 0xe7, 0x21, 0x7c, 0x7b, 0x2a, 0x18, 0x32, 0xf0, 0xc7, + 0xf0, 0xd1, 0x43, 0xd1, 0xaf, 0x99, 0x1f, 0x88, 0x83, 0x49, 0x38, 0xf6, 0x47, 0xbe, 0x9c, 0xfe, + 0xfb, 0x68, 0xaf, 0xae, 0x34, 0xcd, 0xda, 0xbc, 0x86, 0xfa, 0xec, 0x3d, 0x94, 0xfe, 0x67, 0xc8, + 0xb6, 0xeb, 0xca, 0x1b, 0x87, 0x0c, 0x69, 0x45, 0x06, 0x0f, 0xe8, 0x84, 0x5d, 0x52, 0x15, 0x31, + 0x67, 0x23, 0x6f, 0x42, 0x97, 0x88, 0x38, 0x62, 0xcd, 0x36, 0xb2, 0xed, 0xba, 0x87, 0xf1, 0x73, + 0xa7, 0xa2, 0x85, 0x9d, 0x67, 0x37, 0xef, 0x5a, 0xc6, 0xed, 0xbb, 0x96, 0x71, 0x73, 0xd7, 0x32, + 0x6f, 0xef, 0x5a, 0xe6, 0x7f, 0x77, 0x2d, 0xf3, 0xb7, 0xfb, 0x96, 0xf1, 0xfb, 0x7d, 0xcb, 0xb8, + 0xbd, 0x6f, 0x19, 0xff, 0xde, 0xb7, 0x8c, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x4d, 0xc3, 0x29, + 0xaa, 0xbf, 0x09, 0x00, 0x00, } func (m *Entry) Marshal() (dAtA []byte, err error) { @@ -939,6 +943,9 @@ func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + i = encodeVarintRaft(dAtA, i, uint64(m.AppliedSnapshotIndex)) + i-- + dAtA[i] = 0x78 if len(m.Responses) > 0 { for iNdEx := len(m.Responses) - 1; iNdEx >= 0; iNdEx-- { { @@ -1321,6 +1328,7 @@ func (m *Message) Size() (n int) { n += 1 + l + sovRaft(uint64(l)) } } + n += 1 + sovRaft(uint64(m.AppliedSnapshotIndex)) return n } @@ -2156,6 +2164,25 @@ func (m *Message) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 15: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AppliedSnapshotIndex", wireType) + } + m.AppliedSnapshotIndex = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRaft + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AppliedSnapshotIndex |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRaft(dAtA[iNdEx:]) diff --git a/raftpb/raft.proto b/raftpb/raft.proto index a8598ee5..04326ac4 100644 --- a/raftpb/raft.proto +++ b/raftpb/raft.proto @@ -105,6 +105,8 @@ message Message { // to respond and who to respond to when the work associated with a message // is complete. Populated for MsgStorageAppend and MsgStorageApply messages. repeated Message responses = 14 [(gogoproto.nullable) = false]; + // applied_snapshot_index is used for MsgSnapStatus to report back the applied snapshot index. + optional uint64 applied_snapshot_index = 15 [(gogoproto.nullable) = false]; } message HardState { diff --git a/raftpb/raft_test.go b/raftpb/raft_test.go index a369aad6..2e06d00f 100644 --- a/raftpb/raft_test.go +++ b/raftpb/raft_test.go @@ -40,7 +40,7 @@ func TestProtoMemorySizes(t *testing.T) { assert.Equal(t, if64Bit(144, 80), unsafe.Sizeof(s), "Snapshot size check") var m Message - assert.Equal(t, if64Bit(160, 112), unsafe.Sizeof(m), "Message size check") + assert.Equal(t, if64Bit(168, 112), unsafe.Sizeof(m), "Message size check") var hs HardState assert.Equal(t, uintptr(24), unsafe.Sizeof(hs), "HardState size check") diff --git a/rawnode.go b/rawnode.go index a4da2ae2..52b8e732 100644 --- a/rawnode.go +++ b/rawnode.go @@ -536,10 +536,15 @@ func (rn *RawNode) ReportUnreachable(id uint64) { } // ReportSnapshot reports the status of the sent snapshot. -func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) { +func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus, appliedSnapshotIndex uint64) { rej := status == SnapshotFailure - _ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}) + _ = rn.raft.Step(pb.Message{ + Type: pb.MsgSnapStatus, + From: id, + Reject: rej, + AppliedSnapshotIndex: appliedSnapshotIndex, + }) } // TransferLeader tries to transfer leadership to the given transferee. diff --git a/testdata/confchange_v1_add_single.txt b/testdata/confchange_v1_add_single.txt index f13d3077..90e7993f 100644 --- a/testdata/confchange_v1_add_single.txt +++ b/testdata/confchange_v1_add_single.txt @@ -72,7 +72,7 @@ stabilize DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -93,4 +93,4 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused] diff --git a/testdata/confchange_v2_add_double_auto.txt b/testdata/confchange_v2_add_double_auto.txt index f290c980..d3b93dfb 100644 --- a/testdata/confchange_v2_add_double_auto.txt +++ b/testdata/confchange_v2_add_double_auto.txt @@ -95,7 +95,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -116,7 +116,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -171,7 +171,7 @@ stabilize 1 3 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=5] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -192,7 +192,7 @@ stabilize 1 3 3->1 MsgAppResp Term:1 Log:0/5 > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/5 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused pendingSnap=5] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=5 next=6 paused] # Nothing else happens. stabilize diff --git a/testdata/confchange_v2_add_double_implicit.txt b/testdata/confchange_v2_add_double_implicit.txt index 2db3b7c1..b64aef2f 100644 --- a/testdata/confchange_v2_add_double_implicit.txt +++ b/testdata/confchange_v2_add_double_implicit.txt @@ -78,7 +78,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -99,7 +99,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: diff --git a/testdata/confchange_v2_add_single_auto.txt b/testdata/confchange_v2_add_single_auto.txt index 3c234159..a022569b 100644 --- a/testdata/confchange_v2_add_single_auto.txt +++ b/testdata/confchange_v2_add_single_auto.txt @@ -73,7 +73,7 @@ stabilize DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -94,4 +94,4 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused] diff --git a/testdata/confchange_v2_add_single_explicit.txt b/testdata/confchange_v2_add_single_explicit.txt index 1d390e55..97aa2664 100644 --- a/testdata/confchange_v2_add_single_explicit.txt +++ b/testdata/confchange_v2_add_single_explicit.txt @@ -73,7 +73,7 @@ stabilize 1 2 DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] - DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused pendingSnap=4] + DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=5 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -94,7 +94,7 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused pendingSnap=4] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 2 [StateSnapshot match=4 next=5 paused] # Check that we're not allowed to change membership again while in the joint state. # This leads to an empty entry being proposed instead (index 5 in the stabilize block diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index b1fc4144..907c2820 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -87,7 +87,7 @@ stabilize 1 > 1 receiving messages 3->1 MsgHeartbeatResp Term:1 Log:0/0 DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 paused pendingSnap=11] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=12 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -98,7 +98,7 @@ status 1 ---- 1: StateReplicate match=11 next=12 2: StateReplicate match=11 next=12 -3: StateSnapshot match=0 next=12 paused pendingSnap=11 +3: StateSnapshot match=0 next=12 paused # Follower applies the snapshot. Note how it reacts with a MsgAppResp upon completion. # The snapshot fully catches the follower up (i.e. there are no more log entries it @@ -126,7 +126,7 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused pendingSnap=11] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=12 paused] status 1 ---- diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index 3bab832d..4f3a55b9 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -1,5 +1,6 @@ -# This is a variant of snapshot_succeed_via_app_resp in which the snapshot -# that is being sent is behind the PendingSnapshot index tracked by the leader. +# This test verifies that a snapshot sent to a follower is applied correctly, +# even when the leader's log progresses after sending the snapshot, making it +# no longer the latest snapshot. # Turn off output during the setup of the test. log-level none @@ -96,7 +97,7 @@ DEBUG 3 [logterm: 0, index: 10] rejected MsgApp [logterm: 1, index: 10] from 1 # Note below that the RejectionHint is 5, which is below the first index 10 of the # leader. Once the leader receives this, it will move 3 into StateSnapshot with -# PendingSnapshot=lastIndex=12. +# lastIndex=12. process-ready 3 ---- Ready MustSync=false: @@ -124,7 +125,7 @@ stabilize 1 DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] - DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=13 paused pendingSnap=12] + DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=13 paused] > 1 handling Ready Ready MustSync=false: Messages: @@ -152,14 +153,13 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgAppResp Term:1 Log:0/11 - DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused pendingSnap=12] + DEBUG 1 recovered from needing snapshot, resumed sending replication messages to 3 [StateSnapshot match=11 next=13 paused] > 1 handling Ready Ready MustSync=false: Messages: 1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""] # 3 is in StateReplicate thanks to receiving the snapshot at index 11. -# This is despite its PendingSnapshot having been 12. status 1 ---- 1: StateReplicate match=12 next=13 diff --git a/tracker/progress.go b/tracker/progress.go index 5716661c..a8262a6f 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -36,8 +36,6 @@ type Progress struct { // // Invariant: 0 <= Match < Next. // NB: it follows that Next >= 1. - // - // In StateSnapshot, Next == PendingSnapshot + 1. Next uint64 // sentCommit is the highest commit index in flight to the follower. @@ -45,7 +43,7 @@ type Progress struct { // Generally, it is monotonic, but con regress in some cases, e.g. when // converting to `StateProbe` or when receiving a rejection from a follower. // - // In StateSnapshot, sentCommit == PendingSnapshot == Next-1. + // In StateSnapshot, sentCommit == Next-1. sentCommit uint64 // State defines how the leader should interact with the follower. @@ -61,29 +59,6 @@ type Progress struct { // before and stops sending any replication message. State StateType - // PendingSnapshot is used in StateSnapshot and tracks the last index of the - // leader at the time at which it realized a snapshot was necessary. This - // matches the index in the MsgSnap message emitted from raft. - // - // While there is a pending snapshot, replication to the follower is paused. - // The follower will transition back to StateReplicate if the leader - // receives an MsgAppResp from it that reconnects the follower to the - // leader's log (such an MsgAppResp is emitted when the follower applies a - // snapshot). It may be surprising that PendingSnapshot is not taken into - // account here, but consider that complex systems may delegate the sending - // of snapshots to alternative datasources (i.e. not the leader). In such - // setups, it is difficult to manufacture a snapshot at a particular index - // requested by raft and the actual index may be ahead or behind. This - // should be okay, as long as the snapshot allows replication to resume. - // - // The follower will transition to StateProbe if ReportSnapshot is called on - // the leader; if SnapshotFinish is passed then PendingSnapshot becomes the - // basis for the next attempt to append. In practice, the first mechanism is - // the one that is relevant in most cases. However, if this MsgAppResp is - // lost (fallible network) then the second mechanism ensures that in this - // case the follower does not erroneously remain in StateSnapshot. - PendingSnapshot uint64 - // RecentActive is true if the progress is recently active. Receiving any messages // from the corresponding follower indicates the progress is active. // RecentActive can be reset to false after an election timeout. @@ -117,28 +92,17 @@ type Progress struct { } // ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, -// PendingSnapshot, and Inflights. +// and Inflights. func (pr *Progress) ResetState(state StateType) { pr.MsgAppFlowPaused = false - pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() } -// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, -// optionally and if larger, the index of the pending snapshot. +// BecomeProbe transitions into StateProbe. Next is reset to Match+1. func (pr *Progress) BecomeProbe() { - // If the original state is StateSnapshot, progress knows that - // the pending snapshot has been sent to this peer successfully, then - // probes from pendingSnapshot + 1. - if pr.State == StateSnapshot { - pendingSnapshot := pr.PendingSnapshot - pr.ResetState(StateProbe) - pr.Next = max(pr.Match+1, pendingSnapshot+1) - } else { - pr.ResetState(StateProbe) - pr.Next = pr.Match + 1 - } + pr.ResetState(StateProbe) + pr.Next = pr.Match + 1 pr.sentCommit = min(pr.sentCommit, pr.Next-1) } @@ -148,15 +112,20 @@ func (pr *Progress) BecomeReplicate() { pr.Next = pr.Match + 1 } -// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending -// snapshot index. +// BecomeSnapshot moves the Progress to StateSnapshot. func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.ResetState(StateSnapshot) - pr.PendingSnapshot = snapshoti pr.Next = snapshoti + 1 pr.sentCommit = snapshoti } +// OnSnapshotApplied updates the Progress state based on the applied snapshot index. +// This method sets Match to the given index and adjusts Next accordingly. +func (pr *Progress) OnSnapshotApplied(appliedSnapshotIndex uint64) { + pr.Match = appliedSnapshotIndex + pr.Next = pr.Match + 1 +} + // SentEntries updates the progress on the given number of consecutive entries // being sent in a MsgApp, with the given total bytes size, appended at log // indices >= pr.Next. @@ -281,9 +250,6 @@ func (pr *Progress) String() string { if pr.IsPaused() { fmt.Fprint(&buf, " paused") } - if pr.PendingSnapshot > 0 { - fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot) - } if !pr.RecentActive { fmt.Fprint(&buf, " inactive") } diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 49dedb53..1b5bf192 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -27,13 +27,12 @@ func TestProgressString(t *testing.T) { Match: 1, Next: 2, State: StateSnapshot, - PendingSnapshot: 123, RecentActive: false, MsgAppFlowPaused: true, IsLearner: true, Inflights: ins, } - const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` + const exp = `StateSnapshot match=1 next=2 learner paused inactive inflight=1[full]` assert.Equal(t, exp, pr.String()) } @@ -87,12 +86,7 @@ func TestProgressBecomeProbe(t *testing.T) { }, { // snapshot finish - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)}, - 11, - }, - { - // snapshot failure - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, Inflights: NewInflights(256, 0)}, 2, }, } @@ -117,7 +111,18 @@ func TestProgressBecomeSnapshot(t *testing.T) { p.BecomeSnapshot(10) assert.Equal(t, StateSnapshot, p.State) assert.Equal(t, uint64(1), p.Match) - assert.Equal(t, uint64(10), p.PendingSnapshot) + assert.Equal(t, uint64(11), p.Next) // Next is set to snapshot index + 1 + assert.Equal(t, uint64(10), p.sentCommit) +} + +func TestProgressOnSnapshotApplied(t *testing.T) { + p := &Progress{State: StateSnapshot, Match: 5, Next: 6, Inflights: NewInflights(256, 0)} + appliedSnapshotIndex := uint64(10) + + p.OnSnapshotApplied(appliedSnapshotIndex) + + assert.Equal(t, appliedSnapshotIndex, p.Match) + assert.Equal(t, appliedSnapshotIndex+1, p.Next) } func TestProgressUpdate(t *testing.T) {