From ac85eb32411af6916a82686ca97c9bd723af4fc0 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Jul 2024 11:09:24 +1000 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20no=20need=20to=20specify=20=E2=80=94?= =?UTF-8?q?opvault=20to=20access=20secrets?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/configuration/1password_provider.go | 50 +++++++++++++--------- common/configuration/api.go | 6 +-- common/configuration/asm.go | 2 +- common/configuration/cache.go | 34 +++++---------- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/common/configuration/1password_provider.go b/common/configuration/1password_provider.go index 0b98e1d5c4..2bcc2a6391 100644 --- a/common/configuration/1password_provider.go +++ b/common/configuration/1password_provider.go @@ -15,7 +15,6 @@ import ( "github.com/TBD54566975/ftl/internal/exec" "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/slices" ) // OnePasswordProvider is a configuration provider that reads passwords from @@ -41,35 +40,46 @@ func (o OnePasswordProvider) SyncInterval() time.Duration { return time.Second * 10 } -func (o OnePasswordProvider) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error { +// Sync will fetch all secrets from the 1Password vault and store them in the values map. +// Do not just sync the o.Vault, instead find all vaults found in entries and sync them. +// If there are no entries, we should not attempt any access of 1Password. +func (o OnePasswordProvider) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error { logger := log.FromContext(ctx) - if o.Vault == "" { - return fmt.Errorf("1Password vault not set: use --opvault flag to specify the vault") - } if err := checkOpBinary(); err != nil { return err } - full, err := o.getItem(ctx, o.Vault) - if err != nil { - return fmt.Errorf("get item failed: %w", err) + // find vaults + vaults := map[string]bool{} + for _, e := range entries { + vault := e.Accessor.Host + if vault == "" { + logger.Warnf("empty vault name for %s", e.Ref) + continue + } + vaults[e.Accessor.Host] = true } - - for _, field := range full.Fields { - ref, err := ParseRef(field.Label) + // get all secrets from all vaults + refs := map[Ref]bool{} + for vault := range vaults { + full, err := o.getItem(ctx, vault) if err != nil { - logger.Warnf("invalid field label found in 1Password: %q", field.Label) - continue + return fmt.Errorf("get item failed: %w", err) + } + for _, field := range full.Fields { + ref, err := ParseRef(field.Label) + if err != nil { + logger.Warnf("invalid field label found in 1Password: %q", field.Label) + continue + } + refs[ref] = true + values.Store(ref, SyncedValue{ + Value: []byte(field.Value), + }) } - values.Store(ref, SyncedValue{ - Value: []byte(field.Value), - }) } - // delete old values values.Range(func(ref Ref, _ SyncedValue) bool { - if _, ok := slices.Find(full.Fields, func(item entry) bool { - return item.Label == ref.String() - }); !ok { + if _, ok := refs[ref]; !ok { values.Delete(ref) } return true diff --git a/common/configuration/api.go b/common/configuration/api.go index 626836535b..1d03ce9a2e 100644 --- a/common/configuration/api.go +++ b/common/configuration/api.go @@ -117,9 +117,9 @@ type AsynchronousProvider[R Role] interface { // SyncInterval() provides the expected time between syncs. // If Sync() returns an error, sync will be retried with an exponential backoff. // - // Sync is only called if the Router has keys referring to this provider. - // If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called - Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error + // Values should be updated by Sync(). + // An array of known entries from the router is provided in case it is helpful, but the provider can store any values it wants. + Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error } type VersionToken any diff --git a/common/configuration/asm.go b/common/configuration/asm.go index c7604d157d..74bf8826dd 100644 --- a/common/configuration/asm.go +++ b/common/configuration/asm.go @@ -92,7 +92,7 @@ func (a *ASM) SyncInterval() time.Duration { return client.syncInterval() } -func (a *ASM) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error { +func (a *ASM) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error { client, err := a.coordinator.Get() if err != nil { return fmt.Errorf("could not coordinate ASM: %w", err) diff --git a/common/configuration/cache.go b/common/configuration/cache.go index 4ef06bcb31..5e3cd4e0cf 100644 --- a/common/configuration/cache.go +++ b/common/configuration/cache.go @@ -8,7 +8,6 @@ import ( "time" "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/slices" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" "github.com/benbjohnson/clock" @@ -157,18 +156,22 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) { if len(providersToSync) == 0 { continue } - list, err := c.listProvider.List(ctx) + entries, err := c.listProvider.List(ctx) if err != nil { logger.Warnf("could not sync: could not get list: %v", err) continue } for _, cp := range providersToSync { - _, hasValues := slices.Find(list, func(e Entry) bool { - return ProviderKeyForAccessor(e.Accessor) == cp.provider.Key() - }) + entriesForProvider := []Entry{} + for _, e := range entries { + if ProviderKeyForAccessor(e.Accessor) != cp.provider.Key() { + continue + } + entriesForProvider = append(entriesForProvider, e) + } wg.Add(1) go func(cp *cacheProvider[R]) { - cp.sync(ctx, clock, hasValues) + cp.sync(ctx, entriesForProvider, clock) wg.Done() }(cp) } @@ -191,11 +194,6 @@ type cacheProvider[R Role] struct { provider AsynchronousProvider[R] values *xsync.MapOf[Ref, SyncedValue] - // isActive is the provider is used by any value - // When inactive, the provider will not be synced. This helps avoid accessing resources that are not needed, like 1Password. - // When the provider stops being used, we do one final sync to ensure the cache is up-to-date - isActive bool - loaded chan bool // closed when values have been synced for the first time loadedOnce *sync.Once // ensures we close the loaded channel only once @@ -228,20 +226,11 @@ func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool { } // sync executes sync on the provider and updates the cacheProvider sync state -func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) { +func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry, clock clock.Clock) { logger := log.FromContext(ctx) - if !hasValues && !c.isActive { - // skip - c.loadedOnce.Do(func() { - // treat this as a successful initial sync, so we don't ever block trying to access this provider - close(c.loaded) - }) - return - } - c.lastSyncAttempt = optional.Some(clock.Now()) - err := c.provider.Sync(ctx, c.values) + err := c.provider.Sync(ctx, entries, c.values) if err != nil { logger.Errorf(err, "Error syncing %s", c.provider.Key()) if backoff, ok := c.currentBackoff.Get(); ok { @@ -256,7 +245,6 @@ func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValue c.loadedOnce.Do(func() { close(c.loaded) }) - c.isActive = hasValues } // processEvent updates the cache after a value was set or deleted From 7f56d8cac0ef83ee692e6f30cf7ad3b90473bf43 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 15 Jul 2024 11:51:52 +1000 Subject: [PATCH 2/2] clean up and fix tests --- common/configuration/asm_test.go | 12 ++++++------ common/configuration/cache.go | 11 ++++------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/common/configuration/asm_test.go b/common/configuration/asm_test.go index bc91641f8e..19ce5a9580 100644 --- a/common/configuration/asm_test.go +++ b/common/configuration/asm_test.go @@ -174,9 +174,9 @@ func TestLeaderSync(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) sm, _, _, externalClient, clock, _ := setUp(ctx, t, None[Router[Secrets]]()) testClientSync(ctx, t, sm, externalClient, true, func(percentage float64) { - clock.Add(time.Duration(percentage) * asmLeaderSyncInterval) + clock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0)) if percentage == 1.0 { - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } }) } @@ -202,15 +202,15 @@ func TestFollowerSync(t *testing.T) { testClientSync(ctx, t, sm, externalClient, false, func(percentage float64) { // sync leader - leaderClock.Add(time.Duration(percentage) * asmLeaderSyncInterval) + leaderClock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0)) if percentage == 1.0 { - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } // then sync follower - followerClock.Add(time.Duration(percentage) * asmFollowerSyncInterval) + followerClock.Add(time.Second * (time.Duration(asmFollowerSyncInterval.Seconds()*percentage) + 1.0)) if percentage == 1.0 { - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } }) } diff --git a/common/configuration/cache.go b/common/configuration/cache.go index 5e3cd4e0cf..8e8173f1df 100644 --- a/common/configuration/cache.go +++ b/common/configuration/cache.go @@ -8,6 +8,7 @@ import ( "time" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/slices" "github.com/alecthomas/types/optional" "github.com/alecthomas/types/pubsub" "github.com/benbjohnson/clock" @@ -162,13 +163,9 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) { continue } for _, cp := range providersToSync { - entriesForProvider := []Entry{} - for _, e := range entries { - if ProviderKeyForAccessor(e.Accessor) != cp.provider.Key() { - continue - } - entriesForProvider = append(entriesForProvider, e) - } + entriesForProvider := slices.Filter(entries, func(e Entry) bool { + return ProviderKeyForAccessor(e.Accessor) == cp.provider.Key() + }) wg.Add(1) go func(cp *cacheProvider[R]) { cp.sync(ctx, entriesForProvider, clock)