Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: no need to specify —opvault to access secrets #2071

Merged
merged 2 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 30 additions & 20 deletions common/configuration/1password_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/TBD54566975/ftl/internal/exec"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/slices"
)

// OnePasswordProvider is a configuration provider that reads passwords from
Expand All @@ -41,35 +40,46 @@ func (o OnePasswordProvider) SyncInterval() time.Duration {
return time.Second * 10
}

func (o OnePasswordProvider) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
// Sync will fetch all secrets from the 1Password vault and store them in the values map.
// Do not just sync the o.Vault, instead find all vaults found in entries and sync them.
// If there are no entries, we should not attempt any access of 1Password.
func (o OnePasswordProvider) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
logger := log.FromContext(ctx)
if o.Vault == "" {
return fmt.Errorf("1Password vault not set: use --opvault flag to specify the vault")
}
if err := checkOpBinary(); err != nil {
return err
}
full, err := o.getItem(ctx, o.Vault)
if err != nil {
return fmt.Errorf("get item failed: %w", err)
// find vaults
vaults := map[string]bool{}
for _, e := range entries {
vault := e.Accessor.Host
if vault == "" {
logger.Warnf("empty vault name for %s", e.Ref)
continue
}
vaults[e.Accessor.Host] = true
}

for _, field := range full.Fields {
ref, err := ParseRef(field.Label)
// get all secrets from all vaults
refs := map[Ref]bool{}
for vault := range vaults {
full, err := o.getItem(ctx, vault)
if err != nil {
logger.Warnf("invalid field label found in 1Password: %q", field.Label)
continue
return fmt.Errorf("get item failed: %w", err)
}
for _, field := range full.Fields {
ref, err := ParseRef(field.Label)
if err != nil {
logger.Warnf("invalid field label found in 1Password: %q", field.Label)
continue
}
refs[ref] = true
values.Store(ref, SyncedValue{
Value: []byte(field.Value),
})
}
values.Store(ref, SyncedValue{
Value: []byte(field.Value),
})
}

// delete old values
values.Range(func(ref Ref, _ SyncedValue) bool {
if _, ok := slices.Find(full.Fields, func(item entry) bool {
return item.Label == ref.String()
}); !ok {
if _, ok := refs[ref]; !ok {
values.Delete(ref)
}
return true
Expand Down
6 changes: 3 additions & 3 deletions common/configuration/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ type AsynchronousProvider[R Role] interface {
// SyncInterval() provides the expected time between syncs.
// If Sync() returns an error, sync will be retried with an exponential backoff.
//
// Sync is only called if the Router has keys referring to this provider.
// If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called
Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error
// Values should be updated by Sync().
// An array of known entries from the router is provided in case it is helpful, but the provider can store any values it wants.
Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error
}

type VersionToken any
Expand Down
2 changes: 1 addition & 1 deletion common/configuration/asm.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (a *ASM) SyncInterval() time.Duration {
return client.syncInterval()
}

func (a *ASM) Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error {
func (a *ASM) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
client, err := a.coordinator.Get()
if err != nil {
return fmt.Errorf("could not coordinate ASM: %w", err)
Expand Down
34 changes: 11 additions & 23 deletions common/configuration/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -157,18 +156,22 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
if len(providersToSync) == 0 {
continue
}
list, err := c.listProvider.List(ctx)
entries, err := c.listProvider.List(ctx)
if err != nil {
logger.Warnf("could not sync: could not get list: %v", err)
continue
}
for _, cp := range providersToSync {
_, hasValues := slices.Find(list, func(e Entry) bool {
return ProviderKeyForAccessor(e.Accessor) == cp.provider.Key()
})
entriesForProvider := []Entry{}
for _, e := range entries {
if ProviderKeyForAccessor(e.Accessor) != cp.provider.Key() {
continue
}
entriesForProvider = append(entriesForProvider, e)
}
wg.Add(1)
go func(cp *cacheProvider[R]) {
cp.sync(ctx, clock, hasValues)
cp.sync(ctx, entriesForProvider, clock)
wg.Done()
}(cp)
}
Expand All @@ -191,11 +194,6 @@ type cacheProvider[R Role] struct {
provider AsynchronousProvider[R]
values *xsync.MapOf[Ref, SyncedValue]

// isActive is the provider is used by any value
// When inactive, the provider will not be synced. This helps avoid accessing resources that are not needed, like 1Password.
// When the provider stops being used, we do one final sync to ensure the cache is up-to-date
isActive bool

loaded chan bool // closed when values have been synced for the first time
loadedOnce *sync.Once // ensures we close the loaded channel only once

Expand Down Expand Up @@ -228,20 +226,11 @@ func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
}

// sync executes sync on the provider and updates the cacheProvider sync state
func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) {
func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry, clock clock.Clock) {
logger := log.FromContext(ctx)

if !hasValues && !c.isActive {
// skip
c.loadedOnce.Do(func() {
// treat this as a successful initial sync, so we don't ever block trying to access this provider
close(c.loaded)
})
return
}

c.lastSyncAttempt = optional.Some(clock.Now())
err := c.provider.Sync(ctx, c.values)
err := c.provider.Sync(ctx, entries, c.values)
if err != nil {
logger.Errorf(err, "Error syncing %s", c.provider.Key())
if backoff, ok := c.currentBackoff.Get(); ok {
Expand All @@ -256,7 +245,6 @@ func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValue
c.loadedOnce.Do(func() {
close(c.loaded)
})
c.isActive = hasValues
}

// processEvent updates the cache after a value was set or deleted
Expand Down
Loading