Skip to content

Commit

Permalink
Ignore WAL indices before snapshot that is being restored
Browse files Browse the repository at this point in the history
If the replica has stray WAL files from older snapshots they will
break restoring even though they can be ignored and reaped later
through retention.

In addition if there are no snapshots any random WAL file should be
ignored when it doesn't have a matching snapshot.
  • Loading branch information
hifi committed Dec 16, 2023
1 parent 7badf0e commit 373610f
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
}
defer sitr.Close()

minIndex, maxIndex := -1, -1
for sitr.Next() {
info := sitr.Snapshot()
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
Expand All @@ -969,6 +970,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) {
updatedAt = info.CreatedAt
}
if minIndex == -1 || info.Index < minIndex {
minIndex = info.Index
}
if maxIndex == -1 || info.Index > maxIndex {
maxIndex = info.Index
}
}
if err := sitr.Close(); err != nil {
return createdAt, updatedAt, err
Expand All @@ -983,6 +990,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (

for witr.Next() {
info := witr.WALSegment()
if info.Index < minIndex || info.Index > maxIndex {
continue
}
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
createdAt = info.CreatedAt
}
Expand Down Expand Up @@ -1077,7 +1087,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}

// Compute list of offsets for each WAL index.
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp)
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err)
}
Expand Down Expand Up @@ -1285,7 +1295,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i

// walSegmentMap returns a map of WAL indices to their segments.
// Filters by a max timestamp or a max index.
func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
itr, err := r.Client.WALSegments(ctx, generation)
if err != nil {
return nil, err
Expand All @@ -1301,6 +1311,8 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex
break // after max timestamp, skip
} else if info.Index > maxIndex {
break // after max index, skip
} else if info.Index < minIndex {
continue // before min index, continue
}

// Verify offsets are added in order.
Expand Down

0 comments on commit 373610f

Please sign in to comment.