diff --git a/abs/replica_client.go b/abs/replica_client.go index 4d5e00ef..04be73b9 100644 --- a/abs/replica_client.go +++ b/abs/replica_client.go @@ -168,14 +168,14 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites } // WriteSnapshot writes LZ4 compressed data from rd to the object storage. -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return fmt.Errorf("cannot determine snapshot path: %w", err) } startTime := time.Now() @@ -186,7 +186,7 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"}, BlobAccessTier: azblob.DefaultAccessTier, }); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() @@ -194,21 +194,18 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) - return litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: rc.N(), - CreatedAt: startTime.UTC(), - }, nil + info.Size = rc.N() + info.CreatedAt = startTime.UTC() + return nil } // SnapshotReader returns a reader for snapshot data at the given generation/index. -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -228,12 +225,12 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } // DeleteSnapshot deletes a snapshot with the given generation & index. -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error { if err := c.Init(ctx); err != nil { return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -258,14 +255,14 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return fmt.Errorf("cannot determine wal segment path: %w", err) } startTime := time.Now() @@ -276,29 +273,25 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: "application/octet-stream"}, BlobAccessTier: azblob.DefaultAccessTier, }); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) - return litestream.WALSegmentInfo{ - Generation: pos.Generation, - Index: pos.Index, - Offset: pos.Offset, - Size: rc.N(), - CreatedAt: startTime.UTC(), - }, nil + info.Size = rc.N() + info.CreatedAt = startTime.UTC() + return nil } // WALSegmentReader returns a reader for a section of WAL data at the given index. // Returns os.ErrNotExist if no matching index/offset is found. -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -318,13 +311,13 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } // DeleteWALSegments deletes WAL segments with at the given positions. -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error { if err := c.Init(ctx); err != nil { return err } - for _, pos := range a { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + for _, info := range a { + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -388,19 +381,18 @@ func (itr *snapshotIterator) fetch() error { marker = resp.NextMarker for _, item := range resp.Segment.BlobItems { - key := path.Base(item.Name) - index, err := litestream.ParseSnapshotPath(key) - if err != nil { - continue - } - info := litestream.SnapshotInfo{ Generation: itr.generation, - Index: index, Size: *item.Properties.ContentLength, CreatedAt: item.Properties.CreationTime.UTC(), } + key := path.Base(item.Name) + err := info.ParsePath(key) + if err != nil { + continue + } + select { case <-itr.ctx.Done(): case itr.ch <- info: @@ -494,20 +486,18 @@ func (itr *walSegmentIterator) fetch() error { marker = resp.NextMarker for _, item := range resp.Segment.BlobItems { - key := path.Base(item.Name) - index, offset, err := litestream.ParseWALSegmentPath(key) - if err != nil { - continue - } - info := litestream.WALSegmentInfo{ Generation: itr.generation, - Index: index, - Offset: offset, Size: *item.Properties.ContentLength, CreatedAt: item.Properties.CreationTime.UTC(), } + key := path.Base(item.Name) + err := info.ParsePath(key) + if err != nil { + continue + } + select { case <-itr.ctx.Done(): case itr.ch <- info: diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 9698bf50..085e961f 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -18,6 +18,7 @@ import ( "filippo.io/age" "github.com/benbjohnson/litestream" + "github.com/benbjohnson/litestream/abs" "github.com/benbjohnson/litestream/file" "github.com/benbjohnson/litestream/gcs" diff --git a/file/replica_client.go b/file/replica_client.go index adbdbf04..d4b64cdb 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -77,12 +77,12 @@ func (c *ReplicaClient) SnapshotsDir(generation string) (string, error) { } // SnapshotPath returns the path to an uncompressed snapshot file. -func (c *ReplicaClient) SnapshotPath(generation string, index int) (string, error) { - dir, err := c.SnapshotsDir(generation) +func (c *ReplicaClient) SnapshotPath(info litestream.SnapshotInfo) (string, error) { + dir, err := c.SnapshotsDir(info.Generation) if err != nil { return "", err } - return filepath.Join(dir, litestream.FormatSnapshotPath(index)), nil + return filepath.Join(dir, info.FormatPath()), nil } // WALDir returns the path to a generation's WAL directory @@ -95,12 +95,12 @@ func (c *ReplicaClient) WALDir(generation string) (string, error) { } // WALSegmentPath returns the path to a WAL segment file. -func (c *ReplicaClient) WALSegmentPath(generation string, index int, offset int64) (string, error) { - dir, err := c.WALDir(generation) +func (c *ReplicaClient) WALSegmentPath(info litestream.WALSegmentInfo) (string, error) { + dir, err := c.WALDir(info.Generation) if err != nil { return "", err } - return filepath.Join(dir, litestream.FormatWALSegmentPath(index, offset)), nil + return filepath.Join(dir, info.FormatPath()), nil } // Generations returns a list of available generation names. @@ -165,28 +165,28 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites // Iterate over every file and convert to metadata. infos := make([]litestream.SnapshotInfo, 0, len(fis)) for _, fi := range fis { - // Parse index from filename. - index, err := litestream.ParseSnapshotPath(fi.Name()) + info := litestream.SnapshotInfo{ + Generation: generation, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + } + + err := info.ParsePath(fi.Name()) if err != nil { continue } - infos = append(infos, litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + infos = append(infos, info) } return litestream.NewSnapshotInfoSliceIterator(infos), nil } // WriteSnapshot writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { - filename, err := c.SnapshotPath(generation, index) +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) error { + filename, err := c.SnapshotPath(*info) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return fmt.Errorf("cannot determine snapshot path: %w", err) } var fileInfo, dirInfo os.FileInfo @@ -196,48 +196,45 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in // Ensure parent directory exists. if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil { - return info, err + return err } // Write snapshot to temporary file next to destination path. f, err := internal.CreateFile(filename+".tmp", fileInfo) if err != nil { - return info, err + return err } defer f.Close() if _, err := io.Copy(f, rd); err != nil { - return info, err + return err } else if err := f.Sync(); err != nil { - return info, err + return err } else if err := f.Close(); err != nil { - return info, err + return err } // Build metadata. fi, err := os.Stat(filename + ".tmp") if err != nil { - return info, err - } - info = litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), + return err } + info.Size = fi.Size() + info.CreatedAt = fi.ModTime().UTC() + // Move snapshot to final path when it has been fully written & synced to disk. if err := os.Rename(filename+".tmp", filename); err != nil { - return info, err + return err } - return info, nil + return nil } // SnapshotReader returns a reader for snapshot data at the given generation/index. // Returns os.ErrNotExist if no matching index is found. -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { - filename, err := c.SnapshotPath(generation, index) +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) { + filename, err := c.SnapshotPath(info) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -245,8 +242,8 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } // DeleteSnapshot deletes a snapshot with the given generation & index. -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { - filename, err := c.SnapshotPath(generation, index) +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error { + filename, err := c.SnapshotPath(info) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -279,29 +276,29 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit // Iterate over every file and convert to metadata. infos := make([]litestream.WALSegmentInfo, 0, len(fis)) for _, fi := range fis { + info := litestream.WALSegmentInfo{ + Generation: generation, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + } + // Parse index from filename. - index, offset, err := litestream.ParseWALSegmentPath(fi.Name()) + err := info.ParsePath(fi.Name()) if err != nil { continue } - infos = append(infos, litestream.WALSegmentInfo{ - Generation: generation, - Index: index, - Offset: offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + infos = append(infos, info) } return litestream.NewWALSegmentInfoSliceIterator(infos), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { - filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) error { + filename, err := c.WALSegmentPath(*info) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return fmt.Errorf("cannot determine wal segment path: %w", err) } var fileInfo, dirInfo os.FileInfo @@ -311,49 +308,45 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, // Ensure parent directory exists. if err := internal.MkdirAll(filepath.Dir(filename), dirInfo); err != nil { - return info, err + return err } // Write WAL segment to temporary file next to destination path. f, err := internal.CreateFile(filename+".tmp", fileInfo) if err != nil { - return info, err + return err } defer f.Close() if _, err := io.Copy(f, rd); err != nil { - return info, err + return err } else if err := f.Sync(); err != nil { - return info, err + return err } else if err := f.Close(); err != nil { - return info, err + return err } // Build metadata. fi, err := os.Stat(filename + ".tmp") if err != nil { - return info, err - } - info = litestream.WALSegmentInfo{ - Generation: pos.Generation, - Index: pos.Index, - Offset: pos.Offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), + return err } + info.Size = fi.Size() + info.CreatedAt = fi.ModTime().UTC() + // Move WAL segment to final path when it has been written & synced to disk. if err := os.Rename(filename+".tmp", filename); err != nil { - return info, err + return err } - return info, nil + return nil } // WALSegmentReader returns a reader for a section of WAL data at the given position. // Returns os.ErrNotExist if no matching index/offset is found. -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { - filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) { + filename, err := c.WALSegmentPath(info) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -361,9 +354,9 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } // DeleteWALSegments deletes WAL segments at the given positions. -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { - for _, pos := range a { - filename, err := c.WALSegmentPath(pos.Generation, pos.Index, pos.Offset) +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error { + for _, info := range a { + filename, err := c.WALSegmentPath(info) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) } diff --git a/file/replica_client_test.go b/file/replica_client_test.go index 94d2e447..870c741e 100644 --- a/file/replica_client_test.go +++ b/file/replica_client_test.go @@ -3,6 +3,7 @@ package file_test import ( "testing" + "github.com/benbjohnson/litestream" "github.com/benbjohnson/litestream/file" ) @@ -76,19 +77,31 @@ func TestReplicaClient_SnapshotsDir(t *testing.T) { func TestReplicaClient_SnapshotPath(t *testing.T) { t.Run("OK", func(t *testing.T) { - if got, err := file.NewReplicaClient("/foo").SnapshotPath("0123456701234567", 1000); err != nil { + info := litestream.SnapshotInfo{ + Generation: "0123456701234567", + Index: 1000, + Compression: litestream.CompressionLZ4, + } + if got, err := file.NewReplicaClient("/foo").SnapshotPath(info); err != nil { t.Fatal(err) } else if want := "/foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; got != want { t.Fatalf("SnapshotPath()=%v, want %v", got, want) } }) t.Run("ErrNoPath", func(t *testing.T) { - if _, err := file.NewReplicaClient("").SnapshotPath("0123456701234567", 1000); err == nil || err.Error() != `file replica path required` { + info := litestream.SnapshotInfo{ + Generation: "0123456701234567", + Index: 1000, + } + if _, err := file.NewReplicaClient("").SnapshotPath(info); err == nil || err.Error() != `file replica path required` { t.Fatalf("unexpected error: %v", err) } }) t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := file.NewReplicaClient("/foo").SnapshotPath("", 1000); err == nil || err.Error() != `generation required` { + info := litestream.SnapshotInfo{ + Index: 1000, + } + if _, err := file.NewReplicaClient("/foo").SnapshotPath(info); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -116,19 +129,32 @@ func TestReplicaClient_WALDir(t *testing.T) { func TestReplicaClient_WALSegmentPath(t *testing.T) { t.Run("OK", func(t *testing.T) { - if got, err := file.NewReplicaClient("/foo").WALSegmentPath("0123456701234567", 1000, 1001); err != nil { + info := litestream.WALSegmentInfo{ + Generation: "0123456701234567", + Index: 1000, + Offset: 1001, + Compression: litestream.CompressionLZ4, + } + if got, err := file.NewReplicaClient("/foo").WALSegmentPath(info); err != nil { t.Fatal(err) } else if want := "/foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { t.Fatalf("WALPath()=%v, want %v", got, want) } }) t.Run("ErrNoPath", func(t *testing.T) { - if _, err := file.NewReplicaClient("").WALSegmentPath("0123456701234567", 1000, 0); err == nil || err.Error() != `file replica path required` { + info := litestream.WALSegmentInfo{ + Generation: "0123456701234567", + Index: 1000, + } + if _, err := file.NewReplicaClient("").WALSegmentPath(info); err == nil || err.Error() != `file replica path required` { t.Fatalf("unexpected error: %v", err) } }) t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := file.NewReplicaClient("/foo").WALSegmentPath("", 1000, 0); err == nil || err.Error() != `generation required` { + info := litestream.WALSegmentInfo{ + Index: 1000, + } + if _, err := file.NewReplicaClient("/foo").WALSegmentPath(info); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) diff --git a/gcs/replica_client.go b/gcs/replica_client.go index 7b2b2c67..7c1fa2ab 100644 --- a/gcs/replica_client.go +++ b/gcs/replica_client.go @@ -139,14 +139,14 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites } // WriteSnapshot writes LZ4 compressed data from rd to the object storage. -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return fmt.Errorf("cannot determine snapshot path: %w", err) } startTime := time.Now() @@ -155,9 +155,9 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in n, err := io.Copy(w, rd) if err != nil { - return info, err + return err } else if err := w.Close(); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() @@ -165,21 +165,18 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) - return litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: n, - CreatedAt: startTime.UTC(), - }, nil + info.Size = n + info.CreatedAt = startTime.UTC() + return nil } // SnapshotReader returns a reader for snapshot data at the given generation/index. -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -198,12 +195,12 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } // DeleteSnapshot deletes a snapshot with the given generation & index. -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error { if err := c.Init(ctx); err != nil { return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -229,14 +226,14 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return fmt.Errorf("cannot determine wal segment path: %w", err) } startTime := time.Now() @@ -245,31 +242,27 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, n, err := io.Copy(w, rd) if err != nil { - return info, err + return err } else if err := w.Close(); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) - return litestream.WALSegmentInfo{ - Generation: pos.Generation, - Index: pos.Index, - Offset: pos.Offset, - Size: n, - CreatedAt: startTime.UTC(), - }, nil + info.Size = n + info.CreatedAt = startTime.UTC() + return nil } // WALSegmentReader returns a reader for a section of WAL data at the given index. // Returns os.ErrNotExist if no matching index/offset is found. -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -288,13 +281,13 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } // DeleteWALSegments deletes WAL segments with at the given positions. -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error { if err := c.Init(ctx); err != nil { return err } - for _, pos := range a { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + for _, info := range a { + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -343,19 +336,20 @@ func (itr *snapshotIterator) Next() bool { return false } + info := litestream.SnapshotInfo{ + Generation: itr.generation, + Size: attrs.Size, + CreatedAt: attrs.Created.UTC(), + } + // Parse index, otherwise skip to the next object. - index, err := litestream.ParseSnapshotPath(path.Base(attrs.Name)) + err = info.ParsePath(path.Base(attrs.Name)) if err != nil { continue } // Store current snapshot and return. - itr.info = litestream.SnapshotInfo{ - Generation: itr.generation, - Index: index, - Size: attrs.Size, - CreatedAt: attrs.Created.UTC(), - } + itr.info = info return true } } @@ -399,20 +393,20 @@ func (itr *walSegmentIterator) Next() bool { return false } + info := litestream.WALSegmentInfo{ + Generation: itr.generation, + Size: attrs.Size, + CreatedAt: attrs.Created.UTC(), + } + // Parse index & offset, otherwise skip to the next object. - index, offset, err := litestream.ParseWALSegmentPath(path.Base(attrs.Name)) + err = info.ParsePath(path.Base(attrs.Name)) if err != nil { continue } // Store current snapshot and return. - itr.info = litestream.WALSegmentInfo{ - Generation: itr.generation, - Index: index, - Offset: offset, - Size: attrs.Size, - CreatedAt: attrs.Created.UTC(), - } + itr.info = info return true } } diff --git a/litestream.go b/litestream.go index 95e97d50..eb87c922 100644 --- a/litestream.go +++ b/litestream.go @@ -23,8 +23,8 @@ const ( WALDirName = "wal" WALExt = ".wal" - WALSegmentExt = ".wal.lz4" - SnapshotExt = ".snapshot.lz4" + SnapshotExt = ".snapshot" + EncryptionExt = ".age" GenerationNameLen = 16 ) @@ -37,6 +37,35 @@ const ( CheckpointModeTruncate = "TRUNCATE" ) +const ( + CompressionNone Compression = "" + CompressionLZ4 Compression = "lz4" +) + +type Compression string + +func (c Compression) IsValid() bool { + return c == CompressionNone || c == CompressionLZ4 +} + +func (c Compression) Ext() string { + if c.IsValid() && c != "" { + return "." + string(c) + } + + return "" +} + +type Encryption bool + +func (e Encryption) Ext() string { + if e { + return ".age" + } + + return "" +} + // Litestream errors. var ( ErrNoGeneration = errors.New("no generation available") @@ -189,10 +218,42 @@ func (itr *WALSegmentInfoSliceIterator) WALSegment() WALSegmentInfo { // SnapshotInfo represents file information about a snapshot. type SnapshotInfo struct { - Generation string - Index int - Size int64 - CreatedAt time.Time + Generation string + Index int + Compression Compression + Encryption Encryption + Size int64 + CreatedAt time.Time +} + +func (info *SnapshotInfo) FormatPath() string { + assert(info.Index >= 0, "snapshot index must be non-negative") + return fmt.Sprintf("%08x%s%s%s", info.Index, SnapshotExt, info.Compression.Ext(), info.Encryption.Ext()) +} + +var snapshotPathRegexExt = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot(\.([0-9a-z]+))?(\.age)?$`) + +// ParseSnapshotPath returns the index for the snapshot. +// Returns an error if the path is not a valid snapshot path. +func (info *SnapshotInfo) ParsePath(s string) error { + s = filepath.Base(s) + + a := snapshotPathRegexExt.FindStringSubmatch(s) + if a == nil { + return fmt.Errorf("invalid snapshot path: %s", s) + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + info.Index = int(i64) + + info.Compression = Compression(a[3]) + info.Encryption = Encryption(len(a[4]) > 0) + + if !info.Compression.IsValid() { + return fmt.Errorf("invalid snapshot compression: %s", s) + } + + return nil } // Pos returns the WAL position when the snapshot was made. @@ -263,11 +324,43 @@ func (a WALInfoSlice) Less(i, j int) bool { // WALSegmentInfo represents file information about a WAL segment file. type WALSegmentInfo struct { - Generation string - Index int - Offset int64 - Size int64 - CreatedAt time.Time + Generation string + Index int + Offset int64 + Compression Compression + Encryption Encryption + Size int64 + CreatedAt time.Time +} + +func (info *WALSegmentInfo) FormatPath() string { + assert(info.Index >= 0, "wal index must be non-negative") + assert(info.Offset >= 0, "wal offset must be non-negative") + return fmt.Sprintf("%08x_%08x%s%s%s", info.Index, info.Offset, WALExt, info.Compression.Ext(), info.Encryption.Ext()) +} + +var walSegmentPathRegexExt = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))\.wal(\.([0-9a-z]+))?(\.age)?$`) + +func (info *WALSegmentInfo) ParsePath(s string) error { + s = filepath.Base(s) + + a := walSegmentPathRegexExt.FindStringSubmatch(s) + if a == nil { + return fmt.Errorf("invalid wal segment path: %s", s) + } + + i64, _ := strconv.ParseUint(a[1], 16, 64) + info.Index = int(i64) + info.Offset, _ = strconv.ParseInt(a[2], 16, 64) + + info.Compression = Compression(a[4]) + info.Encryption = Encryption(len(a[5]) > 0) + + if !info.Compression.IsValid() { + return fmt.Errorf("invalid snapshot compression: %s", s) + } + + return nil } // Pos returns the WAL position when the segment was made. @@ -431,12 +524,12 @@ func SnapshotsPath(root, generation string) (string, error) { } // SnapshotPath returns the path to an uncompressed snapshot file. -func SnapshotPath(root, generation string, index int) (string, error) { - dir, err := SnapshotsPath(root, generation) +func SnapshotPath(root string, info SnapshotInfo) (string, error) { + dir, err := SnapshotsPath(root, info.Generation) if err != nil { return "", err } - return path.Join(dir, FormatSnapshotPath(index)), nil + return path.Join(dir, info.FormatPath()), nil } // WALPath returns the path to a generation's WAL directory @@ -449,41 +542,19 @@ func WALPath(root, generation string) (string, error) { } // WALSegmentPath returns the path to a WAL segment file. -func WALSegmentPath(root, generation string, index int, offset int64) (string, error) { - dir, err := WALPath(root, generation) +func WALSegmentPath(root string, info WALSegmentInfo) (string, error) { + dir, err := WALPath(root, info.Generation) if err != nil { return "", err } - return path.Join(dir, FormatWALSegmentPath(index, offset)), nil + return path.Join(dir, info.FormatPath()), nil } // IsSnapshotPath returns true if s is a path to a snapshot file. func IsSnapshotPath(s string) bool { - return snapshotPathRegex.MatchString(s) -} - -// ParseSnapshotPath returns the index for the snapshot. -// Returns an error if the path is not a valid snapshot path. -func ParseSnapshotPath(s string) (index int, err error) { - s = filepath.Base(s) - - a := snapshotPathRegex.FindStringSubmatch(s) - if a == nil { - return 0, fmt.Errorf("invalid snapshot path: %s", s) - } - - i64, _ := strconv.ParseUint(a[1], 16, 64) - return int(i64), nil + return snapshotPathRegexExt.MatchString(s) } -// FormatSnapshotPath formats a snapshot filename with a given index. -func FormatSnapshotPath(index int) string { - assert(index >= 0, "snapshot index must be non-negative") - return fmt.Sprintf("%08x%s", index, SnapshotExt) -} - -var snapshotPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.snapshot\.lz4$`) - // IsWALPath returns true if s is a path to a WAL file. func IsWALPath(s string) bool { return walPathRegex.MatchString(s) @@ -511,30 +582,6 @@ func FormatWALPath(index int) string { var walPathRegex = regexp.MustCompile(`^([0-9a-f]{8})\.wal$`) -// ParseWALSegmentPath returns the index & offset for the WAL segment file. -// Returns an error if the path is not a valid wal segment path. -func ParseWALSegmentPath(s string) (index int, offset int64, err error) { - s = filepath.Base(s) - - a := walSegmentPathRegex.FindStringSubmatch(s) - if a == nil { - return 0, 0, fmt.Errorf("invalid wal segment path: %s", s) - } - - i64, _ := strconv.ParseUint(a[1], 16, 64) - off64, _ := strconv.ParseUint(a[2], 16, 64) - return int(i64), int64(off64), nil -} - -// FormatWALSegmentPath formats a WAL segment filename with a given index & offset. -func FormatWALSegmentPath(index int, offset int64) string { - assert(index >= 0, "wal index must be non-negative") - assert(offset >= 0, "wal offset must be non-negative") - return fmt.Sprintf("%08x_%08x%s", index, offset, WALSegmentExt) -} - -var walSegmentPathRegex = regexp.MustCompile(`^([0-9a-f]{8})(?:_([0-9a-f]{8}))\.wal\.lz4$`) - // isHexChar returns true if ch is a lowercase hex character. func isHexChar(ch rune) bool { return (ch >= '0' && ch <= '9') || (ch >= 'a' && ch <= 'f') diff --git a/litestream_test.go b/litestream_test.go index b4f7d3e7..ec403902 100644 --- a/litestream_test.go +++ b/litestream_test.go @@ -85,14 +85,22 @@ func TestSnapshotsPath(t *testing.T) { func TestSnapshotPath(t *testing.T) { t.Run("OK", func(t *testing.T) { - if got, err := litestream.SnapshotPath("foo", "0123456701234567", 1000); err != nil { + info := litestream.SnapshotInfo{ + Generation: "0123456701234567", + Index: 1000, + Compression: litestream.CompressionLZ4, + } + if got, err := litestream.SnapshotPath("foo", info); err != nil { t.Fatal(err) } else if want := "foo/generations/0123456701234567/snapshots/000003e8.snapshot.lz4"; got != want { t.Fatalf("SnapshotPath()=%v, want %v", got, want) } }) t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.SnapshotPath("foo", "", 1000); err == nil || err.Error() != `generation required` { + info := litestream.SnapshotInfo{ + Index: 1000, + } + if _, err := litestream.SnapshotPath("foo", info); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -115,14 +123,23 @@ func TestWALPath(t *testing.T) { func TestWALSegmentPath(t *testing.T) { t.Run("OK", func(t *testing.T) { - if got, err := litestream.WALSegmentPath("foo", "0123456701234567", 1000, 1001); err != nil { + info := litestream.WALSegmentInfo{ + Generation: "0123456701234567", + Index: 1000, + Offset: 1001, + Compression: litestream.CompressionLZ4, + } + if got, err := litestream.WALSegmentPath("foo", info); err != nil { t.Fatal(err) } else if want := "foo/generations/0123456701234567/wal/000003e8_000003e9.wal.lz4"; got != want { t.Fatalf("WALPath()=%v, want %v", got, want) } }) t.Run("ErrNoGeneration", func(t *testing.T) { - if _, err := litestream.WALSegmentPath("foo", "", 1000, 0); err == nil || err.Error() != `generation required` { + info := litestream.WALSegmentInfo{ + Index: 1000, + } + if _, err := litestream.WALSegmentPath("foo", info); err == nil || err.Error() != `generation required` { t.Fatalf("unexpected error: %v", err) } }) diff --git a/mock/replica_client.go b/mock/replica_client.go index a8bd998c..26a5d191 100644 --- a/mock/replica_client.go +++ b/mock/replica_client.go @@ -13,13 +13,13 @@ type ReplicaClient struct { GenerationsFunc func(ctx context.Context) ([]string, error) DeleteGenerationFunc func(ctx context.Context, generation string) error SnapshotsFunc func(ctx context.Context, generation string) (litestream.SnapshotIterator, error) - WriteSnapshotFunc func(ctx context.Context, generation string, index int, r io.Reader) (litestream.SnapshotInfo, error) - DeleteSnapshotFunc func(ctx context.Context, generation string, index int) error - SnapshotReaderFunc func(ctx context.Context, generation string, index int) (io.ReadCloser, error) + WriteSnapshotFunc func(ctx context.Context, info *litestream.SnapshotInfo, r io.Reader) error + DeleteSnapshotFunc func(ctx context.Context, info litestream.SnapshotInfo) error + SnapshotReaderFunc func(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) WALSegmentsFunc func(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) - WriteWALSegmentFunc func(ctx context.Context, pos litestream.Pos, r io.Reader) (litestream.WALSegmentInfo, error) - DeleteWALSegmentsFunc func(ctx context.Context, a []litestream.Pos) error - WALSegmentReaderFunc func(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) + WriteWALSegmentFunc func(ctx context.Context, info *litestream.WALSegmentInfo, r io.Reader) error + DeleteWALSegmentsFunc func(ctx context.Context, a []litestream.WALSegmentInfo) error + WALSegmentReaderFunc func(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) } func (c *ReplicaClient) Type() string { return "mock" } @@ -36,30 +36,30 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites return c.SnapshotsFunc(ctx, generation) } -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (litestream.SnapshotInfo, error) { - return c.WriteSnapshotFunc(ctx, generation, index, r) +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, r io.Reader) error { + return c.WriteSnapshotFunc(ctx, info, r) } -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { - return c.DeleteSnapshotFunc(ctx, generation, index) +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error { + return c.DeleteSnapshotFunc(ctx, info) } -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { - return c.SnapshotReaderFunc(ctx, generation, index) +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) { + return c.SnapshotReaderFunc(ctx, info) } func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (litestream.WALSegmentIterator, error) { return c.WALSegmentsFunc(ctx, generation) } -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, r io.Reader) (litestream.WALSegmentInfo, error) { - return c.WriteWALSegmentFunc(ctx, pos, r) +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, r io.Reader) error { + return c.WriteWALSegmentFunc(ctx, info, r) } -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error { return c.DeleteWALSegmentsFunc(ctx, a) } -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { - return c.WALSegmentReaderFunc(ctx, pos) +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) { + return c.WALSegmentReaderFunc(ctx, info) } diff --git a/replica.go b/replica.go index a5daf2dd..b725a598 100644 --- a/replica.go +++ b/replica.go @@ -88,6 +88,25 @@ func NewReplica(db *DB, name string) *Replica { return r } +func (r *Replica) NewSnapshotInfo(generation string, index int) SnapshotInfo { + return SnapshotInfo{ + Generation: generation, + Index: index, + Compression: CompressionLZ4, + Encryption: len(r.AgeRecipients) > 0, + } +} + +func (r *Replica) NewWALSegmentInfo(generation string, index int, offset int64) WALSegmentInfo { + return WALSegmentInfo{ + Generation: generation, + Index: index, + Offset: offset, + Compression: CompressionLZ4, + Encryption: len(r.AgeRecipients) > 0, + } +} + // Name returns the name of the replica. func (r *Replica) Name() string { if r.name == "" && r.Client != nil { @@ -229,6 +248,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { initialPos := pos startTime := time.Now() var bytesWritten int + info := r.NewWALSegmentInfo(pos.Generation, pos.Index, pos.Offset) logger := r.Logger() logger.Info("write wal segment", "position", initialPos.String()) @@ -236,7 +256,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { // Copy through pipe into client from the starting position. var g errgroup.Group g.Go(func() error { - _, err := r.Client.WriteWALSegment(ctx, pos, pr) + err := r.Client.WriteWALSegment(ctx, &info, pr) // Always close pipe reader to signal writers. if e := pr.CloseWithError(err); err == nil { @@ -246,20 +266,23 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { return err }) - var ew io.WriteCloser = pw + var wc io.WriteCloser = pw // Add encryption if we have recipients. - if len(r.AgeRecipients) > 0 { + if info.Encryption { var err error - ew, err = age.Encrypt(pw, r.AgeRecipients...) + wc, err = age.Encrypt(pw, r.AgeRecipients...) if err != nil { return err } - defer ew.Close() + defer wc.Close() } // Wrap writer to LZ4 compress. - zw := lz4.NewWriter(ew) + if info.Compression == CompressionLZ4 { + wc = lz4.NewWriter(wc) + defer wc.Close() + } // Track total WAL bytes written to replica client. walBytesCounter := replicaWALBytesCounterVec.WithLabelValues(r.db.Path(), r.Name()) @@ -274,7 +297,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { psalt = binary.BigEndian.Uint64(buf[16:24]) - n, err := zw.Write(buf) + n, err := wc.Write(buf) if err != nil { return err } @@ -301,7 +324,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { } psalt = salt - n, err := zw.Write(buf) + n, err := wc.Write(buf) if err != nil { return err } @@ -309,10 +332,8 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { bytesWritten += n } - // Flush LZ4 writer, encryption writer and close pipe. - if err := zw.Close(); err != nil { - return err - } else if err := ew.Close(); err != nil { + // Flush writers and close pipe. + if err := wc.Close(); err != nil { return err } else if err := pw.Close(); err != nil { return err @@ -332,7 +353,7 @@ func (r *Replica) syncWAL(ctx context.Context) (err error) { replicaWALIndexGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Index)) replicaWALOffsetGaugeVec.WithLabelValues(r.db.Path(), r.Name()).Set(float64(rd.Pos().Offset)) - logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten) + logger.Info("wal segment written", "position", initialPos.String(), "elapsed", time.Since(startTime).String(), "sz", bytesWritten, "compression", info.Compression, "encryption", info.Encryption) return nil } @@ -370,13 +391,13 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err } // Read segment to determine size to add to offset. - rd, err := r.Client.WALSegmentReader(ctx, segment.Pos()) + rd, err := r.Client.WALSegmentReader(ctx, *segment) if err != nil { return pos, fmt.Errorf("wal segment reader: %w", err) } defer rd.Close() - if len(r.AgeIdentities) > 0 { + if segment.Encryption { drd, err := age.Decrypt(rd, r.AgeIdentities...) if err != nil { return pos, err @@ -385,7 +406,11 @@ func (r *Replica) calcPos(ctx context.Context, generation string) (pos Pos, err rd = io.NopCloser(drd) } - n, err := io.Copy(io.Discard, lz4.NewReader(rd)) + if segment.Compression == CompressionLZ4 { + rd = io.NopCloser(lz4.NewReader(rd)) + } + + n, err := io.Copy(io.Discard, rd) if err != nil { return pos, err } @@ -509,6 +534,9 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { return info, ErrNoGeneration } + // Create new snapshot info with replica settings. + info = r.NewSnapshotInfo(pos.Generation, pos.Index) + // Open db file descriptor, if not already open, & position at beginning. if r.f == nil { if r.f, err = os.Open(r.db.Path()); err != nil { @@ -531,7 +559,7 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { var wc io.WriteCloser = pw // Add encryption if we have recipients. - if len(r.AgeRecipients) > 0 { + if info.Encryption { var err error wc, err = age.Encrypt(pw, r.AgeRecipients...) if err != nil { @@ -541,13 +569,13 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { defer wc.Close() } - zr := lz4.NewWriter(wc) - defer zr.Close() + // Add compression if configured. + if info.Compression == CompressionLZ4 { + wc = lz4.NewWriter(wc) + defer wc.Close() + } - if _, err := io.Copy(zr, r.f); err != nil { - pw.CloseWithError(err) - return err - } else if err := zr.Close(); err != nil { + if _, err := io.Copy(wc, r.f); err != nil { pw.CloseWithError(err) return err } @@ -559,13 +587,13 @@ func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error) { startTime := time.Now() // Delegate write to client & wait for writer goroutine to finish. - if info, err = r.Client.WriteSnapshot(ctx, pos.Generation, pos.Index, pr); err != nil { + if err = r.Client.WriteSnapshot(ctx, &info, pr); err != nil { return info, err } else if err := g.Wait(); err != nil { return info, err } - logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size) + logger.Info("snapshot written", "position", pos.String(), "elapsed", time.Since(startTime).String(), "sz", info.Size, "compression", info.Compression, "encryption", info.Encryption) return info, nil } @@ -629,7 +657,7 @@ func (r *Replica) deleteSnapshotsBeforeIndex(ctx context.Context, generation str continue } - if err := r.Client.DeleteSnapshot(ctx, info.Generation, info.Index); err != nil { + if err := r.Client.DeleteSnapshot(ctx, info); err != nil { return fmt.Errorf("delete snapshot %s/%08x: %w", info.Generation, info.Index, err) } r.Logger().Info("snapshot deleted", "generation", generation, "index", index) @@ -645,13 +673,13 @@ func (r *Replica) deleteWALSegmentsBeforeIndex(ctx context.Context, generation s } defer itr.Close() - var a []Pos + var a []WALSegmentInfo for itr.Next() { info := itr.WALSegment() if info.Index >= index { continue } - a = append(a, info.Pos()) + a = append(a, info) } if err := itr.Close(); err != nil { return err @@ -1075,16 +1103,17 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { } // Find lastest snapshot that occurs before timestamp or index. - var minWALIndex int + var snapshot SnapshotInfo if opt.Index < math.MaxInt32 { - if minWALIndex, err = r.SnapshotIndexByIndex(ctx, opt.Generation, opt.Index); err != nil { + if snapshot, err = r.SnapshotIndexByIndex(ctx, opt.Generation, opt.Index); err != nil { return fmt.Errorf("cannot find snapshot index: %w", err) } } else { - if minWALIndex, err = r.SnapshotIndexAt(ctx, opt.Generation, opt.Timestamp); err != nil { + if snapshot, err = r.SnapshotIndexAt(ctx, opt.Generation, opt.Timestamp); err != nil { return fmt.Errorf("cannot find snapshot index by timestamp: %w", err) } } + minWALIndex := snapshot.Index // Compute list of offsets for each WAL index. walSegmentMap, err := r.walSegmentMap(ctx, opt.Generation, minWALIndex, opt.Index, opt.Timestamp) @@ -1109,12 +1138,11 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { snapshotOnly := maxWALIndex == -1 // Initialize starting position. - pos := Pos{Generation: opt.Generation, Index: minWALIndex} tmpPath := opt.OutputPath + ".tmp" // Copy snapshot to output path. r.Logger().Info("restoring snapshot", "generation", opt.Generation, "index", minWALIndex, "path", tmpPath) - if err := r.restoreSnapshot(ctx, pos.Generation, pos.Index, tmpPath); err != nil { + if err := r.restoreSnapshot(ctx, snapshot, tmpPath); err != nil { return fmt.Errorf("cannot restore snapshot: %w", err) } @@ -1165,7 +1193,7 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) { startTime := time.Now() - err := r.downloadWAL(ctx, opt.Generation, index, walSegmentMap[index], tmpPath) + err := r.downloadWAL(ctx, index, walSegmentMap[index], tmpPath) if err != nil { err = fmt.Errorf("cannot download wal %s/%08x: %w", opt.Generation, index, err) } @@ -1235,15 +1263,13 @@ type walRestoreState struct { // SnapshotIndexAt returns the highest index for a snapshot within a generation // that occurs before timestamp. If timestamp is zero, returns the latest snapshot. -func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error) { +func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (info SnapshotInfo, err error) { itr, err := r.Client.Snapshots(ctx, generation) if err != nil { - return 0, err + return info, err } defer itr.Close() - snapshotIndex := -1 - var max time.Time for itr.Next() { snapshot := itr.Snapshot() if !timestamp.IsZero() && snapshot.CreatedAt.After(timestamp) { @@ -1251,28 +1277,27 @@ func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timest } // Use snapshot if it newer. - if max.IsZero() || snapshot.CreatedAt.After(max) { - snapshotIndex, max = snapshot.Index, snapshot.CreatedAt + if info.CreatedAt.IsZero() || snapshot.CreatedAt.After(info.CreatedAt) { + info = snapshot } } if err := itr.Close(); err != nil { - return 0, err - } else if snapshotIndex == -1 { - return 0, ErrNoSnapshots + return info, err + } else if info.CreatedAt.IsZero() { + return info, ErrNoSnapshots } - return snapshotIndex, nil + return info, nil } // SnapshotIndexbyIndex returns the highest index for a snapshot within a generation // that occurs before a given index. If index is MaxInt32, returns the latest snapshot. -func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (int, error) { +func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (info SnapshotInfo, err error) { itr, err := r.Client.Snapshots(ctx, generation) if err != nil { - return 0, err + return info, err } defer itr.Close() - snapshotIndex := -1 for itr.Next() { snapshot := itr.Snapshot() @@ -1281,21 +1306,21 @@ func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, i } // Use snapshot if it newer. - if snapshotIndex == -1 || snapshot.Index >= snapshotIndex { - snapshotIndex = snapshot.Index + if info.CreatedAt.IsZero() || snapshot.Index >= info.Index { + info = snapshot } } if err := itr.Close(); err != nil { - return 0, err - } else if snapshotIndex == -1 { - return 0, ErrNoSnapshots + return info, err + } else if info.CreatedAt.IsZero() { + return info, ErrNoSnapshots } - return snapshotIndex, nil + return info, nil } // walSegmentMap returns a map of WAL indices to their segments. // Filters by a max timestamp or a max index. -func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]int64, error) { +func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex, maxIndex int, maxTimestamp time.Time) (map[int][]WALSegmentInfo, error) { itr, err := r.Client.WALSegments(ctx, generation) if err != nil { return nil, err @@ -1309,7 +1334,7 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex sort.Sort(WALSegmentInfoSlice(a)) - m := make(map[int][]int64) + m := make(map[int][]WALSegmentInfo) for _, info := range a { // Exit if we go past the max timestamp or index. if !maxTimestamp.IsZero() && info.CreatedAt.After(maxTimestamp) { @@ -1324,18 +1349,18 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex offsets := m[info.Index] if len(offsets) == 0 && info.Offset != 0 { return nil, fmt.Errorf("missing initial wal segment: generation=%s index=%08x offset=%d", generation, info.Index, info.Offset) - } else if len(offsets) > 0 && offsets[len(offsets)-1] >= info.Offset { - return nil, fmt.Errorf("wal segments out of order: generation=%s index=%08x offsets=(%d,%d)", generation, info.Index, offsets[len(offsets)-1], info.Offset) + } else if len(offsets) > 0 && offsets[len(offsets)-1].Offset >= info.Offset { + return nil, fmt.Errorf("wal segments out of order: generation=%s index=%08x offsets=(%d,%d)", generation, info.Index, offsets[len(offsets)-1].Index, info.Offset) } // Append to the end of the WAL file. - m[info.Index] = append(offsets, info.Offset) + m[info.Index] = append(offsets, info) } return m, itr.Close() } // restoreSnapshot copies a snapshot from the replica to a file. -func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index int, filename string) error { +func (r *Replica) restoreSnapshot(ctx context.Context, info SnapshotInfo, filename string) error { // Determine the user/group & mode based on the DB, if available. var fileInfo, dirInfo os.FileInfo if db := r.DB(); db != nil { @@ -1352,13 +1377,13 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index } defer f.Close() - rd, err := r.Client.SnapshotReader(ctx, generation, index) + rd, err := r.Client.SnapshotReader(ctx, info) if err != nil { return err } defer rd.Close() - if len(r.AgeIdentities) > 0 { + if info.Encryption { drd, err := age.Decrypt(rd, r.AgeIdentities...) if err != nil { return err @@ -1367,7 +1392,11 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index rd = io.NopCloser(drd) } - if _, err := io.Copy(f, lz4.NewReader(rd)); err != nil { + if info.Compression == CompressionLZ4 { + rd = io.NopCloser(lz4.NewReader(rd)) + } + + if _, err := io.Copy(f, rd); err != nil { return err } else if err := f.Sync(); err != nil { return err @@ -1378,7 +1407,7 @@ func (r *Replica) restoreSnapshot(ctx context.Context, generation string, index // downloadWAL copies a WAL file from the replica to a local copy next to the DB. // The WAL is later applied by applyWAL(). This function can be run in parallel // to download multiple WAL files simultaneously. -func (r *Replica) downloadWAL(ctx context.Context, generation string, index int, offsets []int64, dbPath string) (err error) { +func (r *Replica) downloadWAL(ctx context.Context, index int, offsets []WALSegmentInfo, dbPath string) (err error) { // Determine the user/group & mode based on the DB, if available. var fileInfo os.FileInfo if db := r.DB(); db != nil { @@ -1388,13 +1417,13 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int, // Open readers for every segment in the WAL file, in order. var readers []io.Reader for _, offset := range offsets { - rd, err := r.Client.WALSegmentReader(ctx, Pos{Generation: generation, Index: index, Offset: offset}) + rd, err := r.Client.WALSegmentReader(ctx, offset) if err != nil { return err } defer rd.Close() - if len(r.AgeIdentities) > 0 { + if offset.Encryption { drd, err := age.Decrypt(rd, r.AgeIdentities...) if err != nil { return err @@ -1403,7 +1432,11 @@ func (r *Replica) downloadWAL(ctx context.Context, generation string, index int, rd = io.NopCloser(drd) } - readers = append(readers, lz4.NewReader(rd)) + if offset.Compression == CompressionLZ4 { + rd = io.NopCloser(lz4.NewReader(rd)) + } + + readers = append(readers, rd) } // Open handle to destination WAL path. diff --git a/replica_client.go b/replica_client.go index 3d3f5083..bacc49bd 100644 --- a/replica_client.go +++ b/replica_client.go @@ -19,30 +19,28 @@ type ReplicaClient interface { // Returns an iterator of all snapshots within a generation on the replica. Order is undefined. Snapshots(ctx context.Context, generation string) (SnapshotIterator, error) - // Writes LZ4 compressed snapshot data to the replica at a given index - // within a generation. Returns metadata for the snapshot. - WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error) + // Writes snapshot data to the replica at a given index within a generation. + // Updates passed metadata with file size and creation time. + WriteSnapshot(ctx context.Context, info *SnapshotInfo, r io.Reader) error - // Deletes a snapshot with the given generation & index. - DeleteSnapshot(ctx context.Context, generation string, index int) error + // Deletes a snapshot with the given info. + DeleteSnapshot(ctx context.Context, info SnapshotInfo) error - // Returns a reader that contains LZ4 compressed snapshot data for a - // given index within a generation. Returns an os.ErrNotFound error if - // the snapshot does not exist. - SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) + // Returns a reader that contains snapshot data for a given index within a generation. + // Returns an os.ErrNotFound error if the snapshot does not exist. + SnapshotReader(ctx context.Context, info SnapshotInfo) (io.ReadCloser, error) // Returns an iterator of all WAL segments within a generation on the replica. Order is undefined. WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) - // Writes an LZ4 compressed WAL segment at a given position. - // Returns metadata for the written segment. - WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error) + // Writes an WAL segment at a given position. + // Updates passed metadata with file size and creation time. + WriteWALSegment(ctx context.Context, info *WALSegmentInfo, r io.Reader) error // Deletes one or more WAL segments at the given positions. - DeleteWALSegments(ctx context.Context, a []Pos) error + DeleteWALSegments(ctx context.Context, a []WALSegmentInfo) error - // Returns a reader that contains an LZ4 compressed WAL segment at a given - // index/offset within a generation. Returns an os.ErrNotFound error if the - // WAL segment does not exist. - WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) + // Returns a reader that contains a WAL segment at a given index/offset within a generation. + // Returns an os.ErrNotFound error if the WAL segment does not exist. + WALSegmentReader(ctx context.Context, info WALSegmentInfo) (io.ReadCloser, error) } diff --git a/replica_client_test.go b/replica_client_test.go index cf6079b2..8e426341 100644 --- a/replica_client_test.go +++ b/replica_client_test.go @@ -67,11 +67,11 @@ func TestReplicaClient_Generations(t *testing.T) { t.Parallel() // Write snapshots. - if _, err := c.WriteSnapshot(context.Background(), "5efbd8d042012dca", 0, strings.NewReader(`foo`)); err != nil { + if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "5efbd8d042012dca"}, strings.NewReader(`foo`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteSnapshot(context.Background(), "b16ddcf5c697540f", 0, strings.NewReader(`bar`)); err != nil { + } else if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "b16ddcf5c697540f"}, strings.NewReader(`bar`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteSnapshot(context.Background(), "155fe292f8333c72", 0, strings.NewReader(`baz`)); err != nil { + } else if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "155fe292f8333c72"}, strings.NewReader(`baz`)); err != nil { t.Fatal(err) } @@ -103,11 +103,11 @@ func TestReplicaClient_Snapshots(t *testing.T) { t.Parallel() // Write snapshots. - if _, err := c.WriteSnapshot(context.Background(), "5efbd8d042012dca", 1, strings.NewReader(``)); err != nil { + if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "5efbd8d042012dca", Index: 1}, strings.NewReader(``)); err != nil { t.Fatal(err) - } else if _, err := c.WriteSnapshot(context.Background(), "b16ddcf5c697540f", 5, strings.NewReader(`x`)); err != nil { + } else if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "b16ddcf5c697540f", Index: 5}, strings.NewReader(`x`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteSnapshot(context.Background(), "b16ddcf5c697540f", 10, strings.NewReader(`xyz`)); err != nil { + } else if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "b16ddcf5c697540f", Index: 10}, strings.NewReader(`xyz`)); err != nil { t.Fatal(err) } @@ -186,11 +186,11 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteSnapshot(context.Background(), "b16ddcf5c697540f", 1000, strings.NewReader(`foobar`)); err != nil { + if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "b16ddcf5c697540f", Index: 1000}, strings.NewReader(`foobar`)); err != nil { t.Fatal(err) } - if r, err := c.SnapshotReader(context.Background(), "b16ddcf5c697540f", 1000); err != nil { + if r, err := c.SnapshotReader(context.Background(), litestream.SnapshotInfo{Generation: "b16ddcf5c697540f", Index: 1000}); err != nil { t.Fatal(err) } else if buf, err := io.ReadAll(r); err != nil { t.Fatal(err) @@ -203,7 +203,7 @@ func TestReplicaClient_WriteSnapshot(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteSnapshot(context.Background(), "", 0, nil); err == nil || err.Error() != `cannot determine snapshot path: generation required` { + if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{}, nil); err == nil || err.Error() != `cannot determine snapshot path: generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -213,11 +213,16 @@ func TestReplicaClient_SnapshotReader(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteSnapshot(context.Background(), "5efbd8d042012dca", 10, strings.NewReader(`foo`)); err != nil { + info := litestream.SnapshotInfo{ + Generation: "5efbd8d042012dca", + Index: 10, + } + + if err := c.WriteSnapshot(context.Background(), &info, strings.NewReader(`foo`)); err != nil { t.Fatal(err) } - r, err := c.SnapshotReader(context.Background(), "5efbd8d042012dca", 10) + r, err := c.SnapshotReader(context.Background(), info) if err != nil { t.Fatal(err) } @@ -233,7 +238,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) { RunWithReplicaClient(t, "ErrNotFound", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.SnapshotReader(context.Background(), "5efbd8d042012dca", 1); !os.IsNotExist(err) { + if _, err := c.SnapshotReader(context.Background(), litestream.SnapshotInfo{Generation: "5efbd8d042012dca", Index: 1}); !os.IsNotExist(err) { t.Fatalf("expected not exist, got %#v", err) } }) @@ -241,7 +246,7 @@ func TestReplicaClient_SnapshotReader(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.SnapshotReader(context.Background(), "", 1); err == nil || err.Error() != `cannot determine snapshot path: generation required` { + if _, err := c.SnapshotReader(context.Background(), litestream.SnapshotInfo{Index: 1}); err == nil || err.Error() != `cannot determine snapshot path: generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -251,14 +256,14 @@ func TestReplicaClient_WALs(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 1, Offset: 0}, strings.NewReader(``)); err != nil { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 1, Offset: 0}, strings.NewReader(``)); err != nil { t.Fatal(err) } - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 2, Offset: 0}, strings.NewReader(`12345`)); err != nil { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 2, Offset: 0}, strings.NewReader(`12345`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 2, Offset: 5}, strings.NewReader(`67`)); err != nil { + } else if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 2, Offset: 5}, strings.NewReader(`67`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 3, Offset: 0}, strings.NewReader(`xyz`)); err != nil { + } else if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 3, Offset: 0}, strings.NewReader(`xyz`)); err != nil { t.Fatal(err) } @@ -339,7 +344,7 @@ func TestReplicaClient_WALs(t *testing.T) { RunWithReplicaClient(t, "NoWALs", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteSnapshot(context.Background(), "5efbd8d042012dca", 0, strings.NewReader(`foo`)); err != nil { + if err := c.WriteSnapshot(context.Background(), &litestream.SnapshotInfo{Generation: "5efbd8d042012dca"}, strings.NewReader(`foo`)); err != nil { t.Fatal(err) } @@ -371,11 +376,11 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}, strings.NewReader(`foobar`)); err != nil { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}, strings.NewReader(`foobar`)); err != nil { t.Fatal(err) } - if r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil { + if r, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 1000, Offset: 2000}); err != nil { t.Fatal(err) } else if buf, err := io.ReadAll(r); err != nil { t.Fatal(err) @@ -388,7 +393,7 @@ func TestReplicaClient_WriteWALSegment(t *testing.T) { RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `cannot determine wal segment path: generation required` { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "", Index: 0, Offset: 0}, nil); err == nil || err.Error() != `cannot determine wal segment path: generation required` { t.Fatalf("unexpected error: %v", err) } }) @@ -398,11 +403,11 @@ func TestReplicaClient_WALReader(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 10, Offset: 5}, strings.NewReader(`foobar`)); err != nil { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 10, Offset: 5}, strings.NewReader(`foobar`)); err != nil { t.Fatal(err) } - r, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 10, Offset: 5}) + r, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 10, Offset: 5}) if err != nil { t.Fatal(err) } @@ -418,7 +423,7 @@ func TestReplicaClient_WALReader(t *testing.T) { RunWithReplicaClient(t, "ErrNotFound", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 1, Offset: 0}); !os.IsNotExist(err) { + if _, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 1, Offset: 0}); !os.IsNotExist(err) { t.Fatalf("expected not exist, got %#v", err) } }) @@ -428,29 +433,29 @@ func TestReplicaClient_DeleteWALSegments(t *testing.T) { RunWithReplicaClient(t, "OK", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1, Offset: 2}, strings.NewReader(`foo`)); err != nil { + if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 1, Offset: 2}, strings.NewReader(`foo`)); err != nil { t.Fatal(err) - } else if _, err := c.WriteWALSegment(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 3, Offset: 4}, strings.NewReader(`bar`)); err != nil { + } else if err := c.WriteWALSegment(context.Background(), &litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 3, Offset: 4}, strings.NewReader(`bar`)); err != nil { t.Fatal(err) } - if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{ + if err := c.DeleteWALSegments(context.Background(), []litestream.WALSegmentInfo{ {Generation: "b16ddcf5c697540f", Index: 1, Offset: 2}, {Generation: "5efbd8d042012dca", Index: 3, Offset: 4}, }); err != nil { t.Fatal(err) } - if _, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "b16ddcf5c697540f", Index: 1, Offset: 2}); !os.IsNotExist(err) { + if _, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: "b16ddcf5c697540f", Index: 1, Offset: 2}); !os.IsNotExist(err) { t.Fatalf("expected not exist, got %#v", err) - } else if _, err := c.WALSegmentReader(context.Background(), litestream.Pos{Generation: "5efbd8d042012dca", Index: 3, Offset: 4}); !os.IsNotExist(err) { + } else if _, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: "5efbd8d042012dca", Index: 3, Offset: 4}); !os.IsNotExist(err) { t.Fatalf("expected not exist, got %#v", err) } }) RunWithReplicaClient(t, "ErrNoGeneration", func(t *testing.T, c litestream.ReplicaClient) { t.Parallel() - if err := c.DeleteWALSegments(context.Background(), []litestream.Pos{{}}); err == nil || err.Error() != `cannot determine wal segment path: generation required` { + if err := c.DeleteWALSegments(context.Background(), []litestream.WALSegmentInfo{{}}); err == nil || err.Error() != `cannot determine wal segment path: generation required` { t.Fatalf("unexpected error: %v", err) } }) diff --git a/replica_test.go b/replica_test.go index 78ba6206..24dfee2f 100644 --- a/replica_test.go +++ b/replica_test.go @@ -13,10 +13,12 @@ import ( "github.com/pierrec/lz4/v4" ) -func nextIndex(pos litestream.Pos) litestream.Pos { - return litestream.Pos{ - Generation: pos.Generation, - Index: pos.Index + 1, +func nextIndex(pos litestream.Pos) litestream.WALSegmentInfo { + return litestream.WALSegmentInfo{ + Generation: pos.Generation, + Index: pos.Index + 1, + Offset: 0, + Compression: litestream.CompressionLZ4, // FIXME: this matches the assumption in replica.go for now } } @@ -108,7 +110,7 @@ func TestReplica_Sync(t *testing.T) { // Verify WAL matches replica WAL. if b0, err := os.ReadFile(db.Path() + "-wal"); err != nil { t.Fatal(err) - } else if r, err := c.WALSegmentReader(context.Background(), dpos.Truncate()); err != nil { + } else if r, err := c.WALSegmentReader(context.Background(), litestream.WALSegmentInfo{Generation: dpos.Generation, Index: dpos.Index, Compression: litestream.CompressionLZ4}); err != nil { t.Fatal(err) } else if b1, err := io.ReadAll(lz4.NewReader(r)); err != nil { t.Fatal(err) @@ -142,8 +144,8 @@ func TestReplica_Snapshot(t *testing.T) { t.Fatal(err) } else if info, err := r.Snapshot(context.Background()); err != nil { t.Fatal(err) - } else if got, want := info.Pos(), nextIndex(pos0); got != want { - t.Fatalf("pos=%s, want %s", got, want) + } else if got, want := info.Pos(), nextIndex(pos0); got != want.Pos() { + t.Fatalf("pos=%s, want %s", got, want.Pos()) } // Sync database and then replica. @@ -166,7 +168,7 @@ func TestReplica_Snapshot(t *testing.T) { t.Fatal(err) } else if info, err := r.Snapshot(context.Background()); err != nil { t.Fatal(err) - } else if got, want := info.Pos(), nextIndex(pos1); got != want { + } else if got, want := info.Pos(), nextIndex(pos1); got != want.Pos() { t.Fatalf("pos=%v, want %v", got, want) } @@ -177,9 +179,9 @@ func TestReplica_Snapshot(t *testing.T) { t.Fatalf("len=%v, want %v", got, want) } else if got, want := infos[0].Pos(), pos0.Truncate(); got != want { t.Fatalf("info[0]=%s, want %s", got, want) - } else if got, want := infos[1].Pos(), nextIndex(pos0); got != want { - t.Fatalf("info[1]=%s, want %s", got, want) - } else if got, want := infos[2].Pos(), nextIndex(pos1); got != want { - t.Fatalf("info[2]=%s, want %s", got, want) + } else if got, want := infos[1].Pos(), nextIndex(pos0); got != want.Pos() { + t.Fatalf("info[1]=%s, want %s", got, want.Pos()) + } else if got, want := infos[2].Pos(), nextIndex(pos1); got != want.Pos() { + t.Fatalf("info[2]=%s, want %s", got, want.Pos()) } } diff --git a/s3/replica_client.go b/s3/replica_client.go index 3d2ddffd..ce71c8e1 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -238,14 +238,14 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites } // WriteSnapshot writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return fmt.Errorf("cannot determine snapshot path: %w", err) } startTime := time.Now() @@ -255,7 +255,7 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in Key: aws.String(key), Body: rc, }); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() @@ -263,21 +263,18 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) - return litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: rc.N(), - CreatedAt: startTime.UTC(), - }, nil + info.Size = rc.N() + info.CreatedAt = startTime.UTC() + return nil } // SnapshotReader returns a reader for snapshot data at the given generation/index. -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) { +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -298,12 +295,12 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } // DeleteSnapshot deletes a snapshot with the given generation & index. -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) error { +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) error { if err := c.Init(ctx); err != nil { return err } - key, err := litestream.SnapshotPath(c.Path, generation, index) + key, err := litestream.SnapshotPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -332,14 +329,14 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) error { if err := c.Init(ctx); err != nil { - return info, err + return err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return fmt.Errorf("cannot determine wal segment path: %w", err) } startTime := time.Now() @@ -349,29 +346,25 @@ func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, Key: aws.String(key), Body: rc, }); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(rc.N())) - return litestream.WALSegmentInfo{ - Generation: pos.Generation, - Index: pos.Index, - Offset: pos.Offset, - Size: rc.N(), - CreatedAt: startTime.UTC(), - }, nil + info.Size = rc.N() + info.CreatedAt = startTime.UTC() + return nil } // WALSegmentReader returns a reader for a section of WAL data at the given index. // Returns os.ErrNotExist if no matching index/offset is found. -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (io.ReadCloser, error) { +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (io.ReadCloser, error) { if err := c.Init(ctx); err != nil { return nil, err } - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -392,7 +385,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } // DeleteWALSegments deletes WAL segments with at the given positions. -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) error { +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) error { if err := c.Init(ctx); err != nil { return err } @@ -405,8 +398,8 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po } // Generate a batch of object IDs for deleting the WAL segments. - for i, pos := range a[:n] { - key, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + for i, info := range a[:n] { + key, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -527,19 +520,18 @@ func (itr *snapshotIterator) fetch() error { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { - key := path.Base(*obj.Key) - index, err := litestream.ParseSnapshotPath(key) - if err != nil { - continue - } - info := litestream.SnapshotInfo{ Generation: itr.generation, - Index: index, Size: *obj.Size, CreatedAt: obj.LastModified.UTC(), } + key := path.Base(*obj.Key) + err := info.ParsePath(key) + if err != nil { + continue + } + select { case <-itr.ctx.Done(): case itr.ch <- info: @@ -630,20 +622,18 @@ func (itr *walSegmentIterator) fetch() error { internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc() for _, obj := range page.Contents { - key := path.Base(*obj.Key) - index, offset, err := litestream.ParseWALSegmentPath(key) - if err != nil { - continue - } - info := litestream.WALSegmentInfo{ Generation: itr.generation, - Index: index, - Offset: offset, Size: *obj.Size, CreatedAt: obj.LastModified.UTC(), } + key := path.Base(*obj.Key) + err := info.ParsePath(key) + if err != nil { + continue + } + select { case <-itr.ctx.Done(): return false diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 1b8168c1..777119d0 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -212,53 +212,54 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit // Iterate over every file and convert to metadata. infos := make([]litestream.SnapshotInfo, 0, len(fis)) for _, fi := range fis { + info := litestream.SnapshotInfo{ + Generation: generation, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + } + // Parse index from filename. - index, err := litestream.ParseSnapshotPath(path.Base(fi.Name())) + err := info.ParsePath(path.Base(fi.Name())) if err != nil { continue } - infos = append(infos, litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + infos = append(infos, info) } return litestream.NewSnapshotInfoSliceIterator(infos), nil } // WriteSnapshot writes LZ4 compressed data from rd to the object storage. -func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, index int, rd io.Reader) (info litestream.SnapshotInfo, err error) { +func (c *ReplicaClient) WriteSnapshot(ctx context.Context, info *litestream.SnapshotInfo, rd io.Reader) (err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) if err != nil { - return info, err + return err } - filename, err := litestream.SnapshotPath(c.Path, generation, index) + filename, err := litestream.SnapshotPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine snapshot path: %w", err) + return fmt.Errorf("cannot determine snapshot path: %w", err) } startTime := time.Now() if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { - return info, fmt.Errorf("cannot make parent wal segment directory %q: %w", path.Dir(filename), err) + return fmt.Errorf("cannot make parent wal segment directory %q: %w", path.Dir(filename), err) } f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) if err != nil { - return info, fmt.Errorf("cannot open snapshot file for writing: %w", err) + return fmt.Errorf("cannot open snapshot file for writing: %w", err) } defer f.Close() n, err := io.Copy(f, rd) if err != nil { - return info, err + return err } else if err := f.Close(); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() @@ -266,16 +267,13 @@ func (c *ReplicaClient) WriteSnapshot(ctx context.Context, generation string, in // log.Printf("%s(%s): snapshot: creating %s/%08x t=%s", r.db.Path(), r.Name(), generation, index, time.Since(startTime).Truncate(time.Millisecond)) - return litestream.SnapshotInfo{ - Generation: generation, - Index: index, - Size: n, - CreatedAt: startTime.UTC(), - }, nil + info.Size = n + info.CreatedAt = startTime.UTC() + return nil } // SnapshotReader returns a reader for snapshot data at the given generation/index. -func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error) { +func (c *ReplicaClient) SnapshotReader(ctx context.Context, info litestream.SnapshotInfo) (_ io.ReadCloser, err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) @@ -283,7 +281,7 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i return nil, err } - filename, err := litestream.SnapshotPath(c.Path, generation, index) + filename, err := litestream.SnapshotPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -299,7 +297,7 @@ func (c *ReplicaClient) SnapshotReader(ctx context.Context, generation string, i } // DeleteSnapshot deletes a snapshot with the given generation & index. -func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, index int) (err error) { +func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, info litestream.SnapshotInfo) (err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) @@ -307,7 +305,7 @@ func (c *ReplicaClient) DeleteSnapshot(ctx context.Context, generation string, i return err } - filename, err := litestream.SnapshotPath(c.Path, generation, index) + filename, err := litestream.SnapshotPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine snapshot path: %w", err) } @@ -344,70 +342,66 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l // Iterate over every file and convert to metadata. infos := make([]litestream.WALSegmentInfo, 0, len(fis)) for _, fi := range fis { - index, offset, err := litestream.ParseWALSegmentPath(path.Base(fi.Name())) + info := litestream.WALSegmentInfo{ + Generation: generation, + Size: fi.Size(), + CreatedAt: fi.ModTime().UTC(), + } + + err := info.ParsePath(path.Base(fi.Name())) if err != nil { continue } - infos = append(infos, litestream.WALSegmentInfo{ - Generation: generation, - Index: index, - Offset: offset, - Size: fi.Size(), - CreatedAt: fi.ModTime().UTC(), - }) + infos = append(infos, info) } return litestream.NewWALSegmentInfoSliceIterator(infos), nil } // WriteWALSegment writes LZ4 compressed data from rd into a file on disk. -func (c *ReplicaClient) WriteWALSegment(ctx context.Context, pos litestream.Pos, rd io.Reader) (info litestream.WALSegmentInfo, err error) { +func (c *ReplicaClient) WriteWALSegment(ctx context.Context, info *litestream.WALSegmentInfo, rd io.Reader) (err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) if err != nil { - return info, err + return err } - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + filename, err := litestream.WALSegmentPath(c.Path, *info) if err != nil { - return info, fmt.Errorf("cannot determine wal segment path: %w", err) + return fmt.Errorf("cannot determine wal segment path: %w", err) } startTime := time.Now() if err := sftpClient.MkdirAll(path.Dir(filename)); err != nil { - return info, fmt.Errorf("cannot make parent snapshot directory %q: %w", path.Dir(filename), err) + return fmt.Errorf("cannot make parent snapshot directory %q: %w", path.Dir(filename), err) } f, err := sftpClient.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC) if err != nil { - return info, fmt.Errorf("cannot open snapshot file for writing: %w", err) + return fmt.Errorf("cannot open snapshot file for writing: %w", err) } defer f.Close() n, err := io.Copy(f, rd) if err != nil { - return info, err + return err } else if err := f.Close(); err != nil { - return info, err + return err } internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "PUT").Inc() internal.OperationBytesCounterVec.WithLabelValues(ReplicaClientType, "PUT").Add(float64(n)) - return litestream.WALSegmentInfo{ - Generation: pos.Generation, - Index: pos.Index, - Offset: pos.Offset, - Size: n, - CreatedAt: startTime.UTC(), - }, nil + info.Size = n + info.CreatedAt = startTime.UTC() + return nil } // WALSegmentReader returns a reader for a section of WAL data at the given index. // Returns os.ErrNotExist if no matching index/offset is found. -func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos) (_ io.ReadCloser, err error) { +func (c *ReplicaClient) WALSegmentReader(ctx context.Context, info litestream.WALSegmentInfo) (_ io.ReadCloser, err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) @@ -415,7 +409,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos return nil, err } - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + filename, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return nil, fmt.Errorf("cannot determine wal segment path: %w", err) } @@ -431,7 +425,7 @@ func (c *ReplicaClient) WALSegmentReader(ctx context.Context, pos litestream.Pos } // DeleteWALSegments deletes WAL segments with at the given positions. -func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Pos) (err error) { +func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.WALSegmentInfo) (err error) { defer func() { c.resetOnConnError(err) }() sftpClient, err := c.Init(ctx) @@ -439,8 +433,8 @@ func (c *ReplicaClient) DeleteWALSegments(ctx context.Context, a []litestream.Po return err } - for _, pos := range a { - filename, err := litestream.WALSegmentPath(c.Path, pos.Generation, pos.Index, pos.Offset) + for _, info := range a { + filename, err := litestream.WALSegmentPath(c.Path, info) if err != nil { return fmt.Errorf("cannot determine wal segment path: %w", err) }