Skip to content

Commit

Permalink
feat: asynchronous providers (#2055)
Browse files Browse the repository at this point in the history
closes #1949

Changes:
- Splits `Provider` interface into:
- `SynchronousProvider`: same as old interface, can load values from the
data source on-demand
- `AsynchronousProvider`: used by providers that prefer to sync with
their data source
- `Manager` owns a `cache` which syncs asynchronous providers
- When `Manager` is getting values from an asynchronous provider, it now
accesses it via `cache`
- When `Manager` is setting values for an asynchronous provider, it
directly sets it with the provider and then notifies the cache
- `OnePasswordProvider` is now an `AsynchronousProvider` so that we
don't cause multiple annoying authorization prompts
- `ASM` is now an `AsynchronousProvider` with it's previous
`secretsCache` now being repurposed to work more generally.
  • Loading branch information
matt2e authored Jul 12, 2024
1 parent 3ee92a1 commit 6508d57
Show file tree
Hide file tree
Showing 14 changed files with 605 additions and 395 deletions.
55 changes: 36 additions & 19 deletions common/configuration/1password_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"net/url"
"regexp"
"strings"
"time"

"github.com/kballard/go-shellquote"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -23,6 +25,8 @@ type OnePasswordProvider struct {
ProjectName string
}

var _ AsynchronousProvider[Secrets] = OnePasswordProvider{}

func (OnePasswordProvider) Role() Secrets { return Secrets{} }
func (o OnePasswordProvider) Key() string { return "op" }
func (o OnePasswordProvider) Delete(ctx context.Context, ref Ref) error {
Expand All @@ -33,24 +37,44 @@ func (o OnePasswordProvider) itemName() string {
return o.ProjectName + ".secrets"
}

// Load returns the secret stored in 1password.
func (o OnePasswordProvider) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
func (o OnePasswordProvider) SyncInterval() time.Duration {
return time.Second * 10
}

func (o OnePasswordProvider) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
logger := log.FromContext(ctx)
if o.Vault == "" {
return fmt.Errorf("1Password vault not set: use --opvault flag to specify the vault")
}
if err := checkOpBinary(); err != nil {
return nil, err
return err
}

vault := key.Host
full, err := o.getItem(ctx, vault)
full, err := o.getItem(ctx, o.Vault)
if err != nil {
return nil, fmt.Errorf("get item failed: %w", err)
return fmt.Errorf("get item failed: %w", err)
}

secret, ok := full.value(ref)
if !ok {
return nil, fmt.Errorf("field %q not found in 1Password item %q: %v", ref, o.itemName(), full.Fields)
for _, field := range full.Fields {
ref, err := ParseRef(field.Label)
if err != nil {
logger.Warnf("invalid field label found in 1Password: %q", field.Label)
continue
}
values.Store(ref, SyncedValue{
Value: []byte(field.Value),
})
}

return secret, nil
// delete old values
values.Range(func(ref Ref, _ SyncedValue) bool {
if _, ok := slices.Find(full.Fields, func(item entry) bool {
return item.Label == ref.String()
}); !ok {
values.Delete(ref)
}
return true
})
return nil
}

var vaultRegex = regexp.MustCompile(`^[a-zA-Z0-9_\-.]+$`)
Expand Down Expand Up @@ -118,13 +142,6 @@ type entry struct {
Value string `json:"value"`
}

func (i item) value(ref Ref) ([]byte, bool) {
secret, ok := slices.Find(i.Fields, func(item entry) bool {
return item.Label == ref.String()
})
return []byte(secret.Value), ok
}

// getItem gets the single 1Password item for all project secrets
// op --format json item get --vault Personal "projectname.secrets"
func (o OnePasswordProvider) getItem(ctx context.Context, vault string) (*item, error) {
Expand All @@ -135,7 +152,7 @@ func (o OnePasswordProvider) getItem(ctx context.Context, vault string) (*item,
"--format", "json",
}
output, err := exec.Capture(ctx, ".", "op", args...)
logger.Debugf("Getting item with args %s", shellquote.Join(args...))
logger.Tracef("Getting item with args %s", shellquote.Join(args...))
if err != nil {
// This is specifically not itemNotFoundError, to distinguish between vault not found and item not found.
if strings.Contains(string(output), "isn't a vault") {
Expand Down
41 changes: 40 additions & 1 deletion common/configuration/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"errors"
"net/url"
"strings"
"time"

"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"
)

// ErrNotFound is returned when a configuration entry is not found or cannot be resolved.
Expand Down Expand Up @@ -88,9 +90,46 @@ type Router[R Role] interface {
type Provider[R Role] interface {
Role() R
Key() string
Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error)

// Store a configuration value and return its key.
Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error)
// Delete a configuration value.
Delete(ctx context.Context, ref Ref) error
}

// SynchronousProvider is an interface for providers that can load values on-demand.
// This is recommended if the provider allows inexpensive loading of values.
type SynchronousProvider[R Role] interface {
Provider[R]

Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error)
}

// AsynchronousProvider is an interface for providers that support syncing values.
// This is recommended if the provider allows batch access, or is expensive to load.
type AsynchronousProvider[R Role] interface {
Provider[R]

SyncInterval() time.Duration

// Sync is called periodically to update the cache with the latest values.
//
// SyncInterval() provides the expected time between syncs.
// If Sync() returns an error, sync will be retried with an exponential backoff.
//
// Sync is only called if the Router has keys referring to this provider.
// If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called
Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error
}

type VersionToken any

type SyncedValue struct {
Value []byte

// VersionToken is a way of storing a version provided by the source of truth (eg: lastModified)
// it is nil when:
// - the owner of the cache is not using version tokens
// - the cache is updated after writing
VersionToken optional.Option[VersionToken]
}
56 changes: 43 additions & 13 deletions common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package configuration

import (
"context"
"fmt"
"net/url"
"time"

"github.com/TBD54566975/ftl/internal/rpc"
"github.com/benbjohnson/clock"
"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/leader"
"github.com/TBD54566975/ftl/backend/controller/leases"
Expand All @@ -17,8 +19,9 @@ import (
)

type asmClient interface {
list(ctx context.Context) ([]Entry, error)
load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error)
name() string
syncInterval() time.Duration
sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error
store(ctx context.Context, ref Ref, value []byte) (*url.URL, error)
delete(ctx context.Context, ref Ref) error
}
Expand All @@ -32,19 +35,25 @@ type ASM struct {
coordinator *leader.Coordinator[asmClient]
}

var _ Provider[Secrets] = &ASM{}
var _ AsynchronousProvider[Secrets] = &ASM{}

func NewASM(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser) *ASM {
return newASMForTesting(ctx, secretsClient, advertise, leaser, clock.New())
return newASMForTesting(ctx, secretsClient, advertise, leaser, optional.None[asmClient]())
}

func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser, clock clock.Clock) *ASM {
func newASMForTesting(ctx context.Context, secretsClient *secretsmanager.Client, advertise *url.URL, leaser leases.Leaser, override optional.Option[asmClient]) *ASM {
leaderFactory := func(ctx context.Context) (asmClient, error) {
return newASMLeader(ctx, secretsClient, clock), nil
if override, ok := override.Get(); ok {
return override, nil
}
return newASMLeader(secretsClient), nil
}
followerFactory := func(ctx context.Context, url *url.URL) (client asmClient, err error) {
if override, ok := override.Get(); ok {
return override, nil
}
rpcClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, url.String(), log.Error)
return newASMFollower(ctx, rpcClient, url.String(), clock), nil
return newASMFollower(rpcClient, url.String()), nil
}
return &ASM{
coordinator: leader.NewCoordinator[asmClient](
Expand Down Expand Up @@ -74,12 +83,25 @@ func (ASM) Key() string {
return "asm"
}

func (a *ASM) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
func (a *ASM) SyncInterval() time.Duration {
client, err := a.coordinator.Get()
if err != nil {
return nil, err
// Could not coordinate, try again soon
return time.Second * 5
}
return client.syncInterval()
}

func (a *ASM) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
client, err := a.coordinator.Get()
if err != nil {
return fmt.Errorf("could not coordinate ASM: %w", err)
}
return client.load(ctx, ref, key)
err = client.sync(ctx, values)
if err != nil {
return fmt.Errorf("%s: %w", client.name(), err)
}
return nil
}

// Store and if the secret already exists, update it.
Expand All @@ -88,13 +110,21 @@ func (a *ASM) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error
if err != nil {
return nil, err
}
return client.store(ctx, ref, value)
url, err := client.store(ctx, ref, value)
if err != nil {
return nil, fmt.Errorf("%s: %w", client.name(), err)
}
return url, nil
}

func (a *ASM) Delete(ctx context.Context, ref Ref) error {
client, err := a.coordinator.Get()
if err != nil {
return err
}
return client.delete(ctx, ref)
err = client.delete(ctx, ref)
if err != nil {
return fmt.Errorf("%s: %w", client.name(), err)
}
return nil
}
55 changes: 19 additions & 36 deletions common/configuration/asm_follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"net/url"
"time"

"github.com/benbjohnson/clock"
"github.com/puzpuzpuz/xsync/v3"

"connectrpc.com/connect"
Expand All @@ -18,27 +17,32 @@ const asmFollowerSyncInterval = time.Minute * 1

// asmFollower uses AdminService to get/set secrets from the leader
type asmFollower struct {
leaderName string

// client requests/responses use unobfuscated values
client ftlv1connect.AdminServiceClient

// cache stores obfuscated values
cache *secretsCache
}

var _ asmClient = &asmFollower{}

func newASMFollower(ctx context.Context, rpcClient ftlv1connect.AdminServiceClient, leaderName string, clock clock.Clock) *asmFollower {
func newASMFollower(rpcClient ftlv1connect.AdminServiceClient, leaderName string) *asmFollower {
f := &asmFollower{
client: rpcClient,
cache: newSecretsCache(fmt.Sprintf("asm/follower/%s", leaderName)),
leaderName: leaderName,
client: rpcClient,
}
go f.cache.sync(ctx, asmFollowerSyncInterval, func(ctx context.Context, secrets *xsync.MapOf[Ref, cachedSecret]) error {
return f.sync(ctx, secrets)
}, clock)
return f
}

func (f *asmFollower) sync(ctx context.Context, secrets *xsync.MapOf[Ref, cachedSecret]) error {
func (f *asmFollower) name() string {
return fmt.Sprintf("asm/follower/%s", f.leaderName)
}

func (f *asmFollower) syncInterval() time.Duration {
return asmFollowerSyncInterval
}

func (f *asmFollower) sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
// values must store obfuscated values, but f.client gives unobfuscated values
obfuscator := Secrets{}.obfuscator()
module := ""
includeValues := true
Expand All @@ -60,39 +64,20 @@ func (f *asmFollower) sync(ctx context.Context, secrets *xsync.MapOf[Ref, cached
return fmt.Errorf("asm follower could not obfuscate value for ref %q: %w", s.RefPath, err)
}
visited[ref] = true
secrets.Store(ref, cachedSecret{
value: obfuscatedValue,
values.Store(ref, SyncedValue{
Value: obfuscatedValue,
})
}
// delete old values
secrets.Range(func(ref Ref, _ cachedSecret) bool {
values.Range(func(ref Ref, _ SyncedValue) bool {
if !visited[ref] {
secrets.Delete(ref)
values.Delete(ref)
}
return true
})
return nil
}

// list all secrets in the account.
func (f *asmFollower) list(ctx context.Context) ([]Entry, error) {
entries := []Entry{}
err := f.cache.iterate(func(ref Ref, _ []byte) {
entries = append(entries, Entry{
Ref: ref,
Accessor: asmURLForRef(ref),
})
})
if err != nil {
return nil, err
}
return entries, nil
}

func (f *asmFollower) load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
return f.cache.getSecret(ref)
}

func (f *asmFollower) store(ctx context.Context, ref Ref, obfuscatedValue []byte) (*url.URL, error) {
obfuscator := Secrets{}.obfuscator()
unobfuscatedValue, err := obfuscator.Reveal(obfuscatedValue)
Expand All @@ -111,7 +96,6 @@ func (f *asmFollower) store(ctx context.Context, ref Ref, obfuscatedValue []byte
if err != nil {
return nil, err
}
f.cache.updatedSecret(ref, obfuscatedValue)
return asmURLForRef(ref), nil
}

Expand All @@ -127,6 +111,5 @@ func (f *asmFollower) delete(ctx context.Context, ref Ref) error {
if err != nil {
return err
}
f.cache.deletedSecret(ref)
return nil
}
Loading

0 comments on commit 6508d57

Please sign in to comment.