diff --git a/replica.go b/replica.go index bf858836..95195c9e 100644 --- a/replica.go +++ b/replica.go @@ -55,6 +55,9 @@ type Replica struct { // Frequency to create new snapshots. SnapshotInterval time.Duration + // Time waited before starting the snapshot interval. Skips checking replica to calculate offset. + SnapshotOffset time.Duration + // Time to keep snapshots and related WAL files. // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration @@ -62,9 +65,15 @@ type Replica struct { // Time between checks for retention. RetentionCheckInterval time.Duration + // Time waited before starting the check interval. + RetentionCheckOffset time.Duration + // Time between validation checks. ValidationInterval time.Duration + // Time waited before starting the validation checks. + ValidationOffset time.Duration + // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool @@ -715,6 +724,20 @@ func (r *Replica) retainer(ctx context.Context) { checkInterval = r.Retention } + // Offset retention checks if configured. + if r.RetentionCheckOffset > 0 { + r.Logger().Info("retention check interval adjusted", "next", time.Now().Add(r.RetentionCheckOffset).Format(time.RFC3339)) + + select { + case <-ctx.Done(): + return + case <-time.After(r.RetentionCheckOffset): + if err := r.EnforceRetention(ctx); err != nil { + r.Logger().Error("retainer error", "error", err) + } + } + } + ticker := time.NewTicker(checkInterval) defer ticker.Stop() @@ -738,26 +761,31 @@ func (r *Replica) snapshotter(ctx context.Context) { } logger := r.Logger() - if pos, err := r.db.Pos(); err != nil { - logger.Error("snapshotter cannot determine generation", "error", err) - } else if !pos.IsZero() { - if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil { - logger.Error("snapshotter cannot determine latest snapshot", "error", err) - } else if snapshot != nil { - nextSnapshot := r.SnapshotInterval - time.Since(snapshot.CreatedAt) - if nextSnapshot < 0 { - nextSnapshot = 0 + if r.SnapshotOffset == 0 { + if pos, err := r.db.Pos(); err != nil { + logger.Error("snapshotter cannot determine generation", "error", err) + } else if !pos.IsZero() { + if snapshot, err := r.maxSnapshot(ctx, pos.Generation); err != nil { + logger.Error("snapshotter cannot determine latest snapshot", "error", err) + } else if snapshot != nil { + r.SnapshotOffset = r.SnapshotInterval - time.Since(snapshot.CreatedAt) + // ensure we will snapshot immediately if zero than less + if r.SnapshotOffset < 1 { + r.SnapshotOffset = 1 + } } + } + } - logger.Info("snapshot interval adjusted", "previous", snapshot.CreatedAt.Format(time.RFC3339), "next", nextSnapshot.String()) + if r.SnapshotOffset > 0 { + logger.Info("snapshot interval adjusted", "next", time.Now().Add(r.SnapshotOffset).Format(time.RFC3339)) - select { - case <-ctx.Done(): - return - case <-time.After(nextSnapshot): - if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { - logger.Error("snapshotter error", "error", err) - } + select { + case <-ctx.Done(): + return + case <-time.After(r.SnapshotOffset): + if _, err := r.Snapshot(ctx); err != nil && err != ErrNoGeneration { + logger.Error("snapshotter error", "error", err) } } } @@ -790,6 +818,20 @@ func (r *Replica) validator(ctx context.Context) { return } + // Offset validations if configured. + if r.ValidationOffset > 0 { + r.Logger().Info("validation interval adjusted", "next", time.Now().Add(r.ValidationInterval).Format(time.RFC3339)) + + select { + case <-ctx.Done(): + return + case <-time.After(r.ValidationOffset): + if err := r.Validate(ctx); err != nil { + r.Logger().Error("validation error", "error", err) + } + } + } + ticker := time.NewTicker(r.ValidationInterval) defer ticker.Stop()