From 92b31096f9ad69055ab9de86629f621a2abc6f2f Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Jul 2024 11:43:13 +1000 Subject: [PATCH] improve docs --- common/configuration/api.go | 8 ++++++++ common/configuration/cache.go | 19 ++++++++++++------- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/common/configuration/api.go b/common/configuration/api.go index 267dad6335..626836535b 100644 --- a/common/configuration/api.go +++ b/common/configuration/api.go @@ -111,6 +111,14 @@ type AsynchronousProvider[R Role] interface { Provider[R] SyncInterval() time.Duration + + // Sync is called periodically to update the cache with the latest values. + // + // SyncInterval() provides the expected time between syncs. + // If Sync() returns an error, sync will be retried with an exponential backoff. + // + // Sync is only called if the Router has keys referring to this provider. + // If the Router did have keys for this provider but removed them, one more round of sync is executed until Sync() will stop being called Sync(ctx context.Context, values *xsync.MapOf[Ref, SyncedValue]) error } diff --git a/common/configuration/cache.go b/common/configuration/cache.go index df0e36f744..4ef06bcb31 100644 --- a/common/configuration/cache.go +++ b/common/configuration/cache.go @@ -32,13 +32,15 @@ type updateCacheEvent struct { value optional.Option[[]byte] } -// cache maintains a cache of providers that can be synced +// cache maintains a cache for providers that can be synced // // Loading values always returns the cached value. // Sync happens periodically. // Updates do not go through the cache, but the cache is notified after the update occurs. type cache[R Role] struct { - providers map[string]*cacheProvider[R] + providers map[string]*cacheProvider[R] + + // list provider is used to determine which providers are expected to have values, and therefore need to be synced listProvider listProvider topic *pubsub.Topic[updateCacheEvent] @@ -67,6 +69,7 @@ func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R], return cache } +// load is called by the manager to get a value from the cache func (c *cache[R]) load(ref Ref, key *url.URL) ([]byte, error) { providerKey := ProviderKeyForAccessor(key) provider, ok := c.providers[key.Scheme] @@ -193,10 +196,10 @@ type cacheProvider[R Role] struct { // When the provider stops being used, we do one final sync to ensure the cache is up-to-date isActive bool - // closed when values have been synced for the first time - loaded chan bool - loadedOnce *sync.Once - lastSyncAttempt optional.Option[time.Time] + loaded chan bool // closed when values have been synced for the first time + loadedOnce *sync.Once // ensures we close the loaded channel only once + + lastSyncAttempt optional.Option[time.Time] // updated each time we attempt to sync, regardless of success/failure currentBackoff optional.Option[time.Duration] } @@ -212,6 +215,7 @@ func (c *cacheProvider[R]) waitForInitialSync() error { } } +// needsSync returns true if the provider needs to be synced. func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool { lastSyncAttempt, ok := c.lastSyncAttempt.Get() if !ok { @@ -223,6 +227,7 @@ func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool { return clock.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval())) } +// sync executes sync on the provider and updates the cacheProvider sync state func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValues bool) { logger := log.FromContext(ctx) @@ -254,7 +259,7 @@ func (c *cacheProvider[R]) sync(ctx context.Context, clock clock.Clock, hasValue c.isActive = hasValues } -// processEvent updates the cache +// processEvent updates the cache after a value was set or deleted func (c *cacheProvider[R]) processEvent(e updateCacheEvent) { select { case <-c.loaded: