From acf74ef1bdbd0c6faaccb6448d158643dc740f4b Mon Sep 17 00:00:00 2001 From: rosstimothy <39066650+rosstimothy@users.noreply.github.com> Date: Tue, 22 Oct 2024 13:20:16 +0000 Subject: [PATCH] Add metric to expose cluster role count (#47731) A new `teleport_roles_total` is added to count the number of roles in the cluster. The metric is intentionally updated infrequently to avoid putting any additional strain on the backend. Additionally, the reads performed when calculating the metric are rate limited to prevent any bursts in reads when the metric timer does fire. The auth periodic tickers have also been consolidated into a single multi-interval instead of a ticker per operation. To prevent the sub-intervals from impacting each other all operations are moved to their own goroutine once their ticker fires. --- integration/hsm/hsm_test.go | 5 +- lib/auth/auth.go | 305 ++++++++++++++++++++----------- lib/inventory/controller.go | 15 +- lib/utils/interval/multi.go | 32 ++-- lib/utils/interval/multi_test.go | 15 +- 5 files changed, 244 insertions(+), 128 deletions(-) diff --git a/integration/hsm/hsm_test.go b/integration/hsm/hsm_test.go index 5c7427b06a148..935a8709e5dfb 100644 --- a/integration/hsm/hsm_test.go +++ b/integration/hsm/hsm_test.go @@ -677,7 +677,10 @@ func TestHSMRevert(t *testing.T) { clock.Advance(2 * defaults.HighResPollingPeriod) assert.EventuallyWithT(t, func(t *assert.CollectT) { alerts, err = auth1.process.GetAuthServer().GetClusterAlerts(ctx, types.GetClusterAlertsRequest{}) - require.NoError(t, err) + assert.NoError(t, err) assert.Empty(t, alerts) + + // Keep advancing the clock to make sure the rotation ticker gets fired + clock.Advance(2 * defaults.HighResPollingPeriod) }, 5*time.Second, 100*time.Millisecond) } diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 09cd9127a3a91..540df63049468 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -722,6 +722,14 @@ var ( }, ) + roleCount = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Name: "roles_total", + Help: "Number of roles that exist in the cluster", + }, + ) + registeredAgents = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: teleport.MetricNamespace, @@ -806,6 +814,7 @@ var ( accessRequestsCreatedMetric, registeredAgentsInstallMethod, userCertificatesGeneratedMetric, + roleCount, } ) @@ -1269,87 +1278,115 @@ func (a *Server) periodicSyncUpgradeWindowStartHour() { } } +// periodicIntervalKey is used to uniquely identify the subintervals registered with +// the interval.MultiInterval instance that we use for managing periodics operations. + +type periodicIntervalKey int + +const ( + heartbeatCheckKey periodicIntervalKey = 1 + iota + rotationCheckKey + metricsKey + releaseCheckKey + localReleaseCheckKey + instancePeriodicsKey + dynamicLabelsCheckKey + desktopCheckKey + upgradeWindowCheckKey + roleCountKey +) + // runPeriodicOperations runs some periodic bookkeeping operations // performed by auth server func (a *Server) runPeriodicOperations() { - ctx := context.TODO() + firstReleaseCheck := utils.FullJitter(time.Hour * 6) + + // this environment variable is "unstable" since it will be deprecated + // by an upcoming tctl command. currently exists for testing purposes only. 
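	// (Editor's sketch, not part of the patch: the jitter helpers used in this
	// function are assumed to behave roughly like
	//
	//	fullJitter(d) -> random duration in [0, d)
	//	halfJitter(d) -> random duration in [d/2, d)
	//
	// so a fleet of auth servers spreads its first release check across the
	// first six hours instead of hitting the backend in lockstep, unless the
	// override below pulls that first check forward for testing.)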
+ if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" { + firstReleaseCheck = utils.HalfJitter(time.Second * 10) + } + // run periodic functions with a semi-random period // to avoid contention on the database in case if there are multiple // auth servers running - so they don't compete trying // to update the same resources. r := insecurerand.New(insecurerand.NewSource(a.GetClock().Now().UnixNano())) period := defaults.HighResPollingPeriod + time.Duration(r.Intn(int(defaults.HighResPollingPeriod/time.Second)))*time.Second - log.Debugf("Ticking with period: %v.", period) - a.lock.RLock() - ticker := a.clock.NewTicker(period) - a.lock.RUnlock() - // Create a ticker with jitter - heartbeatCheckTicker := interval.New(interval.Config{ - Duration: apidefaults.ServerKeepAliveTTL() * 2, - Jitter: retryutils.NewSeventhJitter(), - }) - promTicker := interval.New(interval.Config{ - FirstDuration: 5 * time.Second, - Duration: defaults.PrometheusScrapeInterval, - Jitter: retryutils.NewSeventhJitter(), - }) - missedKeepAliveCount := 0 - defer ticker.Stop() - defer heartbeatCheckTicker.Stop() - defer promTicker.Stop() - firstReleaseCheck := utils.FullJitter(time.Hour * 6) + ticker := interval.NewMulti( + a.GetClock(), + interval.SubInterval[periodicIntervalKey]{ + Key: rotationCheckKey, + Duration: period, + }, + interval.SubInterval[periodicIntervalKey]{ + Key: metricsKey, + Duration: defaults.PrometheusScrapeInterval, + FirstDuration: 5 * time.Second, + Jitter: retryutils.NewSeventhJitter(), + }, + interval.SubInterval[periodicIntervalKey]{ + Key: instancePeriodicsKey, + Duration: 9 * time.Minute, + FirstDuration: utils.HalfJitter(time.Minute), + Jitter: retryutils.NewSeventhJitter(), + }, + interval.SubInterval[periodicIntervalKey]{ + Key: roleCountKey, + Duration: 12 * time.Hour, + FirstDuration: utils.FullJitter(time.Minute), + Jitter: retryutils.NewSeventhJitter(), + }, + ) - // this environment variable is "unstable" since it will be deprecated - // by an upcoming tctl command. currently exists for testing purposes only. - if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" { - firstReleaseCheck = utils.HalfJitter(time.Second * 10) - } + defer ticker.Stop() - // note the use of FullJitter for the releases check interval. this lets us ensure - // that frequent restarts don't prevent checks from happening despite the infrequent - // effective check rate. - releaseCheck := interval.New(interval.Config{ - Duration: time.Hour * 24, - FirstDuration: firstReleaseCheck, - Jitter: retryutils.NewFullJitter(), - }) - defer releaseCheck.Stop() - - // more frequent release check that just re-calculates alerts based on previously - // pulled versioning info. - localReleaseCheck := interval.New(interval.Config{ - Duration: time.Minute * 10, - FirstDuration: utils.HalfJitter(time.Second * 10), - Jitter: retryutils.NewHalfJitter(), - }) - defer localReleaseCheck.Stop() + missedKeepAliveCount := 0 - instancePeriodics := interval.New(interval.Config{ - Duration: time.Minute * 9, - FirstDuration: utils.HalfJitter(time.Minute), - Jitter: retryutils.NewSeventhJitter(), - }) - defer instancePeriodics.Stop() + // Prevent some periodic operations from running for dashboard tenants. 
+ if !services.IsDashboard(*modules.GetModules().Features().ToProto()) { + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: dynamicLabelsCheckKey, + Duration: dynamicLabelCheckPeriod, + FirstDuration: utils.HalfJitter(10 * time.Second), + Jitter: retryutils.NewSeventhJitter(), + }) + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: heartbeatCheckKey, + Duration: apidefaults.ServerKeepAliveTTL() * 2, + Jitter: retryutils.NewSeventhJitter(), + }) + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: releaseCheckKey, + Duration: 24 * time.Hour, + FirstDuration: firstReleaseCheck, + // note the use of FullJitter for the releases check interval. this lets us ensure + // that frequent restarts don't prevent checks from happening despite the infrequent + // effective check rate. + Jitter: retryutils.NewFullJitter(), + }) + // more frequent release check that just re-calculates alerts based on previously + // pulled versioning info. + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: localReleaseCheckKey, + Duration: 10 * time.Minute, + FirstDuration: utils.HalfJitter(10 * time.Second), + Jitter: retryutils.NewHalfJitter(), + }) + } - var ossDesktopsCheck <-chan time.Time if modules.GetModules().IsOSSBuild() { - ossDesktopsCheck = interval.New(interval.Config{ + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: desktopCheckKey, Duration: OSSDesktopsCheckPeriod, - FirstDuration: utils.HalfJitter(time.Second * 10), + FirstDuration: utils.HalfJitter(10 * time.Second), Jitter: retryutils.NewHalfJitter(), - }).Next() - } else if err := a.DeleteClusterAlert(ctx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) { + }) + } else if err := a.DeleteClusterAlert(a.closeCtx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) { log.Warnf("Can't delete OSS non-AD desktops limit alert: %v", err) } - dynamicLabelsCheck := interval.New(interval.Config{ - Duration: dynamicLabelCheckPeriod, - FirstDuration: utils.HalfJitter(time.Second * 10), - Jitter: retryutils.NewSeventhJitter(), - }) - defer dynamicLabelsCheck.Stop() - // isolate the schedule of potentially long-running refreshRemoteClusters() from other tasks go func() { // reasonably small interval to ensure that users observe clusters as online within 1 minute of adding them. @@ -1364,7 +1401,7 @@ func (a *Server) runPeriodicOperations() { case <-a.closeCtx.Done(): return case <-remoteClustersRefresh.Next(): - a.refreshRemoteClusters(ctx, r) + a.refreshRemoteClusters(a.closeCtx, r) } } }() @@ -1372,60 +1409,118 @@ func (a *Server) runPeriodicOperations() { // cloud auth servers need to periodically sync the upgrade window // from the cloud db. 
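	// (Editor's note: this sync previously ran as a single goroutine launched once
	// at startup; it is now driven by its own pushed sub-interval, roughly every
	// three minutes with jitter, as the block below shows.)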
if modules.GetModules().Features().Cloud { - go a.periodicSyncUpgradeWindowStartHour() - } - - // disable periodics that are not required for cloud dashboard tenants - if services.IsDashboard(*modules.GetModules().Features().ToProto()) { - releaseCheck.Stop() - localReleaseCheck.Stop() - heartbeatCheckTicker.Stop() - dynamicLabelsCheck.Stop() + ticker.Push(interval.SubInterval[periodicIntervalKey]{ + Key: upgradeWindowCheckKey, + Duration: 3 * time.Minute, + FirstDuration: utils.FullJitter(30 * time.Second), + Jitter: retryutils.NewSeventhJitter(), + }) } for { select { case <-a.closeCtx.Done(): return - case <-ticker.Chan(): - err := a.autoRotateCertAuthorities(ctx) - if err != nil { - if trace.IsCompareFailed(err) { - log.Debugf("Cert authority has been updated concurrently: %v.", err) - } else { - log.Errorf("Failed to perform cert rotation check: %v.", err) - } - } - case <-heartbeatCheckTicker.Next(): - nodes, err := a.GetNodes(ctx, apidefaults.Namespace) - if err != nil { - log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err) - } - for _, node := range nodes { - if services.NodeHasMissedKeepAlives(node) { - missedKeepAliveCount++ - } + case tick := <-ticker.Next(): + switch tick.Key { + case rotationCheckKey: + go func() { + if err := a.autoRotateCertAuthorities(a.closeCtx); err != nil { + if trace.IsCompareFailed(err) { + log.Debugf("Cert authority has been updated concurrently: %v.", err) + } else { + log.Errorf("Failed to perform cert rotation check: %v.", err) + } + } + }() + case heartbeatCheckKey: + go func() { + req := &proto.ListUnifiedResourcesRequest{Kinds: []string{types.KindNode}, SortBy: types.SortBy{Field: types.ResourceKind}} + + for { + _, next, err := a.UnifiedResourceCache.IterateUnifiedResources(a.closeCtx, + func(rwl types.ResourceWithLabels) (bool, error) { + srv, ok := rwl.(types.Server) + if !ok { + return false, nil + } + if services.NodeHasMissedKeepAlives(srv) { + missedKeepAliveCount++ + } + return false, nil + }, + req, + ) + if err != nil { + log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err) + return + } + + req.StartKey = next + if req.StartKey == "" { + break + } + } + + // Update prometheus gauge + heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount)) + }() + case metricsKey: + go a.updateAgentMetrics() + case releaseCheckKey: + go a.syncReleaseAlerts(a.closeCtx, true) + case localReleaseCheckKey: + go a.syncReleaseAlerts(a.closeCtx, false) + case instancePeriodicsKey: + go a.doInstancePeriodics(a.closeCtx) + case desktopCheckKey: + go a.syncDesktopsLimitAlert(a.closeCtx) + case dynamicLabelsCheckKey: + go a.syncDynamicLabelsAlert(a.closeCtx) + case upgradeWindowCheckKey: + go a.periodicSyncUpgradeWindowStartHour() + case roleCountKey: + go a.tallyRoles(a.closeCtx) } - // Update prometheus gauge - heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount)) - case <-promTicker.Next(): - a.updateAgentMetrics() - case <-releaseCheck.Next(): - a.syncReleaseAlerts(ctx, true) - case <-localReleaseCheck.Next(): - a.syncReleaseAlerts(ctx, false) - case <-instancePeriodics.Next(): - // instance periodics are rate-limited and may be time-consuming in large - // clusters, so launch them in the background. 
- go a.doInstancePeriodics(ctx) - case <-ossDesktopsCheck: - a.syncDesktopsLimitAlert(ctx) - case <-dynamicLabelsCheck.Next(): - a.syncDynamicLabelsAlert(ctx) } } } +func (a *Server) tallyRoles(ctx context.Context) { + var count = 0 + log.Debug("tallying roles") + defer func() { + log.Debugf("tallying roles completed, role_count=%d", count) + }() + + req := &proto.ListRolesRequest{Limit: 20} + + readLimiter := time.NewTicker(20 * time.Millisecond) + defer readLimiter.Stop() + + for { + resp, err := a.Cache.ListRoles(ctx, req) + if err != nil { + return + } + + count += len(resp.Roles) + req.StartKey = resp.NextKey + + if req.StartKey == "" { + break + } + + select { + case <-readLimiter.C: + case <-ctx.Done(): + return + } + } + + roleCount.Set(float64(count)) +} + func (a *Server) doInstancePeriodics(ctx context.Context) { const slowRate = time.Millisecond * 200 // 5 reads per second const fastRate = time.Millisecond * 5 // 200 reads per second diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index 61d9da851bc16..ae5258cf97630 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -25,6 +25,7 @@ import ( "time" "github.com/gravitational/trace" + "github.com/jonboulle/clockwork" log "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/client" @@ -262,12 +263,14 @@ func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryContro // as much as possible. this is intended to mitigate load spikes on auth restart, and is reasonably // safe to do since the instance resource is not directly relied upon for use of any particular teleport // service. - ticker := interval.NewMulti(interval.SubInterval[intervalKey]{ - Key: instanceHeartbeatKey, - VariableDuration: c.instanceHBVariableDuration, - FirstDuration: fullJitter(c.instanceHBVariableDuration.Duration()), - Jitter: seventhJitter, - }) + ticker := interval.NewMulti( + clockwork.NewRealClock(), + interval.SubInterval[intervalKey]{ + Key: instanceHeartbeatKey, + VariableDuration: c.instanceHBVariableDuration, + FirstDuration: fullJitter(c.instanceHBVariableDuration.Duration()), + Jitter: seventhJitter, + }) handle := newUpstreamHandle(stream, hello, ticker) c.store.Insert(handle) go c.handleControlStream(handle) diff --git a/lib/utils/interval/multi.go b/lib/utils/interval/multi.go index 9932203ea6076..f3c1fae80d79f 100644 --- a/lib/utils/interval/multi.go +++ b/lib/utils/interval/multi.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/jonboulle/clockwork" + "github.com/gravitational/teleport/api/utils/retryutils" ) @@ -39,6 +41,7 @@ import ( // but it is still a potential source of bugs/confusion when transitioning to using this type from one // of the single-interval alternatives. type MultiInterval[T comparable] struct { + clock clockwork.Clock subs []subIntervalEntry[T] push chan subIntervalEntry[T] ch chan Tick[T] @@ -125,12 +128,17 @@ func (s *subIntervalEntry[T]) increment() { // NewMulti creates a new multi-interval instance. This function panics on non-positive // interval durations (equivalent to time.NewTicker) or if no sub-intervals are provided. 
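// (Editor's sketch, illustrative only: the new clock parameter lets tests drive a
// MultiInterval deterministically with a fake clock, along the lines of
//
//	clock := clockwork.NewFakeClock()
//	ti := NewMulti[string](clock, SubInterval[string]{Key: "k", Duration: time.Minute})
//	clock.Advance(time.Minute) // fires the "k" sub-interval without real waiting
//
// while passing nil falls back to a real clock, as the constructor below shows.)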
-func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] { +func NewMulti[T comparable](clock clockwork.Clock, intervals ...SubInterval[T]) *MultiInterval[T] { if len(intervals) == 0 { panic(errors.New("empty sub-interval set for interval.NewMulti")) } + if clock == nil { + clock = clockwork.NewRealClock() + } + interval := &MultiInterval[T]{ + clock: clock, subs: make([]subIntervalEntry[T], 0, len(intervals)), push: make(chan subIntervalEntry[T]), ch: make(chan Tick[T], 1), @@ -140,7 +148,7 @@ func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] { } // check and initialize our sub-intervals. - now := time.Now() + now := clock.Now() for _, sub := range intervals { if sub.Duration <= 0 && (sub.VariableDuration == nil || sub.VariableDuration.Duration() <= 0) { panic(errors.New("non-positive sub interval for interval.NewMulti")) @@ -156,7 +164,7 @@ func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] { // start the timer in this goroutine to improve // consistency of first tick. - timer := time.NewTimer(d) + timer := clock.NewTimer(d) go interval.run(timer, key) @@ -173,7 +181,7 @@ func (i *MultiInterval[T]) Push(sub SubInterval[T]) { SubInterval: sub, } // we initialize here in order to improve consistency of start time - entry.init(time.Now()) + entry.init(i.clock.Now()) select { case i.push <- entry: case <-i.done: @@ -257,7 +265,7 @@ func (i *MultiInterval[T]) pushEntry(entry subIntervalEntry[T]) { i.subs = append(i.subs, entry) } -func (i *MultiInterval[T]) run(timer *time.Timer, key T) { +func (i *MultiInterval[T]) run(timer clockwork.Timer, key T) { defer timer.Stop() var pending pendingTicks[T] @@ -276,7 +284,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) { } select { - case t := <-timer.C: + case t := <-timer.Chan(): // increment the sub-interval for the current key i.increment(key) @@ -292,7 +300,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) { timer.Reset(d) case resetKey := <-i.reset: - now := time.Now() + now := i.clock.Now() // reset the sub-interval for the target key i.resetEntry(now, resetKey) @@ -307,14 +315,14 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) { // stop and drain timer if !timer.Stop() { - <-timer.C + <-timer.Chan() } // apply the new duration timer.Reset(d) case fireKey := <-i.fire: - now := time.Now() + now := i.clock.Now() // reset the sub-interval for the key we are firing i.resetEntry(now, fireKey) @@ -329,13 +337,13 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) { // stop and drain timer. 
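	// (Editor's note: if Stop reports that the timer already fired, the pending
	// value must be drained from the channel before Reset, otherwise a stale tick
	// would be delivered on top of the re-armed one. clockwork.Timer mirrors
	// time.Timer here, so the idiom is unchanged apart from C becoming Chan().)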
if !timer.Stop() { - <-timer.C + <-timer.Chan() } // re-set the timer timer.Reset(d) case entry := <-i.push: - now := time.Now() + now := i.clock.Now() // add the new sub-interval entry i.pushEntry(entry) @@ -351,7 +359,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) { // stop and drain timer if !timer.Stop() { - <-timer.C + <-timer.Chan() } // apply the new duration diff --git a/lib/utils/interval/multi_test.go b/lib/utils/interval/multi_test.go index 37cf3a10b1614..3ef8b1f17ad56 100644 --- a/lib/utils/interval/multi_test.go +++ b/lib/utils/interval/multi_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" ) @@ -47,10 +48,12 @@ func TestMultiIntervalReset(t *testing.T) { resetTimer := time.NewTimer(duration / 3) defer resetTimer.Stop() - interval := NewMulti[string](SubInterval[string]{ - Key: "key", - Duration: duration, - }) + interval := NewMulti[string]( + clockwork.NewRealClock(), + SubInterval[string]{ + Key: "key", + Duration: duration, + }) defer interval.Stop() start := time.Now() @@ -92,6 +95,7 @@ func TestMultiIntervalReset(t *testing.T) { func TestMultiIntervalBasics(t *testing.T) { t.Parallel() interval := NewMulti[string]( + clockwork.NewRealClock(), SubInterval[string]{ Key: "fast", Duration: time.Millisecond * 8, @@ -151,6 +155,7 @@ func TestMultiIntervalVariableDuration(t *testing.T) { bar.counter.Store(1) interval := NewMulti[string]( + clockwork.NewRealClock(), SubInterval[string]{ Key: "foo", VariableDuration: foo, @@ -216,6 +221,7 @@ func TestMultiIntervalVariableDuration(t *testing.T) { func TestMultiIntervalPush(t *testing.T) { t.Parallel() interval := NewMulti[string]( + clockwork.NewRealClock(), SubInterval[string]{ Key: "foo", Duration: time.Millisecond * 6, @@ -289,6 +295,7 @@ func TestMultiIntervalFireNow(t *testing.T) { // set up one sub-interval that fires frequently, and another that will never // fire during this test unless we trigger with FireNow. interval := NewMulti[string]( + clockwork.NewRealClock(), SubInterval[string]{ Key: "slow", Duration: time.Hour,