reduce flake during backend interruptions & slow auth cache init (#44601) (#44696)
fspmarshall authored Jul 26, 2024
1 parent 075a133 commit 5a62eec
Showing 6 changed files with 133 additions and 11 deletions.
28 changes: 21 additions & 7 deletions lib/backend/buffer.go
@@ -34,9 +34,10 @@ import (
)

type bufferConfig struct {
gracePeriod time.Duration
capacity int
clock clockwork.Clock
gracePeriod time.Duration
creationGracePeriod time.Duration
capacity int
clock clockwork.Clock
}

type BufferOption func(*bufferConfig)
@@ -59,6 +60,16 @@ func BacklogGracePeriod(d time.Duration) BufferOption {
}
}

// CreationGracePeriod sets the delay after watcher creation before the watcher
// will be considered for removal due to backlog.
func CreationGracePeriod(d time.Duration) BufferOption {
return func(cfg *bufferConfig) {
if d > 0 {
cfg.creationGracePeriod = d
}
}
}

// BufferClock sets a custom clock for the buffer (used in tests).
func BufferClock(c clockwork.Clock) BufferOption {
return func(cfg *bufferConfig) {
@@ -81,9 +92,10 @@ type CircularBuffer struct {
// NewCircularBuffer returns a new uninitialized instance of circular buffer.
func NewCircularBuffer(opts ...BufferOption) *CircularBuffer {
cfg := bufferConfig{
gracePeriod: DefaultBacklogGracePeriod,
capacity: DefaultBufferCapacity,
clock: clockwork.NewRealClock(),
gracePeriod: DefaultBacklogGracePeriod,
creationGracePeriod: DefaultCreationGracePeriod,
capacity: DefaultBufferCapacity,
clock: clockwork.NewRealClock(),
}
for _, opt := range opts {
opt(&cfg)
@@ -255,6 +267,7 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher,
buffer: c,
Watch: watch,
eventsC: make(chan Event, watch.QueueSize),
created: c.cfg.clock.Now(),
ctx: closeCtx,
cancel: cancel,
capacity: watch.QueueSize,
@@ -294,6 +307,7 @@ type BufferWatcher struct {
bmu sync.Mutex
backlog []Event
backlogSince time.Time
created time.Time

ctx context.Context
cancel context.CancelFunc
@@ -349,7 +363,7 @@ func (w *BufferWatcher) emit(e Event) (ok bool) {
defer w.bmu.Unlock()

if !w.flushBacklog() {
if w.buffer.cfg.clock.Now().After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) {
if now := w.buffer.cfg.clock.Now(); now.After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) && now.After(w.created.Add(w.buffer.cfg.creationGracePeriod)) {
// backlog has existed for longer than grace period,
// this watcher needs to be removed.
return false
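The net effect of the buffer.go changes is that a watcher can only be evicted for backlog once two independent conditions hold: its backlog has outlived the backlog grace period and the watcher itself has outlived the creation grace period. A minimal standalone sketch of that combined check (the helper name and the 59s/177s values are illustrative, not part of the Teleport codebase):

package main

import (
	"fmt"
	"time"
)

// shouldEvict mirrors the condition now used in BufferWatcher.emit: a watcher is
// only dropped when its backlog has existed longer than the backlog grace period
// AND the watcher is older than the creation grace period.
func shouldEvict(now, backlogSince, created time.Time, backlogGrace, creationGrace time.Duration) bool {
	return now.After(backlogSince.Add(backlogGrace)) && now.After(created.Add(creationGrace))
}

func main() {
	created := time.Now()
	backlogSince := created.Add(10 * time.Second) // backlog formed shortly after creation

	// 90s after creation: backlog grace (59s) exceeded, but creation grace (177s) still protects the watcher.
	fmt.Println(shouldEvict(created.Add(90*time.Second), backlogSince, created, 59*time.Second, 177*time.Second)) // false

	// 200s after creation: both grace periods exceeded, the watcher becomes eligible for removal.
	fmt.Println(shouldEvict(created.Add(200*time.Second), backlogSince, created, 59*time.Second, 177*time.Second)) // true
}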
66 changes: 66 additions & 0 deletions lib/backend/buffer_test.go
@@ -80,6 +80,7 @@ func TestWatcherCapacity(t *testing.T) {
BufferCapacity(1),
BufferClock(clock),
BacklogGracePeriod(gracePeriod),
CreationGracePeriod(time.Nanosecond),
)
defer b.Close()
b.SetInit()
@@ -142,6 +143,71 @@ }
}
}

func TestWatcherCreationGracePeriod(t *testing.T) {
const backlogGracePeriod = time.Second
const creationGracePeriod = backlogGracePeriod * 3
const queueSize = 1
clock := clockwork.NewFakeClock()

ctx := context.Background()
b := NewCircularBuffer(
BufferCapacity(1),
BufferClock(clock),
BacklogGracePeriod(backlogGracePeriod),
CreationGracePeriod(creationGracePeriod),
)
defer b.Close()
b.SetInit()

w, err := b.NewWatcher(ctx, Watch{
QueueSize: queueSize,
})
require.NoError(t, err)
defer w.Close()

select {
case e := <-w.Events():
require.Equal(t, types.OpInit, e.Type)
default:
t.Fatalf("Expected immediate OpInit.")
}

// emit enough events to create a backlog
for i := 0; i < queueSize*2; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}}})
}

select {
case <-w.Done():
t.Fatal("watcher closed unexpectedly")
default:
}

// sanity-check
require.Greater(t, creationGracePeriod, backlogGracePeriod*2)

// advance well past the backlog grace period, but not past the creation grace period
clock.Advance(backlogGracePeriod * 2)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})

select {
case <-w.Done():
t.Fatal("watcher closed unexpectedly")
default:
}

// advance well past creation grace period
clock.Advance(creationGracePeriod)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
select {
case <-w.Done():
default:
t.Fatal("watcher did not close after creation grace period exceeded")
}
}

// TestWatcherClose makes sure that closed watcher
// will be removed
func TestWatcherClose(t *testing.T) {
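The test above drives time with clockwork's fake clock; the buffer only re-evaluates the grace periods inside emit, which is why each clock.Advance is followed by another Emit. A small, self-contained illustration of the fake-clock arithmetic (using the default 59s/177s grace periods from defaults.go below, not the test's own constants):

package main

import (
	"fmt"
	"time"

	"github.com/jonboulle/clockwork"
)

func main() {
	clock := clockwork.NewFakeClock()
	created := clock.Now()

	// Time only moves when the test advances it explicitly.
	clock.Advance(2 * time.Minute)

	fmt.Println(clock.Now().After(created.Add(59 * time.Second)))  // true: past the backlog grace period
	fmt.Println(clock.Now().After(created.Add(177 * time.Second))) // false: still inside the creation grace period
}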
6 changes: 6 additions & 0 deletions lib/backend/defaults.go
@@ -28,6 +28,12 @@ const (
// (e.g. heartbeats) are created. If a watcher can't catch up in under a minute,
// it probably won't catch up.
DefaultBacklogGracePeriod = time.Second * 59
// DefaultCreationGracePeriod is the default amount of time that the circular buffer
// will wait before enforcing the backlog grace period. This is intended to give downstream
// caches time to initialize before they start receiving events. Without this, large caches
// may be unable to successfully initialize even if they would otherwise be able to keep up
// with the event stream once established.
DefaultCreationGracePeriod = DefaultBacklogGracePeriod * 3
// DefaultPollStreamPeriod is a default event poll stream period
DefaultPollStreamPeriod = time.Second
// DefaultEventsTTL is a default events TTL period
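Working through the defaults: the backlog grace period is 59 seconds, so the creation grace period comes out to 3 × 59s = 177s, i.e. a freshly created watcher gets roughly three minutes to finish its initial fetch before backlog-based eviction can apply. A trivial sketch of that arithmetic:

package main

import (
	"fmt"
	"time"
)

func main() {
	backlogGrace := 59 * time.Second  // DefaultBacklogGracePeriod
	creationGrace := backlogGrace * 3 // DefaultCreationGracePeriod

	fmt.Println(creationGrace) // 2m57s
}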
19 changes: 17 additions & 2 deletions lib/backend/report.go
@@ -29,6 +29,7 @@ import (
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
@@ -82,6 +83,8 @@ type Reporter struct {
// This will keep an upper limit on our memory usage while still always
// reporting the most active keys.
topRequestsCache *lru.Cache[topRequestsCacheKey, struct{}]

slowRangeLogLimiter *rate.Limiter
}

// NewReporter returns a new Reporter.
@@ -103,8 +106,9 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) {
return nil, trace.Wrap(err)
}
r := &Reporter{
ReporterConfig: cfg,
topRequestsCache: cache,
ReporterConfig: cfg,
topRequestsCache: cache,
slowRangeLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 12),
}
return r, nil
}
@@ -134,6 +138,17 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte,
batchReadRequestsFailed.WithLabelValues(s.Component).Inc()
}
s.trackRequest(types.OpGet, startKey, endKey)
end := s.Clock().Now()
if d := end.Sub(start); d > time.Second*3 {
if s.slowRangeLogLimiter.AllowN(end, 1) {
log.WithFields(log.Fields{
"start_key": string(startKey),
"end_key": string(endKey),
"limit": limit,
"duration": d.String(),
}).Warn("slow GetRange request")
}
}
return res, err
}

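The new slowRangeLogLimiter keeps the slow-GetRange warning from flooding the logs: rate.Every(time.Minute) refills one token per minute and the burst of 12 bounds how many warnings can fire back to back, while AllowN only consumes a token when one is available. A minimal sketch of that limiter behavior in isolation (not Teleport code):

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Same shape as the Reporter's limiter: burst of 12, refilling one token per minute.
	limiter := rate.NewLimiter(rate.Every(time.Minute), 12)

	now := time.Now()
	allowed := 0
	// Simulate 100 slow requests observed at effectively the same instant.
	for i := 0; i < 100; i++ {
		if limiter.AllowN(now, 1) {
			allowed++
		}
	}
	fmt.Println(allowed) // 12: only the initial burst of warnings is logged

	// One minute later a single token has accrued, so one more warning is allowed.
	fmt.Println(limiter.AllowN(now.Add(time.Minute), 1)) // true
}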
23 changes: 22 additions & 1 deletion lib/cache/cache.go
@@ -1247,6 +1247,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
return trace.ConnectionProblem(nil, "timeout waiting for watcher init")
}

fetchAndApplyStart := time.Now()

confirmedKindsMap := make(map[resourceKind]types.WatchKind, len(confirmedKinds))
for _, kind := range confirmedKinds {
confirmedKindsMap[resourceKind{kind: kind.Kind, subkind: kind.SubKind}] = kind
@@ -1310,6 +1312,19 @@

c.notify(c.ctx, Event{Type: WatcherStarted})

fetchAndApplyDuration := time.Since(fetchAndApplyStart)
if fetchAndApplyDuration > time.Second*20 {
c.Logger.WithFields(log.Fields{
"cache_target": c.Config.target,
"duration": fetchAndApplyDuration.String(),
}).Warn("slow fetch and apply")
} else {
c.Logger.WithFields(log.Fields{
"cache_target": c.Config.target,
"duration": fetchAndApplyDuration.String(),
}).Debug("fetch and apply")
}

var lastStalenessWarning time.Time
var staleEventCount int
for {
@@ -1394,7 +1409,13 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// cannot run concurrently with event processing.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

// because event streams are not necessarily ordered across keys, expiring on the
// server announce TTL may sometimes generate false positives. Using the watcher
// creation grace period as our safety buffer is mostly an arbitrary choice, but
// since it approximates our expected worst-case staleness of the event stream it's
// a fairly reasonable one.
gracePeriod := apidefaults.ServerAnnounceTTL + backend.DefaultCreationGracePeriod

// latestExp will be the value that we choose to consider the most recent "expired"
// timestamp. This will either end up being the most recently seen node expiry, or
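To put rough numbers on the relative-expiry change: assuming a ServerAnnounceTTL of 10 minutes (an assumed value for illustration; the real constant lives in api/defaults), the effective grace period grows from 10m to 12m57s, giving the cache a buffer roughly equal to the expected worst-case staleness of the event stream. A back-of-the-envelope sketch:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumption for illustration only: ServerAnnounceTTL is treated as 10 minutes here.
	serverAnnounceTTL := 10 * time.Minute
	creationGracePeriod := 3 * 59 * time.Second // matches backend.DefaultCreationGracePeriod

	// The widened grace period used by performRelativeNodeExpiry after this change.
	gracePeriod := serverAnnounceTTL + creationGracePeriod
	fmt.Println(gracePeriod) // 12m57s
}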
2 changes: 1 addition & 1 deletion lib/cache/cache_test.go
@@ -2618,7 +2618,7 @@ func TestRelativeExpiryLimit(t *testing.T) {
require.Len(t, nodes, nodeCount)

clock.Advance(time.Hour * 24)
for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit {
for expired := nodeCount - expiryLimit; expired > expiryLimit; expired -= expiryLimit {
// get rid of events that were emitted before clock advanced
drainEvents(p.eventsC)
// wait for next relative expiry check to run
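The new loop bound stops one batch earlier, so the test no longer waits on the very last round of expiries. With hypothetical values of nodeCount = 100 and expiryLimit = 10 (the real constants are defined elsewhere in cache_test.go), the iteration count drops from 9 to 8:

package main

import "fmt"

// iterations counts how many times the test loop body runs for a given stop bound.
func iterations(nodeCount, expiryLimit, stopAt int) int {
	n := 0
	for expired := nodeCount - expiryLimit; expired > stopAt; expired -= expiryLimit {
		n++
	}
	return n
}

func main() {
	// Hypothetical values for illustration only.
	nodeCount, expiryLimit := 100, 10

	fmt.Println(iterations(nodeCount, expiryLimit, 0))           // 9: old bound (expired > 0)
	fmt.Println(iterations(nodeCount, expiryLimit, expiryLimit)) // 8: new bound (expired > expiryLimit)
}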
