From c6455e3f7bd5dde4acba9e5cb443961bd4bcb44c Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Thu, 30 Nov 2023 08:50:33 +0200 Subject: [PATCH] Ignore WAL indices before snapshot that is being restored If the replica has stray WAL files from older snapshots they will break restoring even though they can be ignored and reaped later through retention. In addition if there are no snapshots any random WAL file should be ignored when it doesn't have a matching snapshot. --- replica.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/replica.go b/replica.go index bf858836..1894be4e 100644 --- a/replica.go +++ b/replica.go @@ -956,6 +956,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( } defer sitr.Close() + minIndex, maxIndex := -1, -1 for sitr.Next() { info := sitr.Snapshot() if createdAt.IsZero() || info.CreatedAt.Before(createdAt) { @@ -964,6 +965,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) { updatedAt = info.CreatedAt } + if minIndex == -1 || info.Index < minIndex { + minIndex = info.Index + } + if maxIndex == -1 || info.Index > maxIndex { + maxIndex = info.Index + } } if err := sitr.Close(); err != nil { return createdAt, updatedAt, err @@ -978,6 +985,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( for witr.Next() { info := witr.WALSegment() + if info.Index < minIndex || info.Index > maxIndex { + continue + } if createdAt.IsZero() || info.CreatedAt.Before(createdAt) { createdAt = info.CreatedAt } @@ -1072,7 +1082,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { } // Compute list of offsets for each WAL index. - walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp) + walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } @@ -1280,7 +1290,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i // walSegmentMap returns a map of WAL indices to their segments. // Filters by a max timestamp or a max index. -func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) { +func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) { itr, err := r.Client.WALSegments(ctx, generation) if err != nil { return nil, err @@ -1296,6 +1306,8 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex break // after max timestamp, skip } else if info.Index > maxIndex { break // after max index, skip + } else if info.Index < minIndex { + continue // ignore indices before snapshot } // Verify offsets are added in order.