Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 20, 2024
1 parent 1e0bc6f commit ac6177c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
1 change: 1 addition & 0 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type asmClient interface {
// Only supports loading "string" secrets, not binary secrets.
//
// The resolver does a direct/proxy map from a Ref to a URL, module.name <-> asm://module.name and does not access ASM at all.
//
// One controller is elected as the leader and is responsible for syncing the cache of secrets from ASM (see asmLeader).
// Others get secrets from the leader via AdminService (see asmFollower).
type ASM struct {
Expand Down
63 changes: 33 additions & 30 deletions common/configuration/asm_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ const (
loadSecretsTimeout = time.Second * 5
)

type asmCacheValue struct {
type asmSecretValue struct {
value []byte

// lastModified is retrieved from ASM when syncing.
// it is nil when our cache is updated after writing to ASM (we do not know the lastModified time in ASM)
// it is nil when our cache is updated after writing to ASM (lastModified is not returned when writing),
// until the next sync refetches it from ASM
lastModified optional.Option[time.Time]
}

type updateSecretEvent struct {
type updateASMSecretEvent struct {
ref Ref
// value is nil when the secret was deleted
value optional.Option[[]byte]
Expand All @@ -52,9 +53,9 @@ type asmLeader struct {
// it is updated via:
// - polling ASM
// - when we write to ASM
secrets *xsync.MapOf[Ref, asmCacheValue]
secrets *xsync.MapOf[Ref, asmSecretValue]

topic *pubsub.Topic[updateSecretEvent]
topic *pubsub.Topic[updateASMSecretEvent]
// used by tests to wait for events to be processed
topicWaitGroup sync.WaitGroup
}
Expand All @@ -64,8 +65,8 @@ var _ asmClient = &asmLeader{}
func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader {
l := &asmLeader{
client: client,
secrets: xsync.NewMapOf[Ref, asmCacheValue](),
topic: pubsub.New[updateSecretEvent](),
secrets: xsync.NewMapOf[Ref, asmSecretValue](),
topic: pubsub.New[updateASMSecretEvent](),
}
go func() {
l.watchForUpdates(ctx)
Expand All @@ -75,7 +76,7 @@ func newASMLeader(ctx context.Context, client *secretsmanager.Client) *asmLeader

func (l *asmLeader) watchForUpdates(ctx context.Context) {
logger := log.FromContext(ctx)
events := make(chan updateSecretEvent, 64)
events := make(chan updateASMSecretEvent, 64)
l.topic.Subscribe(events)
defer l.topic.Unsubscribe(events)

Expand Down Expand Up @@ -111,8 +112,8 @@ func (l *asmLeader) watchForUpdates(ctx context.Context) {

// sync retrieves all secrets from ASM and updates the cache
func (l *asmLeader) sync(ctx context.Context) error {
previous := map[Ref]asmCacheValue{}
l.secrets.Range(func(ref Ref, value asmCacheValue) bool {
previous := map[Ref]asmSecretValue{}
l.secrets.Range(func(ref Ref, value asmSecretValue) bool {
previous[ref] = value
return true
})
Expand Down Expand Up @@ -186,7 +187,7 @@ func (l *asmLeader) sync(ctx context.Context) error {
return fmt.Errorf("secret for %s is not a string", ref)
}
data := []byte(*s.SecretString)
l.secrets.Store(ref, asmCacheValue{
l.secrets.Store(ref, asmSecretValue{
value: data,
lastModified: optional.Some(refsToLoad[ref]),
})
Expand All @@ -198,27 +199,32 @@ func (l *asmLeader) sync(ctx context.Context) error {
return nil
}

func (l *asmLeader) processEvent(e updateSecretEvent) {
// processEvent updates the cache after writes to ASM
func (l *asmLeader) processEvent(e updateASMSecretEvent) {
// waitGroup updated so testing can wait for events to be processed
defer l.topicWaitGroup.Done()

if !l.loaded.Load() {
// cache not loaded, nothing to update
return
}
if data, ok := e.value.Get(); ok {
l.secrets.Store(e.ref, asmCacheValue{
// updated value
l.secrets.Store(e.ref, asmSecretValue{
value: data,
lastModified: optional.None[time.Time](),
})
} else {
// removed value
l.secrets.Delete(e.ref)
}
}

// publish an event to the topic
// publishes an event to update the cache
//
// wrapped in a method to ensure we always update topicWaitGroup
func (l *asmLeader) publish(event updateSecretEvent) {
// Called after writing to ASM.
// Wrapped in a method to ensure we always update topicWaitGroup
func (l *asmLeader) publish(event updateASMSecretEvent) {
l.topicWaitGroup.Add(1)
l.topic.Publish(event)
}
Expand All @@ -227,24 +233,23 @@ func (l *asmLeader) publish(event updateSecretEvent) {
//
// If secrets have not yet been synced, this will retry until they are, returning an error if it takes too long.
func (l *asmLeader) waitForSecrets() error {
retryRate := time.Second / time.Duration(10)
for range int(loadSecretsTimeout / retryRate) {
deadline := time.Now().Add(loadSecretsTimeout)
for deadline.After(time.Now()) {
if loaded := l.loaded.Load(); loaded {
return nil
}
time.Sleep(retryRate)
time.Sleep(time.Millisecond * 100)
}
return errors.New("secrets not synced yet")
return errors.New("secrets not synced from ASM yet")
}

// List all secrets in the account. This might require multiple calls to the AWS API if there are more than 100 secrets.
// list all secrets in the account.
func (l *asmLeader) list(ctx context.Context) ([]Entry, error) {
err := l.waitForSecrets()
if err != nil {
if err := l.waitForSecrets(); err != nil {
return nil, err
}
entries := []Entry{}
l.secrets.Range(func(ref Ref, value asmCacheValue) bool {
l.secrets.Range(func(ref Ref, value asmSecretValue) bool {
entries = append(entries, Entry{
Ref: ref,
Accessor: asmURLForRef(ref),
Expand All @@ -254,10 +259,8 @@ func (l *asmLeader) list(ctx context.Context) ([]Entry, error) {
return entries, nil
}

// Load only supports loading "string" secrets, not binary secrets.
func (l *asmLeader) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
err := l.waitForSecrets()
if err != nil {
if err := l.waitForSecrets(); err != nil {
return nil, err
}
if v, ok := l.secrets.Load(ref); ok {
Expand All @@ -266,7 +269,7 @@ func (l *asmLeader) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, er
return nil, fmt.Errorf("secret not found: %s", ref)
}

// Store and if the secret already exists, update it.
// store and if the secret already exists, update it.
func (l *asmLeader) store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
_, err := l.client.CreateSecret(ctx, &secretsmanager.CreateSecretInput{
Name: aws.String(ref.String()),
Expand All @@ -287,7 +290,7 @@ func (l *asmLeader) store(ctx context.Context, ref Ref, value []byte) (*url.URL,
} else if err != nil {
return nil, fmt.Errorf("unable to store secret: %w", err)
}
l.publish(updateSecretEvent{
l.publish(updateASMSecretEvent{
ref: ref,
value: optional.Some(value),
})
Expand All @@ -303,7 +306,7 @@ func (l *asmLeader) delete(ctx context.Context, ref Ref) error {
if err != nil {
return fmt.Errorf("unable to delete secret: %w", err)
}
l.publish(updateSecretEvent{
l.publish(updateASMSecretEvent{
ref: ref,
value: optional.None[[]byte](),
})
Expand Down

0 comments on commit ac6177c

Please sign in to comment.