Skip to content

Commit

Permalink
Ignore WAL indices before snapshot that is being restored (benbjohnso…
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi authored Dec 16, 2023
1 parent 7badf0e commit c633eb1
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,7 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
}
defer sitr.Close()

minIndex, maxIndex := -1, -1
for sitr.Next() {
info := sitr.Snapshot()
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
Expand All @@ -969,6 +970,12 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (
if updatedAt.IsZero() || info.CreatedAt.After(updatedAt) {
updatedAt = info.CreatedAt
}
if minIndex == -1 || info.Index < minIndex {
minIndex = info.Index
}
if info.Index > maxIndex {
maxIndex = info.Index
}
}
if err := sitr.Close(); err != nil {
return createdAt, updatedAt, err
Expand All @@ -983,6 +990,9 @@ func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (

for witr.Next() {
info := witr.WALSegment()
if info.Index < minIndex || info.Index > maxIndex {
continue
}
if createdAt.IsZero() || info.CreatedAt.Before(createdAt) {
createdAt = info.CreatedAt
}
Expand Down Expand Up @@ -1077,7 +1087,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
}

// Compute list of offsets for each WAL index.
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, opt.Index, opt.Timestamp)
walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp)
if err != nil {
return fmt.Errorf("cannot find max wal index for restore: %w", err)
}
Expand Down Expand Up @@ -1285,7 +1295,7 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i

// walSegmentMap returns a map of WAL indices to their segments.
// Filters by a max timestamp or a max index.
func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) {
itr, err := r.Client.WALSegments(ctx, generation)
if err != nil {
return nil, err
Expand All @@ -1301,6 +1311,8 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, maxIndex
break // after max timestamp, skip
} else if info.Index > maxIndex {
break // after max index, skip
} else if info.Index < minIndex {
continue // before min index, continue
}

// Verify offsets are added in order.
Expand Down

0 comments on commit c633eb1

Please sign in to comment.