From c633eb1fea919516211aeec8dc633e253cf17747 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Sat, 16 Dec 2023 12:01:33 +0200 Subject: [PATCH] Ignore WAL indices before snapshot that is being restored (#527) --- replica.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/replica.go b/replica.go index b8b3a9e5..6872bca1 100644 --- a/replica.go +++ b/replica.go @@ -961,6 +961,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( } defer sitr.Close() + minIndex, maxIndex := -1, -1 for sitr.Next() { info := sitr.Snapshot() if createdAt.IsZero() || info.CreatedAt.Before(createdAt) { @@ -969,6 +970,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) { updatedAt = info.CreatedAt } + if minIndex == -1 || info.Index < minIndex { + minIndex = info.Index + } + if info.Index > maxIndex { + maxIndex = info.Index + } } if err := sitr.Close(); err != nil { return createdAt, updatedAt, err @@ -983,6 +990,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) ( for witr.Next() { info := witr.WALSegment() + if info.Index < minIndex || info.Index > maxIndex { + continue + } if createdAt.IsZero() || info.CreatedAt.Before(createdAt) { createdAt = info.CreatedAt } @@ -1077,7 +1087,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { } // Compute list of offsets for each WAL index. - walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp) + walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp) if err != nil { return fmt.Errorf("cannot find max wal index for restore: %w", err) } @@ -1285,7 +1295,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i // walSegmentMap returns a map of WAL indices to their segments. // Filters by a max timestamp or a max index. -func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) { +func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) { itr, err := r.Client.WALSegments(ctx, generation) if err != nil { return nil, err @@ -1301,6 +1311,8 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex break // after max timestamp, skip } else if info.Index > maxIndex { break // after max index, skip + } else if info.Index < minIndex { + continue // before min index, continue } // Verify offsets are added in order.