From 39879742460fea8b40a60a0cdc9a4959e788a3fc Mon Sep 17 00:00:00 2001 From: Argelbargel Date: Sun, 24 Sep 2023 00:53:01 +0200 Subject: [PATCH] Refactor storage to properly support dynamic configuration; to pick up changes of env-vars or files, we need to recreate the storage-clients on every interval --- internal/agent/snapshot-agent.go | 7 +- internal/agent/snapshot-agent_test.go | 87 ++++++++------- internal/agent/storage/aws.go | 64 +++++------ internal/agent/storage/azure.go | 28 ++--- internal/agent/storage/config.go | 14 +-- internal/agent/storage/controller.go | 82 +++++++------- internal/agent/storage/controller_test.go | 126 +++++++++++++++++----- internal/agent/storage/gcp.go | 19 ++-- internal/agent/storage/local.go | 19 ++-- internal/agent/storage/local_test.go | 29 ++--- internal/agent/storage/manager.go | 117 ++++++++++---------- internal/agent/storage/manager_test.go | 91 +++++++++++++--- internal/agent/storage/swift.go | 21 ++-- 13 files changed, 426 insertions(+), 278 deletions(-) diff --git a/internal/agent/snapshot-agent.go b/internal/agent/snapshot-agent.go index 7fa54ba..abdd268 100644 --- a/internal/agent/snapshot-agent.go +++ b/internal/agent/snapshot-agent.go @@ -107,12 +107,7 @@ func (a *SnapshotAgent) reconfigure(ctx context.Context, config SnapshotAgentCon return err } - manager, err := storage.CreateManager(ctx, config.Snapshots.Storages) - if err != nil { - return err - } - - a.update(ctx, client, manager, config.Snapshots.StorageConfigDefaults) + a.update(ctx, client, storage.CreateManager(config.Snapshots.Storages), config.Snapshots.StorageConfigDefaults) return nil } diff --git a/internal/agent/snapshot-agent_test.go b/internal/agent/snapshot-agent_test.go index c470a0c..403a439 100644 --- a/internal/agent/snapshot-agent_test.go +++ b/internal/agent/snapshot-agent_test.go @@ -22,12 +22,10 @@ func TestTakeSnapshotUploadsSnapshot(t *testing.T) { Frequency: time.Millisecond, } - controller := &storageControllerStub{ - nextSnapshot: time.Now().Add(time.Millisecond * 250), - } + factory := &storageControllerFactoryStub{nextSnapshot: time.Now().Add(time.Millisecond * 250)} manager := &storage.Manager{} - manager.AddStorage(controller) + manager.AddStorageFactory(factory) ctx := context.Background() @@ -39,10 +37,10 @@ func TestTakeSnapshotUploadsSnapshot(t *testing.T) { <-timer.C assert.True(t, clientVaultAPI.tookSnapshot) - assert.Equal(t, clientVaultAPI.snapshotData, controller.uploadData) - assert.Equal(t, defaults, controller.defaults) - assert.WithinRange(t, controller.snapshotTimestamp, start, start.Add(50*time.Millisecond)) - assert.GreaterOrEqual(t, time.Now(), controller.nextSnapshot) + assert.Equal(t, clientVaultAPI.snapshotData, factory.uploadData) + assert.Equal(t, defaults, factory.defaults) + assert.WithinRange(t, factory.snapshotTimestamp, start, start.Add(50*time.Millisecond)) + assert.GreaterOrEqual(t, time.Now(), factory.nextSnapshot) } func TestTakeSnapshotLocksTakeSnapshot(t *testing.T) { @@ -119,12 +117,12 @@ func TestTakeSnapshotFailsWhenTempFileCannotBeCreated(t *testing.T) { Frequency: time.Millisecond * 150, } - controller := &storageControllerStub{ + factory := &storageControllerFactoryStub{ nextSnapshot: time.Now().Add(defaults.Frequency * 4), } manager := &storage.Manager{} - manager.AddStorage(controller) + manager.AddStorageFactory(factory) ctx := context.Background() @@ -135,7 +133,7 @@ func TestTakeSnapshotFailsWhenTempFileCannotBeCreated(t *testing.T) { <-timer.C assert.False(t, clientVaultAPI.tookSnapshot) - assert.Less(t, time.Now(), controller.nextSnapshot.Add(-defaults.Frequency)) + assert.Less(t, time.Now(), factory.nextSnapshot.Add(-defaults.Frequency)) } func TestTakeSnapshotFailsWhenSnapshottingFails(t *testing.T) { @@ -148,12 +146,12 @@ func TestTakeSnapshotFailsWhenSnapshottingFails(t *testing.T) { Frequency: time.Millisecond * 150, } - controller := &storageControllerStub{ + factory := &storageControllerFactoryStub{ nextSnapshot: time.Now().Add(defaults.Frequency * 4), } manager := &storage.Manager{} - manager.AddStorage(controller) + manager.AddStorageFactory(factory) ctx := context.Background() @@ -164,7 +162,7 @@ func TestTakeSnapshotFailsWhenSnapshottingFails(t *testing.T) { <-timer.C assert.True(t, clientVaultAPI.tookSnapshot) - assert.Less(t, time.Now(), controller.nextSnapshot.Add(-defaults.Frequency)) + assert.Less(t, time.Now(), factory.nextSnapshot.Add(-defaults.Frequency)) } func TestTakeSnapshotIgnoresEmptySnapshot(t *testing.T) { @@ -176,12 +174,12 @@ func TestTakeSnapshotIgnoresEmptySnapshot(t *testing.T) { Frequency: time.Millisecond * 150, } - controller := &storageControllerStub{ + factory := &storageControllerFactoryStub{ nextSnapshot: time.Now().Add(defaults.Frequency * 4), } manager := &storage.Manager{} - manager.AddStorage(controller) + manager.AddStorageFactory(factory) ctx := context.Background() @@ -192,7 +190,7 @@ func TestTakeSnapshotIgnoresEmptySnapshot(t *testing.T) { <-timer.C assert.True(t, clientVaultAPI.tookSnapshot) - assert.Less(t, time.Now(), controller.nextSnapshot.Add(-defaults.Frequency)) + assert.Less(t, time.Now(), factory.nextSnapshot.Add(-defaults.Frequency)) } func TestIgnoresZeroTimeForScheduling(t *testing.T) { @@ -205,12 +203,12 @@ func TestIgnoresZeroTimeForScheduling(t *testing.T) { Frequency: time.Millisecond * 150, } - controller := &storageControllerStub{ + factory := &storageControllerFactoryStub{ nextSnapshot: time.Time{}, } manager := &storage.Manager{} - manager.AddStorage(controller) + manager.AddStorageFactory(factory) ctx := context.Background() @@ -222,7 +220,7 @@ func TestIgnoresZeroTimeForScheduling(t *testing.T) { <-timer.C assert.True(t, clientVaultAPI.tookSnapshot) - assert.Equal(t, clientVaultAPI.snapshotData, controller.uploadData) + assert.Equal(t, clientVaultAPI.snapshotData, factory.uploadData) assert.GreaterOrEqual(t, time.Now(), start.Add(defaults.Frequency)) } @@ -233,24 +231,29 @@ func TestUpdateReschedulesSnapshots(t *testing.T) { } manager := &storage.Manager{} - manager.AddStorage(&storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 250)}) + factory := &storageControllerFactoryStub{nextSnapshot: time.Now().Add(time.Millisecond * 250)} + manager.AddStorageFactory(factory) - newController := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 500)} + newFactory := &storageControllerFactoryStub{nextSnapshot: time.Now().Add(time.Millisecond * 500)} newManager := &storage.Manager{} - newManager.AddStorage(newController) + newManager.AddStorageFactory(newFactory) ctx := context.Background() agent := newSnapshotAgent(t.TempDir()) - agent.update(ctx, newClient(clientVaultAPI), manager, storage.StorageConfigDefaults{}) + client := newClient(clientVaultAPI) + agent.update(ctx, client, manager, storage.StorageConfigDefaults{}) timer := agent.TakeSnapshot(ctx) + updated := make(chan bool, 1) go func() { - agent.update(ctx, newClient(clientVaultAPI), newManager, storage.StorageConfigDefaults{}) + agent.update(ctx, client, newManager, storage.StorageConfigDefaults{}) + updated <- true }() + <-updated <-timer.C - assert.GreaterOrEqual(t, time.Now(), newController.nextSnapshot) + assert.GreaterOrEqual(t, time.Now(), newFactory.nextSnapshot) assert.Equal(t, newManager, agent.manager) } @@ -304,7 +307,7 @@ func (stub clientVaultAPIAuthStub) Login(_ context.Context, _ any) (time.Duratio return 0, nil } -type storageControllerStub struct { +type storageControllerFactoryStub struct { defaults storage.StorageConfigDefaults uploadData string uploadFails bool @@ -312,28 +315,36 @@ type storageControllerStub struct { nextSnapshot time.Time } -func (stub *storageControllerStub) Destination() string { +func (stub *storageControllerFactoryStub) Destination() string { return "" } -func (stub *storageControllerStub) ScheduleSnapshot(_ context.Context, _ time.Time, _ storage.StorageConfigDefaults) time.Time { - return stub.nextSnapshot +func (stub *storageControllerFactoryStub) CreateController(context.Context) (storage.StorageController, error) { + return storageControllerStub{stub}, nil +} + +type storageControllerStub struct { + factory *storageControllerFactoryStub +} + +func (stub storageControllerStub) ScheduleSnapshot(_ context.Context, _ time.Time, _ storage.StorageConfigDefaults) (time.Time, error) { + return stub.factory.nextSnapshot, nil } -func (stub *storageControllerStub) DeleteObsoleteSnapshots(_ context.Context, _ storage.StorageConfigDefaults) (int, error) { +func (stub storageControllerStub) DeleteObsoleteSnapshots(_ context.Context, _ storage.StorageConfigDefaults) (int, error) { return 0, nil } -func (stub *storageControllerStub) UploadSnapshot(_ context.Context, snapshot io.Reader, timestamp time.Time, defaults storage.StorageConfigDefaults) (bool, time.Time, error) { - stub.snapshotTimestamp = timestamp - stub.defaults = defaults - if stub.uploadFails { - return false, stub.nextSnapshot, errors.New("upload failed") +func (stub storageControllerStub) UploadSnapshot(_ context.Context, snapshot io.Reader, timestamp time.Time, defaults storage.StorageConfigDefaults) (bool, time.Time, error) { + stub.factory.snapshotTimestamp = timestamp + stub.factory.defaults = defaults + if stub.factory.uploadFails { + return false, stub.factory.nextSnapshot, errors.New("upload failed") } data, err := io.ReadAll(snapshot) if err != nil { return false, time.Now(), err } - stub.uploadData = string(data) - return true, stub.nextSnapshot, nil + stub.factory.uploadData = string(data) + return true, stub.factory.nextSnapshot, nil } diff --git a/internal/agent/storage/aws.go b/internal/agent/storage/aws.go index 8fb4e69..ef9db73 100644 --- a/internal/agent/storage/aws.go +++ b/internal/agent/storage/aws.go @@ -37,46 +37,50 @@ type awsStorageImpl struct { sse bool } -func createAWSStorageController(ctx context.Context, config AWSStorageConfig) (*storageControllerImpl[types.Object], error) { +func (conf AWSStorageConfig) Destination() string { + return fmt.Sprintf("aws s3 bucket %s at %s", conf.Bucket, conf.Endpoint) +} + +func (conf AWSStorageConfig) CreateController(ctx context.Context) (StorageController, error) { keyPrefix := "" - if config.KeyPrefix != "" { - keyPrefix = fmt.Sprintf("%s/", config.KeyPrefix) + if conf.KeyPrefix != "" { + keyPrefix = fmt.Sprintf("%s/", conf.KeyPrefix) } - client, err := createS3Client(ctx, config) + client, err := conf.createClient(ctx) if err != nil { - return nil, nil + return nil, err } return newStorageController[types.Object]( - config.storageConfig, - fmt.Sprintf("aws s3 bucket %s at %s", config.Bucket, config.Endpoint), + conf.storageConfig, awsStorageImpl{ client: client, keyPrefix: keyPrefix, - bucket: config.Bucket, - sse: config.UseServerSideEncryption, + bucket: conf.Bucket, + sse: conf.UseServerSideEncryption, }, ), nil + } -func createS3Client(ctx context.Context, config AWSStorageConfig) (*s3.Client, error) { - accessKeyId, err := config.AccessKeyId.Resolve(false) +func (conf AWSStorageConfig) createClient(ctx context.Context) (*s3.Client, error) { + accessKeyId, err := conf.AccessKeyId.Resolve(false) if err != nil { return nil, err } - accessKey, err := config.AccessKey.Resolve(accessKeyId != "") + accessKey, err := conf.AccessKey.Resolve(accessKeyId != "") if err != nil { return nil, err } - sessionToken, err := config.SessionToken.Resolve(false) + sessionToken, err := conf.SessionToken.Resolve(false) if err != nil { return nil, err } - region, err := config.Region.Resolve(false) + region, err := conf.Region.Resolve(false) if err != nil { return nil, err } @@ -90,14 +94,14 @@ func createS3Client(ctx context.Context, config AWSStorageConfig) (*s3.Client, e clientConfig.Credentials = credentials.NewStaticCredentialsProvider(accessKeyId, accessKey, sessionToken) } - endpoint, err := config.Endpoint.Resolve(false) + endpoint, err := conf.Endpoint.Resolve(false) if err != nil { return nil, err } client := s3.NewFromConfig(clientConfig, func(o *s3.Options) { - o.UsePathStyle = config.ForcePathStyle - if config.Endpoint != "" { + o.UsePathStyle = conf.ForcePathStyle + if conf.Endpoint != "" { o.BaseEndpoint = aws.String(endpoint) } }) @@ -107,18 +111,18 @@ func createS3Client(ctx context.Context, config AWSStorageConfig) (*s3.Client, e // nolint:unused // implements interface storage -func (u awsStorageImpl) UploadSnapshot(ctx context.Context, name string, data io.Reader) error { +func (s awsStorageImpl) uploadSnapshot(ctx context.Context, name string, data io.Reader) error { input := &s3.PutObjectInput{ - Bucket: &u.bucket, - Key: aws.String(u.keyPrefix + name), + Bucket: &s.bucket, + Key: aws.String(s.keyPrefix + name), Body: data, } - if u.sse { + if s.sse { input.ServerSideEncryption = types.ServerSideEncryptionAes256 } - uploader := manager.NewUploader(u.client) + uploader := manager.NewUploader(s.client) if _, err := uploader.Upload(ctx, input); err != nil { return err } @@ -128,13 +132,13 @@ func (u awsStorageImpl) UploadSnapshot(ctx context.Context, name string, data io // nolint:unused // implements interface storage -func (u awsStorageImpl) DeleteSnapshot(ctx context.Context, snapshot types.Object) error { +func (s awsStorageImpl) deleteSnapshot(ctx context.Context, snapshot types.Object) error { input := &s3.DeleteObjectInput{ - Bucket: &u.bucket, + Bucket: &s.bucket, Key: snapshot.Key, } - if _, err := u.client.DeleteObject(ctx, input); err != nil { + if _, err := s.client.DeleteObject(ctx, input); err != nil { return err } @@ -143,12 +147,12 @@ func (u awsStorageImpl) DeleteSnapshot(ctx context.Context, snapshot types.Objec // nolint:unused // implements interface storage -func (u awsStorageImpl) ListSnapshots(ctx context.Context, prefix string, ext string) ([]types.Object, error) { +func (s awsStorageImpl) listSnapshots(ctx context.Context, prefix string, ext string) ([]types.Object, error) { var result []types.Object - existingSnapshotList, err := u.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ - Bucket: &u.bucket, - Prefix: aws.String(u.keyPrefix), + existingSnapshotList, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: &s.bucket, + Prefix: aws.String(s.keyPrefix), }) if err != nil { @@ -166,6 +170,6 @@ func (u awsStorageImpl) ListSnapshots(ctx context.Context, prefix string, ext st // nolint:unused // implements interface storage -func (u awsStorageImpl) GetLastModifiedTime(snapshot types.Object) time.Time { +func (s awsStorageImpl) getLastModifiedTime(snapshot types.Object) time.Time { return *snapshot.LastModified } diff --git a/internal/agent/storage/azure.go b/internal/agent/storage/azure.go index c2ba157..702512d 100644 --- a/internal/agent/storage/azure.go +++ b/internal/agent/storage/azure.go @@ -25,17 +25,21 @@ type azureStorageImpl struct { container string } -func createAzureStorageController(_ context.Context, config AzureStorageConfig) (*storageControllerImpl[*container.BlobItem], error) { - client, err := createAzBlobClient(config) +func (conf AzureStorageConfig) Destination() string { + return fmt.Sprintf("azure container %s at %s", conf.Container, conf.CloudDomain) +} + +func (conf AzureStorageConfig) CreateController(context.Context) (StorageController, error) { + client, err := createAzBlobClient(conf) if err != nil { return nil, err } return newStorageController[*container.BlobItem]( - config.storageConfig, - fmt.Sprintf("azure container %s at %s", config.Container, config.CloudDomain), - azureStorageImpl{client, config.Container}, + conf.storageConfig, + azureStorageImpl{client, conf.Container}, ), nil + } func createAzBlobClient(config AzureStorageConfig) (*azblob.Client, error) { @@ -60,13 +64,13 @@ func createAzBlobClient(config AzureStorageConfig) (*azblob.Client, error) { // nolint:unused // implements interface storage -func (u azureStorageImpl) UploadSnapshot(ctx context.Context, name string, data io.Reader) error { +func (s azureStorageImpl) uploadSnapshot(ctx context.Context, name string, data io.Reader) error { uploadOptions := &azblob.UploadStreamOptions{ BlockSize: 4 * 1024 * 1024, Concurrency: 16, } - if _, err := u.client.UploadStream(ctx, u.container, name, data, uploadOptions); err != nil { + if _, err := s.client.UploadStream(ctx, s.container, name, data, uploadOptions); err != nil { return err } @@ -75,8 +79,8 @@ func (u azureStorageImpl) UploadSnapshot(ctx context.Context, name string, data // nolint:unused // implements interface storage -func (u azureStorageImpl) DeleteSnapshot(ctx context.Context, snapshot *container.BlobItem) error { - if _, err := u.client.DeleteBlob(ctx, u.container, *snapshot.Name, nil); err != nil { +func (s azureStorageImpl) deleteSnapshot(ctx context.Context, snapshot *container.BlobItem) error { + if _, err := s.client.DeleteBlob(ctx, s.container, *snapshot.Name, nil); err != nil { return err } @@ -85,12 +89,12 @@ func (u azureStorageImpl) DeleteSnapshot(ctx context.Context, snapshot *containe // nolint:unused // implements interface storage -func (u azureStorageImpl) ListSnapshots(ctx context.Context, prefix string, _ string) ([]*container.BlobItem, error) { +func (s azureStorageImpl) listSnapshots(ctx context.Context, prefix string, _ string) ([]*container.BlobItem, error) { var results []*container.BlobItem var maxResults int32 = 500 - pager := u.client.NewListBlobsFlatPager(u.container, &azblob.ListBlobsFlatOptions{ + pager := s.client.NewListBlobsFlatPager(s.container, &azblob.ListBlobsFlatOptions{ Prefix: &prefix, MaxResults: &maxResults, }) @@ -110,6 +114,6 @@ func (u azureStorageImpl) ListSnapshots(ctx context.Context, prefix string, _ st // nolint:unused // implements interface storage -func (u azureStorageImpl) GetLastModifiedTime(snapshot *container.BlobItem) time.Time { +func (s azureStorageImpl) getLastModifiedTime(snapshot *container.BlobItem) time.Time { return *snapshot.Properties.LastModified } diff --git a/internal/agent/storage/config.go b/internal/agent/storage/config.go index bfd4efb..42c85a7 100644 --- a/internal/agent/storage/config.go +++ b/internal/agent/storage/config.go @@ -13,7 +13,7 @@ type StoragesConfig struct { Swift SwiftStorageConfig `default:"{\"Empty\": true}"` } -// StorageConfigDefaults specified the default values of storageConfig for all controllers +// StorageConfigDefaults specified the default values of storageConfig for all factories type StorageConfigDefaults struct { Frequency time.Duration `default:"1h"` Retain int @@ -34,42 +34,42 @@ type storageConfig struct { TimestampFormat string } -func (c storageConfig) FrequencyOrDefault(defaults StorageConfigDefaults) time.Duration { +func (c storageConfig) frequencyOrDefault(defaults StorageConfigDefaults) time.Duration { if c.Frequency > 0 { return c.Frequency } return defaults.Frequency } -func (c storageConfig) RetainOrDefault(defaults StorageConfigDefaults) int { +func (c storageConfig) retainOrDefault(defaults StorageConfigDefaults) int { if c.Retain >= 0 { return c.Retain } return defaults.Retain } -func (c storageConfig) TimeoutOrDefault(defaults StorageConfigDefaults) time.Duration { +func (c storageConfig) timeoutOrDefault(defaults StorageConfigDefaults) time.Duration { if c.Timeout > 0 { return c.Timeout } return defaults.Timeout } -func (c storageConfig) NamePrefixOrDefault(defaults StorageConfigDefaults) string { +func (c storageConfig) namePrefixOrDefault(defaults StorageConfigDefaults) string { if c.NamePrefix != "" { return c.NamePrefix } return defaults.NamePrefix } -func (c storageConfig) NameSuffixOrDefault(defaults StorageConfigDefaults) string { +func (c storageConfig) nameSuffixOrDefault(defaults StorageConfigDefaults) string { if c.NameSuffix != "" { return c.NameSuffix } return defaults.NameSuffix } -func (c storageConfig) TimestampFormatOrDefault(defaults StorageConfigDefaults) string { +func (c storageConfig) timestampFormatOrDefault(defaults StorageConfigDefaults) string { if c.TimestampFormat != "" { return c.TimestampFormat } diff --git a/internal/agent/storage/controller.go b/internal/agent/storage/controller.go index edb8410..5bc5b72 100644 --- a/internal/agent/storage/controller.go +++ b/internal/agent/storage/controller.go @@ -10,76 +10,70 @@ import ( "time" ) -// storageControllerImpl implements storageController. +// storageControllerImpl implements StorageController. // Access to the storage-location is delegated to the given storage. // Options like upload-frequency, snapshot-naming are configured by the given storageControllerConfig. type storageControllerImpl[S any] struct { - config storageControllerConfig - destination string - storage storage[S] - lastUpload time.Time + config storageControllerConfig + storage storage[S] + lastUpload time.Time } // storage defines the interface used by storageControllerImpl to access a storage-location type storage[S any] interface { - UploadSnapshot(ctx context.Context, name string, data io.Reader) error - DeleteSnapshot(ctx context.Context, snapshot S) error - ListSnapshots(ctx context.Context, prefix string, suffix string) ([]S, error) - GetLastModifiedTime(snapshot S) time.Time + uploadSnapshot(ctx context.Context, name string, data io.Reader) error + deleteSnapshot(ctx context.Context, snapshot S) error + listSnapshots(ctx context.Context, prefix string, suffix string) ([]S, error) + getLastModifiedTime(snapshot S) time.Time } // storageControllerConfig defines the interface used by storageControllerImpl to determine its required configuration // options either from the storageConfig for the controlled storage or the global StorageConfigDefaults for all storages type storageControllerConfig interface { - FrequencyOrDefault(StorageConfigDefaults) time.Duration - RetainOrDefault(StorageConfigDefaults) int - TimeoutOrDefault(StorageConfigDefaults) time.Duration - NamePrefixOrDefault(StorageConfigDefaults) string - NameSuffixOrDefault(StorageConfigDefaults) string - TimestampFormatOrDefault(StorageConfigDefaults) string + frequencyOrDefault(StorageConfigDefaults) time.Duration + retainOrDefault(StorageConfigDefaults) int + timeoutOrDefault(StorageConfigDefaults) time.Duration + namePrefixOrDefault(StorageConfigDefaults) string + nameSuffixOrDefault(StorageConfigDefaults) string + timestampFormatOrDefault(StorageConfigDefaults) string } // newStorageController creates a new storageControllerImpl uploading snapshots to the // given storage configured according to the given storageControllerConfig -func newStorageController[S any](config storageControllerConfig, destination string, storage storage[S]) *storageControllerImpl[S] { +func newStorageController[S any](config storageControllerConfig, storage storage[S]) *storageControllerImpl[S] { return &storageControllerImpl[S]{ - config: config, - destination: destination, - storage: storage, + config: config, + storage: storage, } } -func (u *storageControllerImpl[S]) Destination() string { - return u.destination -} - -func (u *storageControllerImpl[S]) ScheduleSnapshot(ctx context.Context, lastSnapshotTime time.Time, defaults StorageConfigDefaults) time.Time { +func (u *storageControllerImpl[S]) ScheduleSnapshot(ctx context.Context, lastSnapshotTime time.Time, defaults StorageConfigDefaults) (time.Time, error) { if err := u.ensureLastUploadTime(ctx, lastSnapshotTime, defaults); err != nil { - logging.Warn("Could not schedule snapshot", "destination", u.destination, "error", err) - return time.Time{} + return time.Time{}, err } - return u.lastUpload.Add(u.config.FrequencyOrDefault(defaults)) + return u.lastUpload.Add(u.config.frequencyOrDefault(defaults)), nil } func (u *storageControllerImpl[S]) UploadSnapshot(ctx context.Context, snapshot io.Reader, timestamp time.Time, defaults StorageConfigDefaults) (bool, time.Time, error) { - frequency := u.config.FrequencyOrDefault(defaults) + frequency := u.config.frequencyOrDefault(defaults) if timestamp.Before(u.lastUpload.Add(frequency)) { - return false, u.ScheduleSnapshot(ctx, timestamp, defaults), nil + nextSnapshot, err := u.ScheduleSnapshot(ctx, timestamp, defaults) + return false, nextSnapshot, err } - ctx, cancel := context.WithTimeout(ctx, u.config.TimeoutOrDefault(defaults)) + ctx, cancel := context.WithTimeout(ctx, u.config.timeoutOrDefault(defaults)) defer cancel() nextSnapshot := timestamp.Add(frequency) - prefix := u.config.NamePrefixOrDefault(defaults) - suffix := u.config.NameSuffixOrDefault(defaults) - ts := timestamp.Format(u.config.TimestampFormatOrDefault(defaults)) + prefix := u.config.namePrefixOrDefault(defaults) + suffix := u.config.nameSuffixOrDefault(defaults) + ts := timestamp.Format(u.config.timestampFormatOrDefault(defaults)) snapshotName := strings.Join([]string{prefix, ts, suffix}, "") - if err := u.storage.UploadSnapshot(ctx, snapshotName, snapshot); err != nil { + if err := u.storage.uploadSnapshot(ctx, snapshotName, snapshot); err != nil { return false, nextSnapshot, err } @@ -89,15 +83,15 @@ func (u *storageControllerImpl[S]) UploadSnapshot(ctx context.Context, snapshot } func (u *storageControllerImpl[S]) DeleteObsoleteSnapshots(ctx context.Context, defaults StorageConfigDefaults) (int, error) { - retain := u.config.RetainOrDefault(defaults) + retain := u.config.retainOrDefault(defaults) if retain < 1 { return 0, nil } - ctx, cancel := context.WithTimeout(ctx, u.config.TimeoutOrDefault(defaults)) + ctx, cancel := context.WithTimeout(ctx, u.config.timeoutOrDefault(defaults)) defer cancel() - snapshots, err := u.listSnapshots(ctx, u.config.NamePrefixOrDefault(defaults), u.config.NameSuffixOrDefault(defaults)) + snapshots, err := u.listSnapshots(ctx, u.config.namePrefixOrDefault(defaults), u.config.nameSuffixOrDefault(defaults)) if err != nil { return 0, err } @@ -108,8 +102,8 @@ func (u *storageControllerImpl[S]) DeleteObsoleteSnapshots(ctx context.Context, deleted := 0 for _, s := range snapshots[retain:] { - if err := u.storage.DeleteSnapshot(ctx, s); err != nil { - logging.Warn("Could not delete snapshot", "snapshot", s, "destination", u.destination, "error", err) + if err := u.storage.deleteSnapshot(ctx, s); err != nil { + logging.Warn("Could not delete snapshot", "snapshot", s, "error", err) } else { deleted++ } @@ -119,13 +113,13 @@ func (u *storageControllerImpl[S]) DeleteObsoleteSnapshots(ctx context.Context, } func (u *storageControllerImpl[S]) listSnapshots(ctx context.Context, prefix string, suffix string) ([]S, error) { - snapshots, err := u.storage.ListSnapshots(ctx, prefix, suffix) + snapshots, err := u.storage.listSnapshots(ctx, prefix, suffix) if err != nil { return nil, err } slices.SortFunc(snapshots, func(a, b S) int { - return u.storage.GetLastModifiedTime(a).Compare(u.storage.GetLastModifiedTime(b)) + return u.storage.getLastModifiedTime(a).Compare(u.storage.getLastModifiedTime(b)) }) return snapshots, nil @@ -147,10 +141,10 @@ func (u *storageControllerImpl[S]) ensureLastUploadTime(ctx context.Context, las } func (u *storageControllerImpl[S]) getLastModificationTime(ctx context.Context, defaults StorageConfigDefaults) (time.Time, error) { - ctx, cancel := context.WithTimeout(ctx, u.config.TimeoutOrDefault(defaults)) + ctx, cancel := context.WithTimeout(ctx, u.config.timeoutOrDefault(defaults)) defer cancel() - snapshots, err := u.storage.ListSnapshots(ctx, u.config.NamePrefixOrDefault(defaults), u.config.NameSuffixOrDefault(defaults)) + snapshots, err := u.storage.listSnapshots(ctx, u.config.namePrefixOrDefault(defaults), u.config.nameSuffixOrDefault(defaults)) if err != nil { return u.lastUpload, err } @@ -159,6 +153,6 @@ func (u *storageControllerImpl[S]) getLastModificationTime(ctx context.Context, return u.lastUpload, errors.New("storage does not contain any snapshots") } - u.lastUpload = u.storage.GetLastModifiedTime(snapshots[0]) + u.lastUpload = u.storage.getLastModifiedTime(snapshots[0]) return u.lastUpload, nil } diff --git a/internal/agent/storage/controller_test.go b/internal/agent/storage/controller_test.go index d486e46..1d5109d 100644 --- a/internal/agent/storage/controller_test.go +++ b/internal/agent/storage/controller_test.go @@ -2,6 +2,7 @@ package storage import ( "context" + "errors" "fmt" "github.com/stretchr/testify/assert" "io" @@ -10,14 +11,6 @@ import ( "time" ) -func TestDestination(t *testing.T) { - controller := &storageControllerImpl[time.Time]{ - destination: "test", - } - - assert.Equal(t, controller.destination, controller.Destination()) -} - func TestScheduleSnapshotPrefersLastUploadTimeAndStorageConfig(t *testing.T) { lastUploadTime := time.Now() config := storageConfigStub{ @@ -31,7 +24,9 @@ func TestScheduleSnapshotPrefersLastUploadTimeAndStorageConfig(t *testing.T) { lastUpload: lastUploadTime, } - assert.Equal(t, lastUploadTime.Add(config.Frequency), controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{})) + nextSnapshot, err := controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{}) + assert.NoError(t, err, "ScheduleSnapshot failed unexpectedly") + assert.Equal(t, lastUploadTime.Add(config.Frequency), nextSnapshot) } func TestScheduleSnapshotFallsBackOnStorageConfigDefaults(t *testing.T) { @@ -45,7 +40,9 @@ func TestScheduleSnapshotFallsBackOnStorageConfigDefaults(t *testing.T) { lastUpload: lastUploadTime, } - assert.Equal(t, lastUploadTime.Add(defaults.Frequency), controller.ScheduleSnapshot(context.Background(), time.Time{}, defaults)) + nextSnapshot, err := controller.ScheduleSnapshot(context.Background(), time.Time{}, defaults) + assert.NoError(t, err, "ScheduleSnapshot failed unexpectedly") + assert.Equal(t, lastUploadTime.Add(defaults.Frequency), nextSnapshot) } func TestScheduleSnapshotFallsBackOnLastSnapshotTime(t *testing.T) { @@ -60,7 +57,9 @@ func TestScheduleSnapshotFallsBackOnLastSnapshotTime(t *testing.T) { config: config, } - assert.Equal(t, lastSnapshotTime.Add(config.Frequency), controller.ScheduleSnapshot(context.Background(), lastSnapshotTime, StorageConfigDefaults{})) + nextSnapshot, err := controller.ScheduleSnapshot(context.Background(), lastSnapshotTime, StorageConfigDefaults{}) + assert.NoError(t, err, "ScheduleSnapshot failed unexpectedly") + assert.Equal(t, lastSnapshotTime.Add(config.Frequency), nextSnapshot) } func TestScheduleSnapshotFallsBackOnStorageLastModifiedTime(t *testing.T) { @@ -80,7 +79,9 @@ func TestScheduleSnapshotFallsBackOnStorageLastModifiedTime(t *testing.T) { storage: storage, } - assert.Equal(t, storageLastModifiedTime.Add(config.Frequency), controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{})) + nextSnapshot, err := controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{}) + assert.NoError(t, err, "ScheduleSnapshot failed unexpectedly") + assert.Equal(t, storageLastModifiedTime.Add(config.Frequency), nextSnapshot) } func TestScheduleSnapshotReturnsZeroIfNoFallbackPossible(t *testing.T) { @@ -97,7 +98,9 @@ func TestScheduleSnapshotReturnsZeroIfNoFallbackPossible(t *testing.T) { storage: storage, } - assert.Zero(t, controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{})) + nextSnapshot, err := controller.ScheduleSnapshot(context.Background(), time.Time{}, StorageConfigDefaults{}) + assert.Error(t, err) + assert.Zero(t, nextSnapshot) } func TestUploadSnapshotUploadsToStorage(t *testing.T) { @@ -127,7 +130,7 @@ func TestUploadSnapshotUploadsToStorage(t *testing.T) { timestamp, StorageConfigDefaults{}, ) - assert.NoError(t, err, "UploadSnapshot failed unexpectedly") + assert.NoError(t, err, "uploadSnapshot failed unexpectedly") assert.True(t, uploaded) assert.Equal(t, timestamp.Add(config.Frequency), nextSnapshot) @@ -140,6 +143,33 @@ func TestUploadSnapshotUploadsToStorage(t *testing.T) { assert.Equal(t, data, storage.uploadData) } +func TestUploadSnapshotHandlesStorageFailure(t *testing.T) { + config := storageConfigStub{ + storageConfig{ + Frequency: time.Minute, + }, + } + + storage := &storageStub{uploadFails: true} + controller := &storageControllerImpl[time.Time]{ + config: config, + storage: storage, + } + + ctx := context.Background() + timestamp := time.Now() + uploaded, nextSnapshot, err := controller.UploadSnapshot( + ctx, + strings.NewReader("test"), + timestamp, + StorageConfigDefaults{}, + ) + + assert.False(t, uploaded) + assert.Error(t, err, "uploadSnapshot should return error if storage fails") + assert.Equal(t, timestamp.Add(config.Frequency), nextSnapshot) +} + func TestDeletesObsoleteSnapshots(t *testing.T) { config := storageConfigStub{ storageConfig{ @@ -159,7 +189,7 @@ func TestDeletesObsoleteSnapshots(t *testing.T) { } deleted, err := controller.DeleteObsoleteSnapshots(context.Background(), StorageConfigDefaults{}) - assert.NoError(t, err, "UploadSnapshot failed unexpectedly") + assert.NoError(t, err, "DeleteObsoleteSnapshots failed unexpectedly") assert.Equal(t, 2, deleted) assert.Equal(t, []time.Time{now.Add(time.Second), now.Add(time.Second * 2)}, storage.snapshots) @@ -185,7 +215,7 @@ func TestDeleteObsoleteSnapshotsIgnoresFailures(t *testing.T) { } deleted, err := controller.DeleteObsoleteSnapshots(context.Background(), StorageConfigDefaults{}) - assert.NoError(t, err, "UploadSnapshot failed unexpectedly") + assert.NoError(t, err, "DeleteObsoleteSnapshots failed unexpectedly") assert.Equal(t, 1, deleted) assert.Equal(t, []time.Time{now.Add(time.Second), now.Add(time.Second * 2), now.Add(time.Hour)}, storage.snapshots) @@ -194,8 +224,7 @@ func TestDeleteObsoleteSnapshotsIgnoresFailures(t *testing.T) { func TestDeleteObsoleteSnapshotsSkipsWhenNothingToRetain(t *testing.T) { config := storageConfigStub{ storageConfig{ - Retain: 0, - NamePrefix: "test", + Retain: 0, }, } @@ -207,12 +236,52 @@ func TestDeleteObsoleteSnapshotsSkipsWhenNothingToRetain(t *testing.T) { } deleted, err := controller.DeleteObsoleteSnapshots(context.Background(), StorageConfigDefaults{}) - assert.NoError(t, err, "UploadSnapshot failed unexpectedly") + assert.NoError(t, err, "DeleteObsoleteSnapshots failed unexpectedly") assert.Equal(t, 0, deleted) assert.Zero(t, storage.listPrefix) } +func TestDeleteObsoleteSnapshotsHandlesStorageFailure(t *testing.T) { + config := storageConfigStub{ + storageConfig{ + Retain: 1, + }, + } + + storage := &storageStub{listFails: true} + controller := &storageControllerImpl[time.Time]{ + config: config, + storage: storage, + } + + deleted, err := controller.DeleteObsoleteSnapshots(context.Background(), StorageConfigDefaults{}) + assert.Equal(t, 0, deleted) + assert.Error(t, err, "DeleteObsoleteSnapshots should fail if storage fails") +} + +func TestDeleteObsoleteSnapshotsSkipsWhenNothingToDelete(t *testing.T) { + config := storageConfigStub{ + storageConfig{ + Retain: 1, + NamePrefix: "test", + }, + } + + storage := &storageStub{snapshots: []time.Time{time.Now().Add(time.Minute)}} + controller := &storageControllerImpl[time.Time]{ + config: config, + storage: storage, + } + + deleted, err := controller.DeleteObsoleteSnapshots(context.Background(), StorageConfigDefaults{}) + assert.NoError(t, err, "DeleteObsoleteSnapshots failed unexpectedly") + + assert.Equal(t, 0, deleted) + assert.Equal(t, config.NamePrefix, storage.listPrefix) + assert.False(t, storage.deleted) +} + func TestUploadSnapshotSkipsUploadBeforeScheduledTime(t *testing.T) { config := storageConfigStub{ storageConfig{ @@ -233,7 +302,7 @@ func TestUploadSnapshotSkipsUploadBeforeScheduledTime(t *testing.T) { controller.lastUpload.Add(time.Second), StorageConfigDefaults{}, ) - assert.NoError(t, err, "UploadSnapshot failed unexpectedly") + assert.NoError(t, err, "uploadSnapshot failed unexpectedly") assert.False(t, uploaded) assert.Equal(t, controller.lastUpload.Add(config.Frequency), nextSnapshot) @@ -247,9 +316,11 @@ type storageConfigStub struct { type storageStub struct { snapshots []time.Time uploadContext context.Context + uploadFails bool uploadName string uploadData string deleteFailures []time.Time + listFails bool listPrefix string listSuffix string deleted bool @@ -257,7 +328,7 @@ type storageStub struct { // nolint:unused // implements interface storage -func (stub *storageStub) UploadSnapshot(ctx context.Context, name string, data io.Reader) error { +func (stub *storageStub) uploadSnapshot(ctx context.Context, name string, data io.Reader) error { stub.uploadContext = ctx stub.uploadName = name upload, err := io.ReadAll(data) @@ -265,12 +336,15 @@ func (stub *storageStub) UploadSnapshot(ctx context.Context, name string, data i return err } stub.uploadData = string(upload) + if stub.uploadFails { + return errors.New("upload failed") + } return nil } // nolint:unused // implements interface storage -func (stub *storageStub) DeleteSnapshot(_ context.Context, snapshot time.Time) error { +func (stub *storageStub) deleteSnapshot(_ context.Context, snapshot time.Time) error { stub.deleted = false for _, s := range stub.deleteFailures { if s == snapshot { @@ -291,15 +365,19 @@ func (stub *storageStub) DeleteSnapshot(_ context.Context, snapshot time.Time) e // nolint:unused // implements interface storage -func (stub *storageStub) ListSnapshots(_ context.Context, prefix string, suffix string) ([]time.Time, error) { +func (stub *storageStub) listSnapshots(_ context.Context, prefix string, suffix string) ([]time.Time, error) { stub.listPrefix = prefix stub.listSuffix = suffix + if stub.listFails { + return nil, errors.New("listing failed") + } + return stub.snapshots, nil } // nolint:unused // implements interface storage -func (stub *storageStub) GetLastModifiedTime(snapshot time.Time) time.Time { +func (stub *storageStub) getLastModifiedTime(snapshot time.Time) time.Time { return snapshot } diff --git a/internal/agent/storage/gcp.go b/internal/agent/storage/gcp.go index 1dff41e..a0d5f4b 100644 --- a/internal/agent/storage/gcp.go +++ b/internal/agent/storage/gcp.go @@ -21,22 +21,25 @@ type gcpStorageImpl struct { bucket *gcpStorage.BucketHandle } -func createGCPStorageController(ctx context.Context, config GCPStorageConfig) (*storageControllerImpl[gcpStorage.ObjectAttrs], error) { +func (conf GCPStorageConfig) Destination() string { + return fmt.Sprintf("gcp bucket %s", conf.Bucket) +} + +func (conf GCPStorageConfig) CreateController(ctx context.Context) (StorageController, error) { client, err := gcpStorage.NewClient(ctx) if err != nil { return nil, err } return newStorageController[gcpStorage.ObjectAttrs]( - config.storageConfig, - fmt.Sprintf("gcp bucket %s", config.Bucket), - gcpStorageImpl{client.Bucket(config.Bucket)}, + conf.storageConfig, + gcpStorageImpl{client.Bucket(conf.Bucket)}, ), nil } // nolint:unused // implements interface storage -func (u gcpStorageImpl) UploadSnapshot(ctx context.Context, name string, data io.Reader) error { +func (u gcpStorageImpl) uploadSnapshot(ctx context.Context, name string, data io.Reader) error { obj := u.bucket.Object(name) w := obj.NewWriter(ctx) @@ -53,7 +56,7 @@ func (u gcpStorageImpl) UploadSnapshot(ctx context.Context, name string, data io // nolint:unused // implements interface storage -func (u gcpStorageImpl) DeleteSnapshot(ctx context.Context, snapshot gcpStorage.ObjectAttrs) error { +func (u gcpStorageImpl) deleteSnapshot(ctx context.Context, snapshot gcpStorage.ObjectAttrs) error { obj := u.bucket.Object(snapshot.Name) if err := obj.Delete(ctx); err != nil { return err @@ -64,7 +67,7 @@ func (u gcpStorageImpl) DeleteSnapshot(ctx context.Context, snapshot gcpStorage. // nolint:unused // implements interface storage -func (u gcpStorageImpl) ListSnapshots(ctx context.Context, prefix string, _ string) ([]gcpStorage.ObjectAttrs, error) { +func (u gcpStorageImpl) listSnapshots(ctx context.Context, prefix string, _ string) ([]gcpStorage.ObjectAttrs, error) { var result []gcpStorage.ObjectAttrs query := &gcpStorage.Query{Prefix: prefix} @@ -86,6 +89,6 @@ func (u gcpStorageImpl) ListSnapshots(ctx context.Context, prefix string, _ stri // nolint:unused // implements interface storage -func (u gcpStorageImpl) GetLastModifiedTime(snapshot gcpStorage.ObjectAttrs) time.Time { +func (u gcpStorageImpl) getLastModifiedTime(snapshot gcpStorage.ObjectAttrs) time.Time { return snapshot.Updated } diff --git a/internal/agent/storage/local.go b/internal/agent/storage/local.go index 59db239..37fb884 100644 --- a/internal/agent/storage/local.go +++ b/internal/agent/storage/local.go @@ -20,17 +20,20 @@ type localStorageImpl struct { path string } -func createLocalStorageController(_ context.Context, config LocalStorageConfig) (*storageControllerImpl[os.FileInfo], error) { +func (conf LocalStorageConfig) Destination() string { + return fmt.Sprintf("local path %s", conf.Path) +} + +func (conf LocalStorageConfig) CreateController(context.Context) (StorageController, error) { return newStorageController[os.FileInfo]( - config.storageConfig, - fmt.Sprintf("local path %s", config.Path), + conf.storageConfig, localStorageImpl{ - path: config.Path, + path: conf.Path, }, ), nil } -func (u localStorageImpl) UploadSnapshot(_ context.Context, name string, data io.Reader) error { +func (u localStorageImpl) uploadSnapshot(_ context.Context, name string, data io.Reader) error { fileName := fmt.Sprintf("%s/%s", u.path, name) file, err := os.Create(fileName) @@ -43,7 +46,7 @@ func (u localStorageImpl) UploadSnapshot(_ context.Context, name string, data io return multierr.Append(err, file.Close()) } -func (u localStorageImpl) DeleteSnapshot(_ context.Context, snapshot os.FileInfo) error { +func (u localStorageImpl) deleteSnapshot(_ context.Context, snapshot os.FileInfo) error { if err := os.Remove(fmt.Sprintf("%s/%s", u.path, snapshot.Name())); err != nil { return err } @@ -51,7 +54,7 @@ func (u localStorageImpl) DeleteSnapshot(_ context.Context, snapshot os.FileInfo return nil } -func (u localStorageImpl) ListSnapshots(_ context.Context, prefix string, ext string) ([]os.FileInfo, error) { +func (u localStorageImpl) listSnapshots(_ context.Context, prefix string, ext string) ([]os.FileInfo, error) { var snapshots []os.FileInfo files, err := os.ReadDir(u.path) @@ -72,6 +75,6 @@ func (u localStorageImpl) ListSnapshots(_ context.Context, prefix string, ext st return snapshots, nil } -func (u localStorageImpl) GetLastModifiedTime(snapshot os.FileInfo) time.Time { +func (u localStorageImpl) getLastModifiedTime(snapshot os.FileInfo) time.Time { return snapshot.ModTime() } diff --git a/internal/agent/storage/local_test.go b/internal/agent/storage/local_test.go index f191fc9..1c691b0 100644 --- a/internal/agent/storage/local_test.go +++ b/internal/agent/storage/local_test.go @@ -12,32 +12,25 @@ import ( "testing" ) -func TestLocalDestination(t *testing.T) { - config := LocalStorageConfig{Path: "/test"} - impl, _ := createLocalStorageController(context.Background(), config) - - assert.Equal(t, "local path /test", impl.Destination()) -} - func TestLocalUploadSnapshotFailsIfFileCannotBeCreated(t *testing.T) { impl := localStorageImpl{"./does/not/exist"} - err := impl.UploadSnapshot(context.Background(), "test", &bytes.Buffer{}) + err := impl.uploadSnapshot(context.Background(), "test", &bytes.Buffer{}) - assert.Error(t, err, "UploadSnapshot() should fail if file could not be created!") + assert.Error(t, err, "uploadSnapshot() should fail if file could not be created!") } func TestLocalUploadeSnapshotCreatesFile(t *testing.T) { impl := localStorageImpl{t.TempDir()} snapshotData := []byte("test") - err := impl.UploadSnapshot(context.Background(), "test.snap", bytes.NewReader(snapshotData)) + err := impl.uploadSnapshot(context.Background(), "test.snap", bytes.NewReader(snapshotData)) - assert.NoError(t, err, "UploadSnapshot() failed unexpectedly!") + assert.NoError(t, err, "uploadSnapshot() failed unexpectedly!") backupData, err := os.ReadFile(fmt.Sprintf("%s/test.snap", impl.path)) - assert.NoError(t, err, "UploadSnapshot() failed unexpectedly!") + assert.NoError(t, err, "uploadSnapshot() failed unexpectedly!") assert.Equal(t, snapshotData, backupData) } @@ -49,14 +42,14 @@ func TestLocalDeleteSnapshot(t *testing.T) { _ = os.RemoveAll(filepath.Dir(impl.path)) }() - err := impl.UploadSnapshot(context.Background(), "test.snap", bytes.NewReader(snapshotData)) - assert.NoError(t, err, "UploadSnapshot() failed unexpectedly!") + err := impl.uploadSnapshot(context.Background(), "test.snap", bytes.NewReader(snapshotData)) + assert.NoError(t, err, "uploadSnapshot() failed unexpectedly!") info, err := os.Stat(fmt.Sprintf("%s/test.snap", impl.path)) assert.NoError(t, err, "could not get info for snapshot: %v", err) - err = impl.DeleteSnapshot(context.Background(), info) - assert.NoError(t, err, "DeleteSnapshot() failed unexpectedly!") + err = impl.deleteSnapshot(context.Background(), info) + assert.NoError(t, err, "deleteSnapshot() failed unexpectedly!") _, err = os.Stat(fmt.Sprintf("%s/test.snap", impl.path)) assert.Error(t, err) @@ -71,7 +64,7 @@ func TestLocalListSnapshots(t *testing.T) { expectedSnaphotNames = append(expectedSnaphotNames, createEmptySnapshot(t, impl.path, "test", ".snap").Name()) } - listedSnapshots, err := impl.ListSnapshots(context.Background(), "test", ".snap") + listedSnapshots, err := impl.listSnapshots(context.Background(), "test", ".snap") listedSnapshotNames := funk.Map(listedSnapshots, func(s os.FileInfo) string { return s.Name() }) assert.NoError(t, err) @@ -83,7 +76,7 @@ func TestLocalGetLastModifiedTime(t *testing.T) { impl := localStorageImpl{t.TempDir()} snapshot := createEmptySnapshot(t, impl.path, "test", ".snap") - assert.Equal(t, snapshot.ModTime(), impl.GetLastModifiedTime(snapshot)) + assert.Equal(t, snapshot.ModTime(), impl.getLastModifiedTime(snapshot)) } func createEmptySnapshot(t *testing.T, dir string, prefix string, suffix string) os.FileInfo { diff --git a/internal/agent/storage/manager.go b/internal/agent/storage/manager.go index bfdd8e0..071902f 100644 --- a/internal/agent/storage/manager.go +++ b/internal/agent/storage/manager.go @@ -9,21 +9,26 @@ import ( ) // Manager manages the upload of a snapshot to one or multiple storage-locations -// using storageController-instances configured by StoragesConfig +// using StorageController-instances configured by StoragesConfig type Manager struct { - controllers []storageController + factories []StorageControllerFactory } -// storageController defines the interface required by the Manager to communicate with storageControllerImpl-instances -type storageController interface { +type StorageControllerFactory interface { + // CreateController creates a new controller connection to a storage location + CreateController(context.Context) (StorageController, error) // Destination returns information about the location of the controlled storage Destination() string +} + +// StorageController defines the interface required by the Manager to communicate with storageControllerImpl-instances +type StorageController interface { // ScheduleSnapshot schedules the next upload to the controlled storage. // If the time of the last upload can not be determined by the controller, // it may use the time of the last snapshot given as fallback // For the case that the storageConfig of the controller does not specify one of its fields, // StorageConfigDefaults is passed. - ScheduleSnapshot(ctx context.Context, lastSnapshot time.Time, defaults StorageConfigDefaults) time.Time + ScheduleSnapshot(ctx context.Context, lastSnapshot time.Time, defaults StorageConfigDefaults) (time.Time, error) // UploadSnapshot uploads the given snapshot to the controlled storage, if the timestamp of the snapshot // corresponds with its scheduled upload-date. // For the case that the storageConfig of the controller does not specify one of its fields, @@ -32,77 +37,63 @@ type storageController interface { DeleteObsoleteSnapshots(ctx context.Context, defaults StorageConfigDefaults) (int, error) } -// CreateManager creates a Manager controlling the storageController-instances +// CreateManager creates a Manager controlling the StorageController-instances // configured according to the given StoragesConfig and StorageConfigDefaults -func CreateManager(ctx context.Context, storageConfig StoragesConfig) (*Manager, error) { +func CreateManager(storageConfig StoragesConfig) *Manager { manager := &Manager{} if !storageConfig.AWS.Empty { - aws, err := createAWSStorageController(ctx, storageConfig.AWS) - if err != nil { - return nil, err - } - manager.AddStorage(aws) + manager.AddStorageFactory(storageConfig.AWS) } if !storageConfig.Azure.Empty { - azure, err := createAzureStorageController(ctx, storageConfig.Azure) - if err != nil { - return nil, err - } - manager.AddStorage(azure) + manager.AddStorageFactory(storageConfig.Azure) } if !storageConfig.GCP.Empty { - gcp, err := createGCPStorageController(ctx, storageConfig.GCP) - if err != nil { - return nil, err - } - manager.AddStorage(gcp) + manager.AddStorageFactory(storageConfig.GCP) } if !storageConfig.Local.Empty { - local, err := createLocalStorageController(ctx, storageConfig.Local) - if err != nil { - return nil, err - } - - manager.AddStorage(local) + manager.AddStorageFactory(storageConfig.Local) } if !storageConfig.Swift.Empty { - swift, err := createSwiftStorageController(ctx, storageConfig.Swift) - if err != nil { - return nil, err - } - manager.AddStorage(swift) + manager.AddStorageFactory(storageConfig.Swift) } - return manager, nil + return manager } -// AddStorage adds a storageController to the manager -// Allows adding of storageController-implementations for testing -func (m *Manager) AddStorage(controller storageController) { - m.controllers = append(m.controllers, controller) +// AddStorageFactory adds a StorageController to the manager +// Allows adding of StorageController-implementations for testing +func (m *Manager) AddStorageFactory(factory StorageControllerFactory) { + m.factories = append(m.factories, factory) } // ScheduleSnapshot schedules the next snapshot. -// Scheduling of snapshot is delegated to the storageController-instances; the earliest time calculated by all -// controllers is returned. The given time when the last snapshot was taken is passed on to the controllers as fallback +// Scheduling of snapshot is delegated to the StorageController-instances; the earliest time calculated by all +// factories is returned. The given time when the last snapshot was taken is passed on to the factories as fallback // if the time of the last upload cannot be determined func (m *Manager) ScheduleSnapshot(ctx context.Context, lastSnapshotTime time.Time, defaults StorageConfigDefaults) time.Time { nextSnapshot := time.Time{} - for _, controller := range m.controllers { - candidate := controller.ScheduleSnapshot(ctx, lastSnapshotTime, defaults) - if nextSnapshot.IsZero() || candidate.Before(nextSnapshot) { - nextSnapshot = candidate + for _, factory := range m.factories { + controller, err := factory.CreateController(ctx) + if err != nil { + logging.Warn("Could not create controller", "destination", factory.Destination(), "error", err) + } else { + candidate, err := controller.ScheduleSnapshot(ctx, lastSnapshotTime, defaults) + if err != nil { + logging.Warn("Could not schedule snapshot", "destination", factory.Destination(), "error", err) + } else if nextSnapshot.IsZero() || candidate.Before(nextSnapshot) { + nextSnapshot = candidate + } } } return nextSnapshot } -// UploadSnapshot uploads the given snapshot to all storages controlled by the storageController-instances +// UploadSnapshot uploads the given snapshot to all storages controlled by the StorageController-instances // and returns the time the next snapshot should be taken. -// Whether the snapshot is actually uploaded to a storage is controlled by the storageController based +// Whether the snapshot is actually uploaded to a storage is controlled by the StorageController based // on the upload-frequency in its StoragesConfig func (m *Manager) UploadSnapshot(ctx context.Context, snapshot io.ReadSeeker, timestamp time.Time, defaults StorageConfigDefaults) time.Time { var ( @@ -110,30 +101,36 @@ func (m *Manager) UploadSnapshot(ctx context.Context, snapshot io.ReadSeeker, ti errs error ) - for _, controller := range m.controllers { + for _, factory := range m.factories { if _, err := snapshot.Seek(0, io.SeekStart); err != nil { logging.Error("Could not reset snapshot before uploading", "error", err) return timestamp.Add(defaults.Frequency) } - uploaded, candidate, err := controller.UploadSnapshot(ctx, snapshot, timestamp, defaults) - if nextSnapshot.IsZero() || candidate.Before(nextSnapshot) { - nextSnapshot = candidate - } - + controller, err := factory.CreateController(ctx) if err != nil { - logging.Warn("Could not upload snapshot", "destination", controller.Destination(), "error", err, "nextSnapshot", candidate) + logging.Warn("Could not create storage-controller", "destination", factory.Destination(), "error", err) errs = multierr.Append(errs, err) - } else if !uploaded { - logging.Debug("Skipped upload of snapshot", "destination", controller.Destination(), "nextSnapshot", candidate) } else { - logging.Debug("Successfully uploaded snapshot", "destination", controller.Destination(), "nextSnapshot", candidate) + uploaded, candidate, err := controller.UploadSnapshot(ctx, snapshot, timestamp, defaults) + if nextSnapshot.IsZero() || candidate.Before(nextSnapshot) { + nextSnapshot = candidate + } - deleted, err := controller.DeleteObsoleteSnapshots(ctx, defaults) if err != nil { - logging.Warn("Could not delete obsolete snapshots", "destination", controller.Destination(), "error", err) - } else if deleted > 0 { - logging.Debug("Deleted obsolete snapshots", "destination", controller.Destination(), "deleted", deleted) + logging.Warn("Could not upload snapshot", "destination", factory.Destination(), "error", err, "nextSnapshot", candidate) + errs = multierr.Append(errs, err) + } else if !uploaded { + logging.Debug("Skipped upload of snapshot", "destination", factory.Destination(), "nextSnapshot", candidate) + } else { + logging.Debug("Successfully uploaded snapshot", "destination", factory.Destination(), "nextSnapshot", candidate) + + deleted, err := controller.DeleteObsoleteSnapshots(ctx, defaults) + if err != nil { + logging.Warn("Could not delete obsolete snapshots", "destination", factory.Destination(), "error", err) + } else if deleted > 0 { + logging.Debug("Deleted obsolete snapshots", "destination", factory.Destination(), "deleted", deleted) + } } } } diff --git a/internal/agent/storage/manager_test.go b/internal/agent/storage/manager_test.go index d7427e7..e3547d2 100644 --- a/internal/agent/storage/manager_test.go +++ b/internal/agent/storage/manager_test.go @@ -13,15 +13,42 @@ import ( func TestManagerSchedulesEarliestNextSnapshot(t *testing.T) { controller1 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 2)} controller2 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond)} - manager := Manager{[]storageController{controller1, controller2}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + }, + } assert.Equal(t, controller2.nextSnapshot, manager.ScheduleSnapshot(context.Background(), controller1.nextSnapshot, StorageConfigDefaults{})) } +func TestScheduleSnapshotIgnoresFactoryAndControllerFailure(t *testing.T) { + controller1 := &storageControllerStub{scheduleFails: true} + controller2 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond)} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{createFails: true}, + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + }, + } + + defaults := StorageConfigDefaults{} + nextSnapshot := manager.ScheduleSnapshot(context.Background(), time.Time{}, defaults) + + assert.Equal(t, controller2.nextSnapshot, nextSnapshot) +} + func TestManagerUploadsToAllControllers(t *testing.T) { controller1 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 2)} controller2 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond)} - manager := Manager{[]storageController{controller1, controller2}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + }, + } data := "test" nextSnapshot := manager.UploadSnapshot(context.Background(), strings.NewReader(data), controller1.nextSnapshot, StorageConfigDefaults{}) @@ -36,7 +63,12 @@ func TestManagerUploadsToAllControllers(t *testing.T) { func TestManagerDeletesObsoleteSnapshotsWithAllControllers(t *testing.T) { controller1 := &storageControllerStub{} controller2 := &storageControllerStub{} - manager := Manager{[]storageController{controller1, controller2}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + }, + } defaults := StorageConfigDefaults{Retain: 2} _ = manager.UploadSnapshot(context.Background(), strings.NewReader("test"), controller1.nextSnapshot, defaults) @@ -45,12 +77,18 @@ func TestManagerDeletesObsoleteSnapshotsWithAllControllers(t *testing.T) { assert.Equal(t, defaults, controller2.deleteDefaults) } -func TestManagerIgnoresControllerFailures(t *testing.T) { +func TestManagerIgnoresFactoryAndControllerFailure(t *testing.T) { controller1 := &storageControllerStub{uploadFails: true, nextSnapshot: time.Now().Add(time.Millisecond)} controller2 := &storageControllerStub{deleteFails: true, nextSnapshot: time.Now().Add(time.Millisecond * 2)} controller3 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 3)} - - manager := Manager{[]storageController{controller1, controller2, controller3}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{createFails: true}, + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + storageControllerFactoryStub{controller: controller3}, + }, + } data := "test" defaults := StorageConfigDefaults{} @@ -69,7 +107,12 @@ func TestManagerIgnoresControllerFailures(t *testing.T) { func TestManagerIgnoresSkippedControllers(t *testing.T) { controller1 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond * 2)} controller2 := &storageControllerStub{nextSnapshot: time.Now().Add(time.Millisecond)} - manager := Manager{[]storageController{controller1, controller2}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{controller: controller1}, + storageControllerFactoryStub{controller: controller2}, + }, + } data := "test" nextSnapshot := manager.UploadSnapshot(context.Background(), strings.NewReader(data), controller2.nextSnapshot, StorageConfigDefaults{}) @@ -81,7 +124,11 @@ func TestManagerIgnoresSkippedControllers(t *testing.T) { func TestManagerFailsIfSnapshotCannotBeReset(t *testing.T) { controller := &storageControllerStub{} - manager := Manager{[]storageController{controller}} + manager := Manager{ + []StorageControllerFactory{ + storageControllerFactoryStub{controller: controller}, + }, + } defaults := StorageConfigDefaults{Frequency: time.Second} timestamp := time.Now() @@ -91,8 +138,25 @@ func TestManagerFailsIfSnapshotCannotBeReset(t *testing.T) { assert.Zero(t, controller.uploadData) } +type storageControllerFactoryStub struct { + createFails bool + controller *storageControllerStub +} + +func (stub storageControllerFactoryStub) Destination() string { + return "" +} + +func (stub storageControllerFactoryStub) CreateController(context.Context) (StorageController, error) { + if stub.createFails { + return nil, errors.New("create failed") + } + return stub.controller, nil +} + type storageControllerStub struct { uploadDefaults StorageConfigDefaults + scheduleFails bool uploadData string uploadFails bool deleteFails bool @@ -101,12 +165,11 @@ type storageControllerStub struct { nextSnapshot time.Time } -func (stub *storageControllerStub) Destination() string { - return "" -} - -func (stub *storageControllerStub) ScheduleSnapshot(context.Context, time.Time, StorageConfigDefaults) time.Time { - return stub.nextSnapshot +func (stub *storageControllerStub) ScheduleSnapshot(context.Context, time.Time, StorageConfigDefaults) (time.Time, error) { + if stub.scheduleFails { + return time.Time{}, errors.New("scheduling failed") + } + return stub.nextSnapshot, nil } func (stub *storageControllerStub) UploadSnapshot(_ context.Context, snapshot io.Reader, timestamp time.Time, defaults StorageConfigDefaults) (bool, time.Time, error) { diff --git a/internal/agent/storage/swift.go b/internal/agent/storage/swift.go index dee633c..e32faac 100644 --- a/internal/agent/storage/swift.go +++ b/internal/agent/storage/swift.go @@ -27,16 +27,19 @@ type swiftStorageImpl struct { container string } -func createSwiftStorageController(ctx context.Context, config SwiftStorageConfig) (*storageControllerImpl[swift.Object], error) { - conn, err := createSwiftConnection(ctx, config) +func (conf SwiftStorageConfig) Destination() string { + return fmt.Sprintf("swift container %s", conf.Container) +} + +func (conf SwiftStorageConfig) CreateController(ctx context.Context) (StorageController, error) { + conn, err := createSwiftConnection(ctx, conf) if err != nil { return nil, err } return newStorageController[swift.Object]( - config.storageConfig, - fmt.Sprintf("swift container %s", config.Container), - swiftStorageImpl{conn, config.Container}, + conf.storageConfig, + swiftStorageImpl{conn, conf.Container}, ), nil } @@ -79,7 +82,7 @@ func createSwiftConnection(ctx context.Context, config SwiftStorageConfig) (*swi // nolint:unused // implements interface storage -func (u swiftStorageImpl) UploadSnapshot(ctx context.Context, name string, data io.Reader) error { +func (u swiftStorageImpl) uploadSnapshot(ctx context.Context, name string, data io.Reader) error { _, header, err := u.conn.Container(ctx, u.container) if err != nil { return err @@ -103,7 +106,7 @@ func (u swiftStorageImpl) UploadSnapshot(ctx context.Context, name string, data // nolint:unused // implements interface storage -func (u swiftStorageImpl) DeleteSnapshot(ctx context.Context, snapshot swift.Object) error { +func (u swiftStorageImpl) deleteSnapshot(ctx context.Context, snapshot swift.Object) error { if err := u.conn.ObjectDelete(ctx, u.container, snapshot.Name); err != nil { return err } @@ -113,12 +116,12 @@ func (u swiftStorageImpl) DeleteSnapshot(ctx context.Context, snapshot swift.Obj // nolint:unused // implements interface storage -func (u swiftStorageImpl) ListSnapshots(ctx context.Context, prefix string, _ string) ([]swift.Object, error) { +func (u swiftStorageImpl) listSnapshots(ctx context.Context, prefix string, _ string) ([]swift.Object, error) { return u.conn.ObjectsAll(ctx, u.container, &swift.ObjectsOpts{Prefix: prefix}) } // nolint:unused // implements interface storage -func (u swiftStorageImpl) GetLastModifiedTime(snapshot swift.Object) time.Time { +func (u swiftStorageImpl) getLastModifiedTime(snapshot swift.Object) time.Time { return snapshot.LastModified }