Skip to content

Commit

Permalink
improve docs
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jul 12, 2024
1 parent a1061f0 commit 92b3109
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
8 changes: 8 additions & 0 deletions common/configuration/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ type AsynchronousProvider[R Role] interface {
Provider[R]

SyncInterval() time.Duration

// Sync is called periodically to update the cache with the latest values.
//
// SyncInterval() provides the expected time between syncs.
// If Sync() returns an error, sync will be retried with an exponential backoff.
//
// Sync is only called if the Router has keys referring to this provider.
// If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called
Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error
}

Expand Down
19 changes: 12 additions & 7 deletions common/configuration/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ type updateCacheEvent struct {
value optional.Option[[]byte]
}

// cache maintains a cache of providers that can be synced
// cache maintains a cache for providers that can be synced
//
// Loading values always returns the cached value.
// Sync happens periodically.
// Updates do not go through the cache, but the cache is notified after the update occurs.
type cache[R Role] struct {
providers map[string]*cacheProvider[R]
providers map[string]*cacheProvider[R]

// list provider is used to determine which providers are expected to have values, and therefore need to be synced
listProvider listProvider

topic *pubsub.Topic[updateCacheEvent]
Expand Down Expand Up @@ -67,6 +69,7 @@ func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R],
return cache
}

// load is called by the manager to get a value from the cache
func (c *cache[R]) load(ref Ref, key *url.URL) ([]byte, error) {
providerKey := ProviderKeyForAccessor(key)
provider, ok := c.providers[key.Scheme]
Expand Down Expand Up @@ -193,10 +196,10 @@ type cacheProvider[R Role] struct {
// When the provider stops being used, we do one final sync to ensure the cache is up-to-date
isActive bool

// closed when values have been synced for the first time
loaded chan bool
loadedOnce *sync.Once
lastSyncAttempt optional.Option[time.Time]
loaded chan bool // closed when values have been synced for the first time
loadedOnce *sync.Once // ensures we close the loaded channel only once

lastSyncAttempt optional.Option[time.Time] // updated each time we attempt to sync, regardless of success/failure
currentBackoff optional.Option[time.Duration]
}

Expand All @@ -212,6 +215,7 @@ func (c *cacheProvider[R]) waitForInitialSync() error {
}
}

// needsSync returns true if the provider needs to be synced.
func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
lastSyncAttempt, ok := c.lastSyncAttempt.Get()
if !ok {
Expand All @@ -223,6 +227,7 @@ func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
return clock.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval()))
}

// sync executes sync on the provider and updates the cacheProvider sync state
func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) {
logger := log.FromContext(ctx)

Expand Down Expand Up @@ -254,7 +259,7 @@ func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValue
c.isActive = hasValues
}

// processEvent updates the cache
// processEvent updates the cache after a value was set or deleted
func (c *cacheProvider[R]) processEvent(e updateCacheEvent) {
select {
case <-c.loaded:
Expand Down

0 comments on commit 92b3109

Please sign in to comment.