diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 3bb20c1a..3018815a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -94,19 +94,19 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { // Wait for signal to stop program. select { - case err = <-c.execCh: + case err = <-c.runCh: fmt.Println("subprocess exited, litestream shutting down") case sig := <-signalCh: fmt.Println("signal received, litestream shutting down") - if c.cmd != nil { + if c.runSignal != nil { fmt.Println("sending signal to exec process") - if err := c.cmd.Process.Signal(sig); err != nil { + if err := c.runSignal(sig); err != nil { return fmt.Errorf("cannot signal exec process: %w", err) } fmt.Println("waiting for exec process to close") - if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") { + if err := <-c.runCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") { return fmt.Errorf("cannot wait for exec process: %w", err) } } diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index cd95d961..d85a88df 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -23,8 +23,12 @@ import ( // ReplicateCommand represents a command that continuously replicates SQLite databases. type ReplicateCommand struct { - cmd *exec.Cmd // subcommand - execCh chan error // subcommand error channel + runSignal func(os.Signal) error // run cancel signaler + runCh chan error // run error channel + + once bool // replicate once and exit + forceSnapshot bool // force snapshot to all replicas + enforceRetention bool // enforce retention of old snapshots Config Config @@ -34,7 +38,7 @@ type ReplicateCommand struct { func NewReplicateCommand() *ReplicateCommand { return &ReplicateCommand{ - execCh: make(chan error), + runCh: make(chan error), } } @@ -42,6 +46,9 @@ func NewReplicateCommand() *ReplicateCommand { func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) execFlag := fs.String("exec", "", "execute subcommand") + onceFlag := fs.Bool("once", false, "replicate once and exit") + forceSnapshotFlag := fs.Bool("force-snapshot", false, "force snapshot when replicating once") + enforceRetentionFlag := fs.Bool("enforce-retention", false, "enforce retention of old snapshots") tracePath := fs.String("trace", "", "trace path") configPath, noExpandEnv := registerConfigFlag(fs) fs.Usage = c.Usage @@ -90,6 +97,22 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf } + // Once is mutually exclusive with exec + c.once = *onceFlag + if c.once && c.Config.Exec != "" { + return fmt.Errorf("cannot specify -once flag with exec") + } + + c.forceSnapshot = *forceSnapshotFlag + if !c.once && c.forceSnapshot { + return fmt.Errorf("cannot specify -force-snapshot flag without -once") + } + + c.enforceRetention = *enforceRetentionFlag + if !c.once && c.enforceRetention { + return fmt.Errorf("cannot specify -enforce-retention flag without -once") + } + return nil } @@ -109,6 +132,14 @@ func (c *ReplicateCommand) Run() (err error) { return err } + // Disable monitors if we're running once. + if c.once { + db.MonitorInterval = 0 + for _, r := range db.Replicas { + r.MonitorEnabled = false + } + } + // Open database & attach to program. if err := db.Open(); err != nil { return err @@ -162,14 +193,65 @@ func (c *ReplicateCommand) Run() (err error) { return fmt.Errorf("cannot parse exec command: %w", err) } - c.cmd = exec.Command(execArgs[0], execArgs[1:]...) - c.cmd.Env = os.Environ() - c.cmd.Stdout = os.Stdout - c.cmd.Stderr = os.Stderr - if err := c.cmd.Start(); err != nil { + cmd := exec.Command(execArgs[0], execArgs[1:]...) + cmd.Env = os.Environ() + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { return fmt.Errorf("cannot start exec command: %w", err) } - go func() { c.execCh <- c.cmd.Wait() }() + c.runSignal = cmd.Process.Signal + go func() { c.runCh <- cmd.Wait() }() + } else if c.once { + // Run replication once for each replica with cancel. + ctx, cancel := context.WithCancel(context.Background()) + c.runSignal = func(s os.Signal) error { + cancel() + return nil + } + + go func() { + var err error + + defer func() { + cancel() + c.runCh <- err + }() + + for _, db := range c.DBs { + if c.forceSnapshot { + // Force next index with RESTART checkpoint. + db.MaxCheckpointPageN = 1 + } + + if err = db.Sync(ctx); err != nil { + return + } + + // Prevent checkpointing on Close() + db.MinCheckpointPageN = 0 + db.MaxCheckpointPageN = 0 + db.TruncatePageN = 0 + db.CheckpointInterval = 0 + + for _, r := range db.Replicas { + if c.forceSnapshot { + _, err = r.Snapshot(ctx) + } else { + err = r.Sync(ctx) + } + if err != nil { + return + } + + if c.enforceRetention { + if err = r.EnforceRetention(ctx); err != nil { + return + } + } + } + } + }() } return nil @@ -212,6 +294,15 @@ Arguments: Executes a subcommand. Litestream will exit when the child process exits. Useful for simple process management. + -once + Execute replication once and exit. + + -force-snapshot + When replicating once, force taking a snapshot to all replicas. + + -enforce-retention + When replicating once, enforce rentention of old snapshots. + -no-expand-env Disables environment variable expansion in configuration file. diff --git a/db.go b/db.go index 4ef73124..71dd7cc7 100644 --- a/db.go +++ b/db.go @@ -761,7 +761,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { checkpointMode := CheckpointModePassive if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { checkpoint, checkpointMode = true, CheckpointModeRestart - } else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { + } else if db.MinCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { checkpoint = true } else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) { checkpoint = true