Skip to content

Commit

Permalink
actually make asm leader threadsafe, rather than just look threadsafe
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 20, 2024
1 parent 26f8eab commit 3b33f8f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 37 deletions.
6 changes: 1 addition & 5 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/types/pubsub"

"github.com/aws/aws-sdk-go-v2/service/secretsmanager"
)
Expand All @@ -38,10 +37,7 @@ var _ Resolver[Secrets] = &ASM{}
var _ Provider[Secrets] = &ASM{}

func NewASM(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser) *ASM {
lead := &asmLeader{
client: secretsClient,
topic: pubsub.New[updateSecretEvent](),
}
lead := newASMLeader(secretsClient)
return &ASM{
coordinator: leader.New[asmClient](ctx,
advertise,
Expand Down
84 changes: 52 additions & 32 deletions common/configuration/asm_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/alecthomas/atomic"
"golang.org/x/exp/maps"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/leader"
"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -45,22 +45,32 @@ type updateSecretEvent struct {
type asmLeader struct {
client *secretsmanager.Client

// indicates if the initial sync with ASM has finished
loaded atomic.Value[bool]

// secrets is a map of secrets that have been loaded from ASM.
// optional is nil when not loaded yet
// it is updated via:
// - polling ASM
// - when we write to ASM
secrets atomic.Value[optional.Option[map[Ref]asmCacheValue]]
secrets *xsync.MapOf[Ref, asmCacheValue]

topic *pubsub.Topic[updateSecretEvent]

// used by tests to wait for events to be processed
topicWaitGroup sync.WaitGroup
}

var _ asmClient = &asmLeader{}
var _ leader.ActivatedLeader = &asmLeader{}

func newASMLeader(client *secretsmanager.Client) *asmLeader {
return &asmLeader{
client: client,
secrets: xsync.NewMapOf[Ref, asmCacheValue](),
topic: pubsub.New[updateSecretEvent](),
}
}

// ActivateLeader is called when this controller becomes the leader
func (l *asmLeader) ActivateLeader(ctx context.Context) {
go func() {
Expand Down Expand Up @@ -106,13 +116,12 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) {

// sync retrieves all secrets from ASM and updates the cache
func (l *asmLeader) sync(ctx context.Context) error {
var previous map[Ref]asmCacheValue
if p, ok := l.secrets.Load().Get(); ok {
previous = p
} else {
previous = map[Ref]asmCacheValue{}
}
secrets := map[Ref]asmCacheValue{}
previous := map[Ref]asmCacheValue{}
l.secrets.Range(func(ref Ref, value asmCacheValue) bool {
previous[ref] = value
return true
})
seen := map[Ref]bool{}

// get list of secrets
refsToLoad := map[Ref]time.Time{}
Expand All @@ -134,9 +143,10 @@ func (l *asmLeader) sync(ctx context.Context) error {
if err != nil {
return fmt.Errorf("unable to parse ref: %w", err)
}
seen[ref] = true

// check if we already have the value from previous sync
if pValue, ok := previous[ref]; ok && pValue.lastModified == optional.Some(*s.LastChangedDate) {
secrets[ref] = pValue
continue
}
refsToLoad[ref] = *s.LastChangedDate
Expand All @@ -148,6 +158,13 @@ func (l *asmLeader) sync(ctx context.Context) error {
}
}

// remove secrets not found in ASM
for ref := range previous {
if _, ok := seen[ref]; !ok {
l.secrets.Delete(ref)
}
}

// get values for new and updated secrets
for len(refsToLoad) > 0 {
batchSize := 20
Expand All @@ -174,32 +191,32 @@ func (l *asmLeader) sync(ctx context.Context) error {
return fmt.Errorf("secret for %s is not a string", ref)
}
data := []byte(*s.SecretString)
secrets[ref] = asmCacheValue{
l.secrets.Store(ref, asmCacheValue{
value: data,
lastModified: optional.Some(refsToLoad[ref]),
}
})
delete(refsToLoad, ref)
}
}

l.secrets.Store(optional.Some(secrets))
l.loaded.Store(true)
return nil
}

func (l *asmLeader) processEvent(e updateSecretEvent) {
defer l.topicWaitGroup.Done()

secrets, ok := l.secrets.Load().Get()
if !ok {
if !l.loaded.Load() {
// cache not loaded, nothing to update
return
}
if data, ok := e.value.Get(); ok {
secrets[e.ref] = asmCacheValue{
l.secrets.Store(e.ref, asmCacheValue{
value: data,
lastModified: optional.None[time.Time](),
}
})
} else {
delete(secrets, e.ref)
l.secrets.Delete(e.ref)
}
}

Expand All @@ -211,41 +228,44 @@ func (l *asmLeader) publish(event updateSecretEvent) {
l.topic.Publish(event)
}

// loadSecrets provides a way to retrieve secrets from the cache
// waitForSecrets waits until the initial sync of secrets has completed.
//
// If secrets have not yet been loaded, this will retry until they are, returning an error if it takes too long.
func (l *asmLeader) loadSecrets() (map[Ref]asmCacheValue, error) {
// If secrets have not yet been synced, this will retry until they are, returning an error if it takes too long.
func (l *asmLeader) waitForSecrets() error {
retryRate := time.Second / time.Duration(10)
for range int(loadSecretsTimeout / retryRate) {
if v, ok := l.secrets.Load().Get(); ok {
return v, nil
if loaded := l.loaded.Load(); loaded {
return nil
}
time.Sleep(retryRate)
}
return nil, errors.New("secrets not synced yet")
return errors.New("secrets not synced yet")
}

// List all secrets in the account. This might require multiple calls to the AWS API if there are more than 100 secrets.
func (l *asmLeader) list(ctx context.Context) ([]Entry, error) {
secrets, err := l.loadSecrets()
err := l.waitForSecrets()
if err != nil {
return nil, err
}
return slices.Map(maps.Keys(secrets), func(ref Ref) Entry {
return Entry{
entries := []Entry{}
l.secrets.Range(func(ref Ref, value asmCacheValue) bool {
entries = append(entries, Entry{
Ref: ref,
Accessor: asmURLForRef(ref),
}
}), nil
})
return true
})
return entries, nil
}

// Load only supports loading "string" secrets, not binary secrets.
func (l *asmLeader) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
secrets, err := l.loadSecrets()
err := l.waitForSecrets()
if err != nil {
return nil, err
}
if v, ok := secrets[ref]; ok {
if v, ok := l.secrets.Load(ref); ok {
return v.value, nil
}
return nil, fmt.Errorf("secret not found: %s", ref)
Expand Down

0 comments on commit 3b33f8f

Please sign in to comment.