diff --git a/common/configuration/asm.go b/common/configuration/asm.go index 6278ab89e9..4882b0a021 100644 --- a/common/configuration/asm.go +++ b/common/configuration/asm.go @@ -7,6 +7,7 @@ import ( "time" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/benbjohnson/clock" "github.com/TBD54566975/ftl/backend/controller/leader" "github.com/TBD54566975/ftl/backend/controller/leases" @@ -38,8 +39,12 @@ var _ Resolver[Secrets] = &ASM{} var _ Provider[Secrets] = &ASM{} func NewASM(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser) *ASM { + return newASMForTesting(ctx, secretsClient, advertise, leaser, clock.New()) +} + +func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser, clock clock.Clock) *ASM { leaderFactory := func(ctx context.Context) (asmClient, error) { - return newASMLeader(ctx, secretsClient), nil + return newASMLeader(ctx, secretsClient, clock), nil } followerFactory := func(ctx context.Context, url *url.URL) (client asmClient, err error) { rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error) diff --git a/common/configuration/asm_leader.go b/common/configuration/asm_leader.go index 52cd2ba9dd..9cf1562ed7 100644 --- a/common/configuration/asm_leader.go +++ b/common/configuration/asm_leader.go @@ -9,6 +9,7 @@ import ( "time" "github.com/alecthomas/atomic" + "github.com/benbjohnson/clock" "github.com/puzpuzpuz/xsync/v3" "github.com/TBD54566975/ftl/internal/log" @@ -62,25 +63,25 @@ type asmLeader struct { var _ asmClient = &asmLeader{} -func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader { +func newASMLeader(ctx context.Context, client *secretsmanager.Client, clock clock.Clock) *asmLeader { l := &asmLeader{ client: client, secrets: xsync.NewMapOf[Ref, asmSecretValue](), topic: pubsub.New[updateASMSecretEvent](), } go func() { - l.watchForUpdates(ctx) + l.watchForUpdates(ctx, 
clock) }() return l } -func (l *asmLeader) watchForUpdates(ctx context.Context) { +func (l *asmLeader) watchForUpdates(ctx context.Context, clock clock.Clock) { logger := log.FromContext(ctx) events := make(chan updateASMSecretEvent, 64) l.topic.Subscribe(events) defer l.topic.Unsubscribe(events) - nextSync := time.Now() + nextSync := clock.Now() backOff := syncInitialBackoff for { select { @@ -90,8 +91,8 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) { case e := <-events: l.processEvent(e) - case <-time.After(time.Until(nextSync)): - nextSync = time.Now().Add(syncInterval) + case <-clock.After(clock.Until(nextSync)): + nextSync = clock.Now().Add(syncInterval) err := l.sync(ctx) if err == nil { @@ -100,9 +101,9 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) { } // back off if we fail to sync logger.Warnf("unable to sync ASM secrets: %v", err) - nextSync = time.Now().Add(backOff) - if nextSync.After(time.Now().Add(syncInterval)) { - nextSync = time.Now().Add(syncInterval) + nextSync = clock.Now().Add(backOff) + if nextSync.After(clock.Now().Add(syncInterval)) { + nextSync = clock.Now().Add(syncInterval) } else { backOff *= 2 } diff --git a/common/configuration/asm_test.go b/common/configuration/asm_test.go index 754d49adde..732101b27f 100644 --- a/common/configuration/asm_test.go +++ b/common/configuration/asm_test.go @@ -7,9 +7,11 @@ import ( "fmt" "sort" "testing" + "time" "github.com/TBD54566975/ftl/backend/controller/leases" "github.com/TBD54566975/ftl/internal/log" + "github.com/benbjohnson/clock" "github.com/alecthomas/assert/v2" . 
"github.com/alecthomas/types/optional" @@ -19,24 +21,24 @@ import ( "github.com/aws/aws-sdk-go-v2/service/secretsmanager" ) -func localstack(ctx context.Context, t *testing.T) (*ASM, *asmLeader) { +func localstack(ctx context.Context, t *testing.T) (*ASM, *asmLeader, *secretsmanager.Client, *clock.Mock) { t.Helper() + mockClock := clock.NewMock() cc := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider("test", "test", "")) cfg, err := config.LoadDefaultConfig(ctx, config.WithCredentialsProvider(cc), config.WithRegion("us-west-2")) if err != nil { t.Fatal(err) } - sm := secretsmanager.NewFromConfig(cfg, func(o *secretsmanager.Options) { o.BaseEndpoint = aws.String("http://localhost:4566") }) - asm := NewASM(ctx, sm, URL("http://localhost:1234"), leases.NewFakeLeaser()) + asm := newASMForTesting(ctx, sm, URL("http://localhost:1234"), leases.NewFakeLeaser(), mockClock) leaderOrFollower, err := asm.coordinator.Get() assert.NoError(t, err) leader, ok := leaderOrFollower.(*asmLeader) assert.True(t, ok, "expected test to get an asm leader not a follower") - return asm, leader + return asm, leader, sm, mockClock } func waitForUpdatesToProcess(l *asmLeader) { @@ -45,7 +47,7 @@ func waitForUpdatesToProcess(l *asmLeader) { func TestASMWorkflow(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) - asm, leader := localstack(ctx, t) + asm, leader, _, _ := localstack(ctx, t) ref := Ref{Module: Some("foo"), Name: "bar"} var mySecret = []byte("my secret") manager, err := New(ctx, asm, []Provider[Secrets]{asm}) @@ -95,7 +97,7 @@ func TestASMWorkflow(t *testing.T) { // Suggest not running this against a real AWS account (especially in CI) due to the cost. Maybe costs a few $. 
func TestASMPagination(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) - asm, leader := localstack(ctx, t) + asm, leader, _, _ := localstack(ctx, t) manager, err := New(ctx, asm, []Provider[Secrets]{asm}) assert.NoError(t, err) @@ -140,3 +142,116 @@ func TestASMPagination(t *testing.T) { assert.NoError(t, err) assert.Equal(t, len(items), 0) } + +func TestSync(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + asm, leader, sm, clock := localstack(ctx, t) + + // wait for initial load + err := leader.waitForSecrets() + assert.NoError(t, err) + + // advance clock to half way between syncs + clock.Add(syncInterval / 2) + + // write a secret via leader + leaderRef := Ref{Module: Some("sync"), Name: "set-by-leader"} + _, err = asm.Store(ctx, leaderRef, []byte("leader-first")) + assert.NoError(t, err) + waitForUpdatesToProcess(leader) + value, err := asm.Load(ctx, leaderRef, asmURLForRef(leaderRef)) + assert.NoError(t, err, "failed to load secret via asm") + assert.Equal(t, value, []byte("leader-first"), "unexpected secret value") + + // write another secret via sm directly + smRef := Ref{Module: Some("sync"), Name: "set-by-sm"} + _, err = sm.CreateSecret(ctx, &secretsmanager.CreateSecretInput{ + Name: aws.String(smRef.String()), + SecretString: aws.String(string("sm-first")), + }) + assert.NoError(t, err, "failed to create secret via sm") + waitForUpdatesToProcess(leader) + value, err = asm.Load(ctx, smRef, asmURLForRef(smRef)) + assert.Error(t, err, "expected to fail because asm leader has not synced secret yet") + + // write a secret via leader and then by sm directly + leaderSmRef := Ref{Module: Some("sync"), Name: "set-by-leader-then-sm"} + _, err = asm.Store(ctx, leaderSmRef, []byte("leader-sm-first")) + assert.NoError(t, err) + _, err = sm.UpdateSecret(ctx, &secretsmanager.UpdateSecretInput{ + SecretId: aws.String(leaderSmRef.String()), + SecretString: aws.String("leader-sm-second"), + }) + assert.NoError(t, 
err) + waitForUpdatesToProcess(leader) + value, err = asm.Load(ctx, leaderSmRef, asmURLForRef(leaderSmRef)) + assert.NoError(t, err, "failed to load secret via asm") + assert.Equal(t, value, []byte("leader-sm-first"), "expected initial value before leader has a chance to sync newest value") + + // write a secret via sm directly and then by leader + smLeaderRef := Ref{Module: Some("sync"), Name: "set-by-sm-then-leader"} + _, err = sm.CreateSecret(ctx, &secretsmanager.CreateSecretInput{ + Name: aws.String(smLeaderRef.String()), + SecretString: aws.String(string("sm-leader-first")), + }) + assert.NoError(t, err, "failed to create secret via sm") + _, err = asm.Store(ctx, smLeaderRef, []byte("sm-leader-second")) + assert.NoError(t, err) + waitForUpdatesToProcess(leader) + value, err = asm.Load(ctx, smLeaderRef, asmURLForRef(smLeaderRef)) + assert.NoError(t, err, "failed to load secret via asm") + assert.Equal(t, value, []byte("sm-leader-second"), "unexpected secret value") + + // give leader a chance to sync + clock.Add(syncInterval) + time.Sleep(time.Second * 5) + + // confirm that all secrets are up to date + list, err := asm.List(ctx) + assert.NoError(t, err) + assert.Equal(t, len(list), 4, "expected 4 secrets") + for _, entry := range list { + value, err = asm.Load(ctx, entry.Ref, asmURLForRef(entry.Ref)) + assert.NoError(t, err, "failed to load secret via asm") + var expectedValue string + switch entry.Ref { + case leaderRef: + expectedValue = "leader-first" + case smRef: + expectedValue = "sm-first" + case leaderSmRef: + expectedValue = "leader-sm-second" + case smLeaderRef: + expectedValue = "sm-leader-second" + default: + panic(fmt.Sprintf("unexpected ref: %s", entry.Ref)) + } + assert.Equal(t, expectedValue, string(value), "unexpected secret value for %s", entry.Ref) + } + + // delete 2 secrets without leader knowing + tr := true + _, err = sm.DeleteSecret(ctx, &secretsmanager.DeleteSecretInput{ + SecretId: aws.String(smRef.String()), + 
ForceDeleteWithoutRecovery: &tr, + }) + assert.NoError(t, err) + _, err = sm.DeleteSecret(ctx, &secretsmanager.DeleteSecretInput{ + SecretId: aws.String(smLeaderRef.String()), + ForceDeleteWithoutRecovery: &tr, + }) + assert.NoError(t, err) + + // give leader a chance to sync + clock.Add(syncInterval) + time.Sleep(time.Second * 5) + + // confirm secrets were deleted + list, err = asm.List(ctx) + assert.NoError(t, err) + assert.Equal(t, len(list), 2, "expected 2 secrets") + _, err = asm.Load(ctx, smRef, asmURLForRef(smRef)) + assert.Error(t, err, "expected to fail because secret was deleted") + _, err = asm.Load(ctx, smLeaderRef, asmURLForRef(smLeaderRef)) + assert.Error(t, err, "expected to fail because secret was deleted") +}