Skip to content

Commit

Permalink
and sync integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 20, 2024
1 parent 4ff6ff6 commit 9b0726e
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 16 deletions.
7 changes: 6 additions & 1 deletion common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/TBD54566975/ftl/internal/rpc"
"github.com/benbjohnson/clock"

"github.com/TBD54566975/ftl/backend/controller/leader"
"github.com/TBD54566975/ftl/backend/controller/leases"
Expand Down Expand Up @@ -38,8 +39,12 @@ var _ Resolver[Secrets] = &ASM{}
var _ Provider[Secrets] = &ASM{}

func NewASM(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser) *ASM {
return newASMForTesting(ctx, secretsClient, advertise, leaser, clock.New())
}

func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser, clock clock.Clock) *ASM {
leaderFactory := func(ctx context.Context) (asmClient, error) {
return newASMLeader(ctx, secretsClient), nil
return newASMLeader(ctx, secretsClient, clock), nil
}
followerFactory := func(ctx context.Context, url *url.URL) (client asmClient, err error) {
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
Expand Down
19 changes: 10 additions & 9 deletions common/configuration/asm_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/alecthomas/atomic"
"github.com/benbjohnson/clock"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -62,25 +63,25 @@ type asmLeader struct {

var _ asmClient = &asmLeader{}

func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader {
func newASMLeader(ctx context.Context, client *secretsmanager.Client, clock clock.Clock) *asmLeader {
l := &asmLeader{
client: client,
secrets: xsync.NewMapOf[Ref, asmSecretValue](),
topic: pubsub.New[updateASMSecretEvent](),
}
go func() {
l.watchForUpdates(ctx)
l.watchForUpdates(ctx, clock)
}()
return l
}

func (l *asmLeader) watchForUpdates(ctx context.Context) {
func (l *asmLeader) watchForUpdates(ctx context.Context, clock clock.Clock) {
logger := log.FromContext(ctx)
events := make(chan updateASMSecretEvent, 64)
l.topic.Subscribe(events)
defer l.topic.Unsubscribe(events)

nextSync := time.Now()
nextSync := clock.Now()
backOff := syncInitialBackoff
for {
select {
Expand All @@ -90,8 +91,8 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) {
case e := <-events:
l.processEvent(e)

case <-time.After(time.Until(nextSync)):
nextSync = time.Now().Add(syncInterval)
case <-clock.After(clock.Until(nextSync)):
nextSync = clock.Now().Add(syncInterval)

err := l.sync(ctx)
if err == nil {
Expand All @@ -100,9 +101,9 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) {
}
// back off if we fail to sync
logger.Warnf("unable to sync ASM secrets: %v", err)
nextSync = time.Now().Add(backOff)
if nextSync.After(time.Now().Add(syncInterval)) {
nextSync = time.Now().Add(syncInterval)
nextSync = clock.Now().Add(backOff)
if nextSync.After(clock.Now().Add(syncInterval)) {
nextSync = clock.Now().Add(syncInterval)
} else {
backOff *= 2
}
Expand Down
127 changes: 121 additions & 6 deletions common/configuration/asm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"fmt"
"sort"
"testing"
"time"

"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/internal/log"
"github.com/benbjohnson/clock"

"github.com/alecthomas/assert/v2"
. "github.com/alecthomas/types/optional"
Expand All @@ -19,24 +21,24 @@ import (
"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
)

func localstack(ctx context.Context, t *testing.T) (*ASM, *asmLeader) {
func localstack(ctx context.Context, t *testing.T) (*ASM, *asmLeader, *secretsmanager.Client, *clock.Mock) {
t.Helper()
mockClock := clock.NewMock()
cc := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider("test", "test", ""))
cfg, err := config.LoadDefaultConfig(ctx, config.WithCredentialsProvider(cc), config.WithRegion("us-west-2"))
if err != nil {
t.Fatal(err)
}

sm := secretsmanager.NewFromConfig(cfg, func(o *secretsmanager.Options) {
o.BaseEndpoint = aws.String("http://localhost:4566")
})
asm := NewASM(ctx, sm, URL("http://localhost:1234"), leases.NewFakeLeaser())
asm := newASMForTesting(ctx, sm, URL("http://localhost:1234"), leases.NewFakeLeaser(), mockClock)

leaderOrFollower, err := asm.coordinator.Get()
assert.NoError(t, err)
leader, ok := leaderOrFollower.(*asmLeader)
assert.True(t, ok, "expected test to get an asm leader not a follower")
return asm, leader
return asm, leader, sm, mockClock
}

func waitForUpdatesToProcess(l *asmLeader) {
Expand All @@ -45,7 +47,7 @@ func waitForUpdatesToProcess(l *asmLeader) {

func TestASMWorkflow(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
asm, leader := localstack(ctx, t)
asm, leader, _, _ := localstack(ctx, t)
ref := Ref{Module: Some("foo"), Name: "bar"}
var mySecret = []byte("my secret")
manager, err := New(ctx, asm, []Provider[Secrets]{asm})
Expand Down Expand Up @@ -95,7 +97,7 @@ func TestASMWorkflow(t *testing.T) {
// Suggest not running this against a real AWS account (especially in CI) due to the cost. Maybe costs a few $.
func TestASMPagination(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
asm, leader := localstack(ctx, t)
asm, leader, _, _ := localstack(ctx, t)
manager, err := New(ctx, asm, []Provider[Secrets]{asm})
assert.NoError(t, err)

Expand Down Expand Up @@ -140,3 +142,116 @@ func TestASMPagination(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, len(items), 0)
}

func TestSync(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
asm, leader, sm, clock := localstack(ctx, t)

// wait for initial load
err := leader.waitForSecrets()
assert.NoError(t, err)

// advance clock to half way between syncs
clock.Add(syncInterval / 2)

// write a secret via leader
leaderRef := Ref{Module: Some("sync"), Name: "set-by-leader"}
_, err = asm.Store(ctx, leaderRef, []byte("leader-first"))
assert.NoError(t, err)
waitForUpdatesToProcess(leader)
value, err := asm.Load(ctx, leaderRef, asmURLForRef(leaderRef))
assert.NoError(t, err, "failed to load secret via asm")
assert.Equal(t, value, []byte("leader-first"), "unexpected secret value")

// write another secret via sm directly
smRef := Ref{Module: Some("sync"), Name: "set-by-sm"}
_, err = sm.CreateSecret(ctx, &secretsmanager.CreateSecretInput{
Name: aws.String(smRef.String()),
SecretString: aws.String(string("sm-first")),
})
assert.NoError(t, err, "failed to create secret via sm")
waitForUpdatesToProcess(leader)
value, err = asm.Load(ctx, smRef, asmURLForRef(smRef))
assert.Error(t, err, "expected to fail because asm leader has not synced secret yet")

// write a secret via leader and then by sm directly
leaderSmRef := Ref{Module: Some("sync"), Name: "set-by-leader-then-sm"}
_, err = asm.Store(ctx, leaderSmRef, []byte("leader-sm-first"))
assert.NoError(t, err)
_, err = sm.UpdateSecret(ctx, &secretsmanager.UpdateSecretInput{
SecretId: aws.String(leaderSmRef.String()),
SecretString: aws.String("leader-sm-second"),
})
assert.NoError(t, err)
waitForUpdatesToProcess(leader)
value, err = asm.Load(ctx, leaderSmRef, asmURLForRef(leaderSmRef))
assert.NoError(t, err, "failed to load secret via asm")
assert.Equal(t, value, []byte("leader-sm-first"), "expected initial value before leader has a chance to sync newest value")

// write a secret via sm directly and then by leader
smLeaderRef := Ref{Module: Some("sync"), Name: "set-by-sm-then-leader"}
_, err = sm.CreateSecret(ctx, &secretsmanager.CreateSecretInput{
Name: aws.String(smLeaderRef.String()),
SecretString: aws.String(string("sm-leader-first")),
})
assert.NoError(t, err, "failed to create secret via sm")
_, err = asm.Store(ctx, smLeaderRef, []byte("sm-leader-second"))
assert.NoError(t, err)
waitForUpdatesToProcess(leader)
value, err = asm.Load(ctx, smLeaderRef, asmURLForRef(smLeaderRef))
assert.NoError(t, err, "failed to load secret via asm")
assert.Equal(t, value, []byte("sm-leader-second"), "unexpected secret value")

// give leader a change to sync
clock.Add(syncInterval)
time.Sleep(time.Second * 5)

// confirm that all secrets are up to date
list, err := asm.List(ctx)
assert.NoError(t, err)
assert.Equal(t, len(list), 4, "expected 4 secrets")
for _, entry := range list {
value, err = asm.Load(ctx, entry.Ref, asmURLForRef(entry.Ref))
assert.NoError(t, err, "failed to load secret via asm")
var expectedValue string
switch entry.Ref {
case leaderRef:
expectedValue = "leader-first"
case smRef:
expectedValue = "sm-first"
case leaderSmRef:
expectedValue = "leader-sm-second"
case smLeaderRef:
expectedValue = "sm-leader-second"
default:
t.Fatal(fmt.Sprintf("unexpected ref: %s", entry.Ref))
}
assert.Equal(t, expectedValue, string(value), "unexpected secret value for %s", entry.Ref)
}

// delete 2 secrets without leader knowing
tr := true
_, err = sm.DeleteSecret(ctx, &secretsmanager.DeleteSecretInput{
SecretId: aws.String(smRef.String()),
ForceDeleteWithoutRecovery: &tr,
})
assert.NoError(t, err)
_, err = sm.DeleteSecret(ctx, &secretsmanager.DeleteSecretInput{
SecretId: aws.String(smLeaderRef.String()),
ForceDeleteWithoutRecovery: &tr,
})
assert.NoError(t, err)

// give leader a change to sync
clock.Add(syncInterval)
time.Sleep(time.Second * 5)

// confirm secrets were deleted
list, err = asm.List(ctx)
assert.NoError(t, err)
assert.Equal(t, len(list), 2, "expected 2 secrets")
_, err = asm.Load(ctx, smRef, asmURLForRef(smRef))
assert.Error(t, err, "expected to fail because secret was deleted")
_, err = asm.Load(ctx, smLeaderRef, asmURLForRef(smLeaderRef))
assert.Error(t, err, "expected to fail because secret was deleted")
}

0 comments on commit 9b0726e

Please sign in to comment.