diff --git a/common/configuration/asm.go b/common/configuration/asm.go index 93c69ca91a..6278ab89e9 100644 --- a/common/configuration/asm.go +++ b/common/configuration/asm.go @@ -27,6 +27,7 @@ type asmClient interface { // Only supports loading "string" secrets, not binary secrets. // // The resolver does a direct/proxy map from a Ref to a URL, module.name <-> asm://module.name and does not access ASM at all. +// // One controller is elected as the leader and is responsible for syncing the cache of secrets from ASM (see asmLeader). // Others get secrets from the leader via AdminService (see asmFollower). type ASM struct { diff --git a/common/configuration/asm_leader.go b/common/configuration/asm_leader.go index b07bf6eaf4..52cd2ba9dd 100644 --- a/common/configuration/asm_leader.go +++ b/common/configuration/asm_leader.go @@ -27,15 +27,16 @@ const ( loadSecretsTimeout = time.Second * 5 ) -type asmCacheValue struct { +type asmSecretValue struct { value []byte // lastModified is retrieved from ASM when syncing. - // it is nil when our cache is updated after writing to ASM (we do not know the lastModified time in ASM) + // it is nil when our cache is updated after writing to ASM (lastModified is not returned when writing), + // until the next sync refetches it from ASM lastModified optional.Option[time.Time] } -type updateSecretEvent struct { +type updateASMSecretEvent struct { ref Ref // value is nil when the secret was deleted value optional.Option[[]byte] @@ -52,9 +53,9 @@ type asmLeader struct { // it is updated via: // - polling ASM // - when we write to ASM - secrets *xsync.MapOf[Ref, asmCacheValue] + secrets *xsync.MapOf[Ref, asmSecretValue] - topic *pubsub.Topic[updateSecretEvent] + topic *pubsub.Topic[updateASMSecretEvent] // used by tests to wait for events to be processed topicWaitGroup sync.WaitGroup } @@ -64,8 +65,8 @@ var _ asmClient = &asmLeader{} func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader { l := &asmLeader{ client: client, - secrets: xsync.NewMapOf[Ref, asmCacheValue](), - topic: pubsub.New[updateSecretEvent](), + secrets: xsync.NewMapOf[Ref, asmSecretValue](), + topic: pubsub.New[updateASMSecretEvent](), } go func() { l.watchForUpdates(ctx) @@ -75,7 +76,7 @@ func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader func (l *asmLeader) watchForUpdates(ctx context.Context) { logger := log.FromContext(ctx) - events := make(chan updateSecretEvent, 64) + events := make(chan updateASMSecretEvent, 64) l.topic.Subscribe(events) defer l.topic.Unsubscribe(events) @@ -111,8 +112,8 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) { // sync retrieves all secrets from ASM and updates the cache func (l *asmLeader) sync(ctx context.Context) error { - previous := map[Ref]asmCacheValue{} - l.secrets.Range(func(ref Ref, value asmCacheValue) bool { + previous := map[Ref]asmSecretValue{} + l.secrets.Range(func(ref Ref, value asmSecretValue) bool { previous[ref] = value return true }) @@ -186,7 +187,7 @@ func (l *asmLeader) sync(ctx context.Context) error { return fmt.Errorf("secret for %s is not a string", ref) } data := []byte(*s.SecretString) - l.secrets.Store(ref, asmCacheValue{ + l.secrets.Store(ref, asmSecretValue{ value: data, lastModified: optional.Some(refsToLoad[ref]), }) @@ -198,7 +199,9 @@ func (l *asmLeader) sync(ctx context.Context) error { return nil } -func (l *asmLeader) processEvent(e updateSecretEvent) { +// processEvent updates the cache after writes to ASM +func (l *asmLeader) processEvent(e updateASMSecretEvent) { + // waitGroup updated so testing can wait for events to be processed defer l.topicWaitGroup.Done() if !l.loaded.Load() { @@ -206,19 +209,22 @@ func (l *asmLeader) processEvent(e updateSecretEvent) { return } if data, ok := e.value.Get(); ok { - l.secrets.Store(e.ref, asmCacheValue{ + // updated value + l.secrets.Store(e.ref, asmSecretValue{ value: data, lastModified: optional.None[time.Time](), }) } else { + // removed value l.secrets.Delete(e.ref) } } -// publish an event to the topic +// publishes an event to update the cache // -// wrapped in a method to ensure we always update topicWaitGroup -func (l *asmLeader) publish(event updateSecretEvent) { +// Called after writing to ASM. +// Wrapped in a method to ensure we always update topicWaitGroup +func (l *asmLeader) publish(event updateASMSecretEvent) { l.topicWaitGroup.Add(1) l.topic.Publish(event) } @@ -227,24 +233,23 @@ func (l *asmLeader) publish(event updateSecretEvent) { // // If secrets have not yet been synced, this will retry until they are, returning an error if it takes too long. func (l *asmLeader) waitForSecrets() error { - retryRate := time.Second / time.Duration(10) - for range int(loadSecretsTimeout / retryRate) { + deadline := time.Now().Add(loadSecretsTimeout) + for deadline.After(time.Now()) { if loaded := l.loaded.Load(); loaded { return nil } - time.Sleep(retryRate) + time.Sleep(time.Millisecond * 100) } - return errors.New("secrets not synced yet") + return errors.New("secrets not synced from ASM yet") } -// List all secrets in the account. This might require multiple calls to the AWS API if there are more than 100 secrets. +// list all secrets in the account. func (l *asmLeader) list(ctx context.Context) ([]Entry, error) { - err := l.waitForSecrets() - if err != nil { + if err := l.waitForSecrets(); err != nil { return nil, err } entries := []Entry{} - l.secrets.Range(func(ref Ref, value asmCacheValue) bool { + l.secrets.Range(func(ref Ref, value asmSecretValue) bool { entries = append(entries, Entry{ Ref: ref, Accessor: asmURLForRef(ref), @@ -254,10 +259,8 @@ func (l *asmLeader) list(ctx context.Context) ([]Entry, error) { return entries, nil } -// Load only supports loading "string" secrets, not binary secrets. func (l *asmLeader) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) { - err := l.waitForSecrets() - if err != nil { + if err := l.waitForSecrets(); err != nil { return nil, err } if v, ok := l.secrets.Load(ref); ok { @@ -266,7 +269,7 @@ func (l *asmLeader) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, er return nil, fmt.Errorf("secret not found: %s", ref) } -// Store and if the secret already exists, update it. +// store and if the secret already exists, update it. func (l *asmLeader) store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) { _, err := l.client.CreateSecret(ctx, &secretsmanager.CreateSecretInput{ Name: aws.String(ref.String()), @@ -287,7 +290,7 @@ func (l *asmLeader) store(ctx context.Context, ref Ref, value []byte) (*url.URL, } else if err != nil { return nil, fmt.Errorf("unable to store secret: %w", err) } - l.publish(updateSecretEvent{ + l.publish(updateASMSecretEvent{ ref: ref, value: optional.Some(value), }) @@ -303,7 +306,7 @@ func (l *asmLeader) delete(ctx context.Context, ref Ref) error { if err != nil { return fmt.Errorf("unable to delete secret: %w", err) } - l.publish(updateSecretEvent{ + l.publish(updateASMSecretEvent{ ref: ref, value: optional.None[[]byte](), })