diff --git a/lib/backend/buffer.go b/lib/backend/buffer.go index bee29dc2bc0c3..7456ce300a43d 100644 --- a/lib/backend/buffer.go +++ b/lib/backend/buffer.go @@ -34,9 +34,10 @@ import ( ) type bufferConfig struct { - gracePeriod time.Duration - capacity int - clock clockwork.Clock + gracePeriod time.Duration + creationGracePeriod time.Duration + capacity int + clock clockwork.Clock } type BufferOption func(*bufferConfig) @@ -59,6 +60,16 @@ func BacklogGracePeriod(d time.Duration) BufferOption { } } +// CreationGracePeriod sets the amount of time delay after watcher creation before +// it will be considered for removal due to backlog. +func CreationGracePeriod(d time.Duration) BufferOption { + return func(cfg *bufferConfig) { + if d > 0 { + cfg.creationGracePeriod = d + } + } +} + // BufferClock sets a custom clock for the buffer (used in tests). func BufferClock(c clockwork.Clock) BufferOption { return func(cfg *bufferConfig) { @@ -81,9 +92,10 @@ type CircularBuffer struct { // NewCircularBuffer returns a new uninitialized instance of circular buffer. func NewCircularBuffer(opts ...BufferOption) *CircularBuffer { cfg := bufferConfig{ - gracePeriod: DefaultBacklogGracePeriod, - capacity: DefaultBufferCapacity, - clock: clockwork.NewRealClock(), + gracePeriod: DefaultBacklogGracePeriod, + creationGracePeriod: DefaultCreationGracePeriod, + capacity: DefaultBufferCapacity, + clock: clockwork.NewRealClock(), } for _, opt := range opts { opt(&cfg) @@ -255,6 +267,7 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher, buffer: c, Watch: watch, eventsC: make(chan Event, watch.QueueSize), + created: c.cfg.clock.Now(), ctx: closeCtx, cancel: cancel, capacity: watch.QueueSize, @@ -294,6 +307,7 @@ type BufferWatcher struct { bmu sync.Mutex backlog []Event backlogSince time.Time + created time.Time ctx context.Context cancel context.CancelFunc @@ -349,7 +363,7 @@ func (w *BufferWatcher) emit(e Event) (ok bool) { defer w.bmu.Unlock() if !w.flushBacklog() { - if w.buffer.cfg.clock.Now().After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) { + if now := w.buffer.cfg.clock.Now(); now.After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) && now.After(w.created.Add(w.buffer.cfg.creationGracePeriod)) { // backlog has existed for longer than grace period, // this watcher needs to be removed. return false diff --git a/lib/backend/buffer_test.go b/lib/backend/buffer_test.go index e295a83da5abf..bb4b89cd181af 100644 --- a/lib/backend/buffer_test.go +++ b/lib/backend/buffer_test.go @@ -80,6 +80,7 @@ func TestWatcherCapacity(t *testing.T) { BufferCapacity(1), BufferClock(clock), BacklogGracePeriod(gracePeriod), + CreationGracePeriod(time.Nanosecond), ) defer b.Close() b.SetInit() @@ -142,6 +143,71 @@ func TestWatcherCapacity(t *testing.T) { } } +func TestWatcherCreationGracePeriod(t *testing.T) { + const backlogGracePeriod = time.Second + const creationGracePeriod = backlogGracePeriod * 3 + const queueSize = 1 + clock := clockwork.NewFakeClock() + + ctx := context.Background() + b := NewCircularBuffer( + BufferCapacity(1), + BufferClock(clock), + BacklogGracePeriod(backlogGracePeriod), + CreationGracePeriod(creationGracePeriod), + ) + defer b.Close() + b.SetInit() + + w, err := b.NewWatcher(ctx, Watch{ + QueueSize: queueSize, + }) + require.NoError(t, err) + defer w.Close() + + select { + case e := <-w.Events(): + require.Equal(t, types.OpInit, e.Type) + default: + t.Fatalf("Expected immediate OpInit.") + } + + // emit enough events to create a backlog + for i := 0; i < queueSize*2; i++ { + b.Emit(Event{Item: Item{Key: []byte{Separator}}}) + } + + select { + case <-w.Done(): + t.Fatal("watcher closed unexpectedly") + default: + } + + // sanity-check + require.Greater(t, creationGracePeriod, backlogGracePeriod*2) + + // advance well past the backlog grace period, but not past the creation grace period + clock.Advance(backlogGracePeriod * 2) + + b.Emit(Event{Item: Item{Key: []byte{Separator}}}) + + select { + case <-w.Done(): + t.Fatal("watcher closed unexpectedly") + default: + } + + // advance well past creation grace period + clock.Advance(creationGracePeriod) + + b.Emit(Event{Item: Item{Key: []byte{Separator}}}) + select { + case <-w.Done(): + default: + t.Fatal("watcher did not close after creation grace period exceeded") + } +} + // TestWatcherClose makes sure that closed watcher // will be removed func TestWatcherClose(t *testing.T) { diff --git a/lib/backend/defaults.go b/lib/backend/defaults.go index 357ce94db5b44..807b901826cbf 100644 --- a/lib/backend/defaults.go +++ b/lib/backend/defaults.go @@ -28,6 +28,12 @@ const ( // (e.g. heartbeats) are be created. If a watcher can't catch up in under a minute, // it probably won't catch up. DefaultBacklogGracePeriod = time.Second * 59 + // DefaultCreationGracePeriod is the default amount of time time that the circular buffer + // will wait before enforcing the backlog grace period. This is intended to give downstream + // caches time to initialize before they start receiving events. Without this, large caches + // may be unable to successfully initialize even if they would otherwise be able to keep up + // with the event stream once established. + DefaultCreationGracePeriod = DefaultBacklogGracePeriod * 3 // DefaultPollStreamPeriod is a default event poll stream period DefaultPollStreamPeriod = time.Second // DefaultEventsTTL is a default events TTL period diff --git a/lib/backend/report.go b/lib/backend/report.go index e77134658f28e..b661ecd7b7f51 100644 --- a/lib/backend/report.go +++ b/lib/backend/report.go @@ -29,6 +29,7 @@ import ( log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" oteltrace "go.opentelemetry.io/otel/trace" + "golang.org/x/time/rate" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -82,6 +83,8 @@ type Reporter struct { // This will keep an upper limit on our memory usage while still always // reporting the most active keys. topRequestsCache *lru.Cache[topRequestsCacheKey, struct{}] + + slowRangeLogLimiter *rate.Limiter } // NewReporter returns a new Reporter. @@ -103,8 +106,9 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) { return nil, trace.Wrap(err) } r := &Reporter{ - ReporterConfig: cfg, - topRequestsCache: cache, + ReporterConfig: cfg, + topRequestsCache: cache, + slowRangeLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 12), } return r, nil } @@ -134,6 +138,17 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte, batchReadRequestsFailed.WithLabelValues(s.Component).Inc() } s.trackRequest(types.OpGet, startKey, endKey) + end := s.Clock().Now() + if d := end.Sub(start); d > time.Second*3 { + if s.slowRangeLogLimiter.AllowN(end, 1) { + log.WithFields(log.Fields{ + "start_key": string(startKey), + "end_key": string(endKey), + "limit": limit, + "duration": d.String(), + }).Warn("slow GetRange request") + } + } return res, err } diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 073cb41b0b08e..2a03476b07240 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -1247,6 +1247,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer return trace.ConnectionProblem(nil, "timeout waiting for watcher init") } + fetchAndApplyStart := time.Now() + confirmedKindsMap := make(map[resourceKind]types.WatchKind, len(confirmedKinds)) for _, kind := range confirmedKinds { confirmedKindsMap[resourceKind{kind: kind.Kind, subkind: kind.SubKind}] = kind @@ -1310,6 +1312,19 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer c.notify(c.ctx, Event{Type: WatcherStarted}) + fetchAndApplyDuration := time.Since(fetchAndApplyStart) + if fetchAndApplyDuration > time.Second*20 { + c.Logger.WithFields(log.Fields{ + "cache_target": c.Config.target, + "duration": fetchAndApplyDuration.String(), + }).Warn("slow fetch and apply") + } else { + c.Logger.WithFields(log.Fields{ + "cache_target": c.Config.target, + "duration": fetchAndApplyDuration.String(), + }).Debug("fetch and apply") + } + var lastStalenessWarning time.Time var staleEventCount int for { @@ -1394,7 +1409,13 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer // cannot run concurrently with event processing. func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error { // TODO(fspmarshall): Start using dynamic value once it is implemented. - gracePeriod := apidefaults.ServerAnnounceTTL + + // because event streams are not necessarily ordered across keys expiring on the + // server announce TTL may sometimes generate false positives. Using the watcher + // creation grace period as our safety buffer is mostly an arbitrary choice, but + // since it approximates our expected worst-case staleness of the event stream its + // a fairly reasonable one. + gracePeriod := apidefaults.ServerAnnounceTTL + backend.DefaultCreationGracePeriod // latestExp will be the value that we choose to consider the most recent "expired" // timestamp. This will either end up being the most recently seen node expiry, or diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 9ba859868075c..98ad1d2ca9ecf 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -2618,7 +2618,7 @@ func TestRelativeExpiryLimit(t *testing.T) { require.Len(t, nodes, nodeCount) clock.Advance(time.Hour * 24) - for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit { + for expired := nodeCount - expiryLimit; expired > expiryLimit; expired -= expiryLimit { // get rid of events that were emitted before clock advanced drainEvents(p.eventsC) // wait for next relative expiry check to run