Skip to content

Commit

Permalink
Allow setting replica interval offsets
Browse files Browse the repository at this point in the history
If using S3 replicas the extra LIST call on startup may be
expensive in scale if a lot of Litestreams are started in quick
succession.

Allowing configuring the snapshot offset externally allows the
caller to spread around the snapshots without relying on access to
the remote replica making restarts no-op.

Same thing applies to retention checks that they can easily stack
too close.
  • Loading branch information
hifi committed Nov 17, 2023
1 parent bb8e90c commit abcf103
Showing 1 changed file with 59 additions and 17 deletions.
76 changes: 59 additions & 17 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,25 @@ type Replica struct {
// Frequency to create new snapshots.
SnapshotInterval time.Duration

// Time waited before starting the snapshot interval. Skips checking replica to calculate offset.
SnapshotOffset time.Duration

// Time to keep snapshots and related WAL files.
// Database is snapshotted after interval, if needed, and older WAL files are discarded.
Retention time.Duration

// Time between checks for retention.
RetentionCheckInterval time.Duration

// Time waited before starting the check interval.
RetentionCheckOffset time.Duration

// Time between validation checks.
ValidationInterval time.Duration

// Time waited before starting the validation checks.
ValidationOffset time.Duration

// If true, replica monitors database for changes automatically.
// Set to false if replica is being used synchronously (such as in tests).
MonitorEnabled bool
Expand Down Expand Up @@ -721,6 +730,20 @@ func (r *Replica) retainer(ctx context.Context) {
checkInterval = r.Retention
}

// Offset retention checks if configured.
if r.RetentionCheckOffset > 0 {
r.Logger().Info("retention check interval adjusted", "next", time.Now().Add(r.RetentionCheckOffset).Format(time.RFC3339))

select {
case <-ctx.Done():
return
case <-time.After(r.RetentionCheckOffset):
if err := r.EnforceRetention(ctx); err != nil {
r.Logger().Error("retainer error", "error", err)
}
}
}

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

Expand All @@ -744,26 +767,31 @@ func (r *Replica) snapshotter(ctx context.Context) {
}

logger := r.Logger()
if pos, err := r.db.Pos(); err != nil {
logger.Error("snapshotter cannot determine generation", "error", err)
} else if !pos.IsZero() {
if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil {
logger.Error("snapshotter cannot determine latest snapshot", "error", err)
} else if snapshot != nil {
nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt)
if nextSnapshot < 0 {
nextSnapshot = 0
if r.SnapshotOffset == 0 {
if pos, err := r.db.Pos(); err != nil {
logger.Error("snapshotter cannot determine generation", "error", err)
} else if !pos.IsZero() {
if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil {
logger.Error("snapshotter cannot determine latest snapshot", "error", err)
} else if snapshot != nil {
r.SnapshotOffset = r.SnapshotInterval - time.Since(snapshot.CreatedAt)
// ensure we will snapshot immediately if zero than less
if r.SnapshotOffset < 1 {
r.SnapshotOffset = 1
}
}
}
}

logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String())
if r.SnapshotOffset > 0 {
logger.Info("snapshot interval adjusted", "next", time.Now().Add(r.SnapshotOffset).Format(time.RFC3339))

select {
case <-ctx.Done():
return
case <-time.After(nextSnapshot):
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
logger.Error("snapshotter error", "error", err)
}
select {
case <-ctx.Done():
return
case <-time.After(r.SnapshotOffset):
if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration {
logger.Error("snapshotter error", "error", err)
}
}
}
Expand Down Expand Up @@ -796,6 +824,20 @@ func (r *Replica) validator(ctx context.Context) {
return
}

// Offset validations if configured.
if r.ValidationOffset > 0 {
r.Logger().Info("validation interval adjusted", "next", time.Now().Add(r.ValidationInterval).Format(time.RFC3339))

select {
case <-ctx.Done():
return
case <-time.After(r.ValidationOffset):
if err := r.Validate(ctx); err != nil {
r.Logger().Error("validation error", "error", err)
}
}
}

ticker := time.NewTicker(r.ValidationInterval)
defer ticker.Stop()

Expand Down

0 comments on commit abcf103

Please sign in to comment.