From 05ca5e82ff6523e57628c0cbef655b0640224415 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Thu, 15 Feb 2024 09:25:44 -0500 Subject: [PATCH] Restore relative node expiry to original behavior When relative node expiry was originally implemented it only ran on Auth and propagated delete events downstream. However, that was quickly changed in #12002 because the event system could not keep up in clusters with high churn and would result in buffer overflow errors in downstream caches. The event system has been overhauled to use FanoutV2, which no longer suffers from the burst of events causing problems. The change to not propagate delete events also breaks any node watchers on the local cache from ever expiring the server since they never receive a delete event. This reverts some of the changes from #12002 such that relative expiry now only runs on the auth cache and emits delete events. Each relative expiry interval is however still limited to only remove up to a fixed number of nodes. Closes #37527 --- lib/cache/cache.go | 47 ++++++++++++++++++++--------------------- lib/cache/cache_test.go | 12 +++++++---- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/lib/cache/cache.go b/lib/cache/cache.go index dd9171acc25fa..1b1fce089cbe0 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -112,6 +112,7 @@ func makeAllKnownCAsFilter() types.CertAuthorityFilter { // ForAuth sets up watch configuration for the auth server func ForAuth(cfg Config) Config { cfg.target = "auth" + cfg.EnableRelativeExpiry = true cfg.Watches = []types.WatchKind{ {Kind: types.KindCertAuthority, LoadSecrets: true}, {Kind: types.KindClusterName}, @@ -739,6 +740,9 @@ type Config struct { // healthy even if some of the requested resource kinds aren't // supported by the event source. DisablePartialHealth bool + // EnableRelativeExpiry turns on purging expired items from the cache even + // if delete events have not been received from the backend. + EnableRelativeExpiry bool } // CheckAndSetDefaults checks parameters and sets default values @@ -1280,18 +1284,14 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer retry.Reset() - // only enable relative node expiry if the cache is configured - // to watch for types.KindNode + // Only enable relative node expiry for the auth cache. relativeExpiryInterval := interval.NewNoop() - for _, watch := range c.Config.Watches { - if watch.Kind == types.KindNode { - relativeExpiryInterval = interval.New(interval.Config{ - Duration: c.Config.RelativeExpiryCheckInterval, - FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), - Jitter: retryutils.NewSeventhJitter(), - }) - break - } + if c.EnableRelativeExpiry { + relativeExpiryInterval = interval.New(interval.Config{ + Duration: c.Config.RelativeExpiryCheckInterval, + FirstDuration: utils.HalfJitter(c.Config.RelativeExpiryCheckInterval), + Jitter: retryutils.NewSeventhJitter(), + }) } defer relativeExpiryInterval.Stop() @@ -1344,8 +1344,7 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer } } - err = c.processEvent(ctx, event, true) - if err != nil { + if err := c.processEvent(ctx, event); err != nil { return trace.Wrap(err) } c.notify(c.ctx, Event{Event: event, Type: EventProcessed}) @@ -1438,7 +1437,7 @@ func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error { Kind: types.KindNode, Metadata: node.GetMetadata(), }, - }, false); err != nil { + }); err != nil { return trace.Wrap(err) } @@ -1605,9 +1604,9 @@ func (c *Cache) fetch(ctx context.Context, confirmedKinds map[resourceKind]types } // processEvent hands the event off to the appropriate collection for processing. Any -// resources which were not registered are ignored. If processing completed successfully -// and emit is true the event will be emitted via the fanout. -func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) error { +// resources which were not registered are ignored. If processing completed successfully, +// the event will be emitted via the fanout. +func (c *Cache) processEvent(ctx context.Context, event types.Event) error { resourceKind := resourceKindFromResource(event.Resource) collection, ok := c.collections.byKind[resourceKind] if !ok { @@ -1617,14 +1616,14 @@ func (c *Cache) processEvent(ctx context.Context, event types.Event, emit bool) if err := collection.processEvent(ctx, event); err != nil { return trace.Wrap(err) } - if emit { - c.eventsFanout.Emit(event) - if !isHighVolumeResource(resourceKind.kind) { - c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) { - f.Emit(event) - }) - } + + c.eventsFanout.Emit(event) + if !isHighVolumeResource(resourceKind.kind) { + c.lowVolumeEventsFanout.ForEach(func(f *services.FanoutV2) { + f.Emit(event) + }) } + return nil } diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 7cd84f56f8feb..c4c4da4fa9cf7 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -2538,6 +2538,7 @@ func TestRelativeExpiry(t *testing.T) { } func TestRelativeExpiryLimit(t *testing.T) { + t.Parallel() const ( checkInterval = time.Second nodeCount = 100 @@ -2555,7 +2556,7 @@ func TestRelativeExpiryLimit(t *testing.T) { c.RelativeExpiryCheckInterval = checkInterval c.RelativeExpiryLimit = expiryLimit c.Clock = clock - return ForProxy(c) + return ForAuth(c) }) t.Cleanup(p.Close) @@ -2595,7 +2596,8 @@ func TestRelativeExpiryLimit(t *testing.T) { } } -func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { +func TestRelativeExpiryOnlyForAuth(t *testing.T) { + t.Parallel() clock := clockwork.NewFakeClockAt(time.Now().Add(time.Hour)) p := newTestPack(t, func(c Config) Config { c.RelativeExpiryCheckInterval = time.Second @@ -2608,9 +2610,10 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { p2 := newTestPack(t, func(c Config) Config { c.RelativeExpiryCheckInterval = time.Second c.Clock = clock + c.target = "llama" c.Watches = []types.WatchKind{ {Kind: types.KindNamespace}, - {Kind: types.KindNamespace}, + {Kind: types.KindNode}, {Kind: types.KindCertAuthority}, } return c @@ -2620,8 +2623,9 @@ func TestRelativeExpiryOnlyForNodeWatches(t *testing.T) { for i := 0; i < 2; i++ { clock.Advance(time.Hour * 24) drainEvents(p.eventsC) - expectEvent(t, p.eventsC, RelativeExpiry) + unexpectedEvent(t, p.eventsC, RelativeExpiry) + clock.Advance(time.Hour * 24) drainEvents(p2.eventsC) unexpectedEvent(t, p2.eventsC, RelativeExpiry) }