diff --git a/integration/hsm/hsm_test.go b/integration/hsm/hsm_test.go
index 7468009bf6aa5..a3a6b994e9a71 100644
--- a/integration/hsm/hsm_test.go
+++ b/integration/hsm/hsm_test.go
@@ -674,7 +674,10 @@ func TestHSMRevert(t *testing.T) {
 	clock.Advance(2 * defaults.HighResPollingPeriod)
 	assert.EventuallyWithT(t, func(t *assert.CollectT) {
 		alerts, err = auth1.process.GetAuthServer().GetClusterAlerts(ctx, types.GetClusterAlertsRequest{})
-		require.NoError(t, err)
+		assert.NoError(t, err)
 		assert.Empty(t, alerts)
+
+		// Keep advancing the clock to make sure the rotation ticker gets fired
+		clock.Advance(2 * defaults.HighResPollingPeriod)
 	}, 5*time.Second, 100*time.Millisecond)
 }
diff --git a/lib/auth/auth.go b/lib/auth/auth.go
index d07263676e9fc..63325136dd92f 100644
--- a/lib/auth/auth.go
+++ b/lib/auth/auth.go
@@ -699,6 +699,14 @@ var (
 		},
 	)
 
+	roleCount = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: teleport.MetricNamespace,
+			Name:      "roles_total",
+			Help:      "Number of roles that exist in the cluster",
+		},
+	)
+
 	registeredAgents = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: teleport.MetricNamespace,
@@ -783,6 +791,7 @@ var (
 		accessRequestsCreatedMetric,
 		registeredAgentsInstallMethod,
 		userCertificatesGeneratedMetric,
+		roleCount,
 	}
 )
 
@@ -1217,87 +1226,115 @@ func (a *Server) periodicSyncUpgradeWindowStartHour() {
 	}
 }
 
+// periodicIntervalKey is used to uniquely identify the subintervals registered with
+// the interval.MultiInterval instance that we use for managing periodic operations.
+
+type periodicIntervalKey int
+
+const (
+	heartbeatCheckKey periodicIntervalKey = 1 + iota
+	rotationCheckKey
+	metricsKey
+	releaseCheckKey
+	localReleaseCheckKey
+	instancePeriodicsKey
+	dynamicLabelsCheckKey
+	desktopCheckKey
+	upgradeWindowCheckKey
+	roleCountKey
+)
+
 // runPeriodicOperations runs some periodic bookkeeping operations
 // performed by auth server
 func (a *Server) runPeriodicOperations() {
-	ctx := context.TODO()
+	firstReleaseCheck := utils.FullJitter(time.Hour * 6)
+
+	// this environment variable is "unstable" since it will be deprecated
+	// by an upcoming tctl command. currently exists for testing purposes only.
+	if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
+		firstReleaseCheck = utils.HalfJitter(time.Second * 10)
+	}
+
 	// run periodic functions with a semi-random period
 	// to avoid contention on the database in case if there are multiple
 	// auth servers running - so they don't compete trying
 	// to update the same resources.
 	r := insecurerand.New(insecurerand.NewSource(a.GetClock().Now().UnixNano()))
 	period := defaults.HighResPollingPeriod + time.Duration(r.Intn(int(defaults.HighResPollingPeriod/time.Second)))*time.Second
-	log.Debugf("Ticking with period: %v.", period)
-	a.lock.RLock()
-	ticker := a.clock.NewTicker(period)
-	a.lock.RUnlock()
-	// Create a ticker with jitter
-	heartbeatCheckTicker := interval.New(interval.Config{
-		Duration: apidefaults.ServerKeepAliveTTL() * 2,
-		Jitter:   retryutils.NewSeventhJitter(),
-	})
-	promTicker := interval.New(interval.Config{
-		FirstDuration: 5 * time.Second,
-		Duration:      defaults.PrometheusScrapeInterval,
-		Jitter:        retryutils.NewSeventhJitter(),
-	})
-	missedKeepAliveCount := 0
-	defer ticker.Stop()
-	defer heartbeatCheckTicker.Stop()
-	defer promTicker.Stop()
-	firstReleaseCheck := utils.FullJitter(time.Hour * 6)
+	ticker := interval.NewMulti(
+		a.GetClock(),
+		interval.SubInterval[periodicIntervalKey]{
+			Key:      rotationCheckKey,
+			Duration: period,
+		},
+		interval.SubInterval[periodicIntervalKey]{
+			Key:           metricsKey,
+			Duration:      defaults.PrometheusScrapeInterval,
+			FirstDuration: 5 * time.Second,
+			Jitter:        retryutils.NewSeventhJitter(),
+		},
+		interval.SubInterval[periodicIntervalKey]{
+			Key:           instancePeriodicsKey,
+			Duration:      9 * time.Minute,
+			FirstDuration: utils.HalfJitter(time.Minute),
+			Jitter:        retryutils.NewSeventhJitter(),
+		},
+		interval.SubInterval[periodicIntervalKey]{
+			Key:           roleCountKey,
+			Duration:      12 * time.Hour,
+			FirstDuration: utils.FullJitter(time.Minute),
+			Jitter:        retryutils.NewSeventhJitter(),
+		},
+	)
 
-	// this environment variable is "unstable" since it will be deprecated
-	// by an upcoming tctl command. currently exists for testing purposes only.
-	if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
-		firstReleaseCheck = utils.HalfJitter(time.Second * 10)
-	}
+	defer ticker.Stop()
 
-	// note the use of FullJitter for the releases check interval. this lets us ensure
-	// that frequent restarts don't prevent checks from happening despite the infrequent
-	// effective check rate.
-	releaseCheck := interval.New(interval.Config{
-		Duration:      time.Hour * 24,
-		FirstDuration: firstReleaseCheck,
-		Jitter:        retryutils.NewFullJitter(),
-	})
-	defer releaseCheck.Stop()
-
-	// more frequent release check that just re-calculates alerts based on previously
-	// pulled versioning info.
-	localReleaseCheck := interval.New(interval.Config{
-		Duration:      time.Minute * 10,
-		FirstDuration: utils.HalfJitter(time.Second * 10),
-		Jitter:        retryutils.NewHalfJitter(),
-	})
-	defer localReleaseCheck.Stop()
+	missedKeepAliveCount := 0
 
-	instancePeriodics := interval.New(interval.Config{
-		Duration:      time.Minute * 9,
-		FirstDuration: utils.HalfJitter(time.Minute),
-		Jitter:        retryutils.NewSeventhJitter(),
-	})
-	defer instancePeriodics.Stop()
+	// Prevent some periodic operations from running for dashboard tenants.
+	if !services.IsDashboard(*modules.GetModules().Features().ToProto()) {
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:           dynamicLabelsCheckKey,
+			Duration:      dynamicLabelCheckPeriod,
+			FirstDuration: utils.HalfJitter(10 * time.Second),
+			Jitter:        retryutils.NewSeventhJitter(),
+		})
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:      heartbeatCheckKey,
+			Duration: apidefaults.ServerKeepAliveTTL() * 2,
+			Jitter:   retryutils.NewSeventhJitter(),
+		})
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:           releaseCheckKey,
+			Duration:      24 * time.Hour,
+			FirstDuration: firstReleaseCheck,
+			// note the use of FullJitter for the releases check interval. this lets us ensure
+			// that frequent restarts don't prevent checks from happening despite the infrequent
+			// effective check rate.
+			Jitter: retryutils.NewFullJitter(),
+		})
+		// more frequent release check that just re-calculates alerts based on previously
+		// pulled versioning info.
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:           localReleaseCheckKey,
+			Duration:      10 * time.Minute,
+			FirstDuration: utils.HalfJitter(10 * time.Second),
+			Jitter:        retryutils.NewHalfJitter(),
+		})
+	}
 
-	var ossDesktopsCheck <-chan time.Time
 	if modules.GetModules().BuildType() == modules.BuildOSS {
-		ossDesktopsCheck = interval.New(interval.Config{
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:           desktopCheckKey,
 			Duration:      OSSDesktopsCheckPeriod,
-			FirstDuration: utils.HalfJitter(time.Second * 10),
+			FirstDuration: utils.HalfJitter(10 * time.Second),
 			Jitter:        retryutils.NewHalfJitter(),
-		}).Next()
-	} else if err := a.DeleteClusterAlert(ctx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
+		})
+	} else if err := a.DeleteClusterAlert(a.closeCtx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
 		log.Warnf("Can't delete OSS non-AD desktops limit alert: %v", err)
 	}
 
-	dynamicLabelsCheck := interval.New(interval.Config{
-		Duration:      dynamicLabelCheckPeriod,
-		FirstDuration: utils.HalfJitter(time.Second * 10),
-		Jitter:        retryutils.NewSeventhJitter(),
-	})
-	defer dynamicLabelsCheck.Stop()
-
 	// isolate the schedule of potentially long-running refreshRemoteClusters() from other tasks
 	go func() {
 		// reasonably small interval to ensure that users observe clusters as online within 1 minute of adding them.
@@ -1312,7 +1349,7 @@ func (a *Server) runPeriodicOperations() {
 			case <-a.closeCtx.Done():
 				return
 			case <-remoteClustersRefresh.Next():
-				a.refreshRemoteClusters(ctx, r)
+				a.refreshRemoteClusters(a.closeCtx, r)
 			}
 		}
 	}()
@@ -1320,60 +1357,118 @@ func (a *Server) runPeriodicOperations() {
 	// cloud auth servers need to periodically sync the upgrade window
 	// from the cloud db.
 	if modules.GetModules().Features().Cloud {
-		go a.periodicSyncUpgradeWindowStartHour()
-	}
-
-	// disable periodics that are not required for cloud dashboard tenants
-	if services.IsDashboard(*modules.GetModules().Features().ToProto()) {
-		releaseCheck.Stop()
-		localReleaseCheck.Stop()
-		heartbeatCheckTicker.Stop()
-		dynamicLabelsCheck.Stop()
+		ticker.Push(interval.SubInterval[periodicIntervalKey]{
+			Key:           upgradeWindowCheckKey,
+			Duration:      3 * time.Minute,
+			FirstDuration: utils.FullJitter(30 * time.Second),
+			Jitter:        retryutils.NewSeventhJitter(),
+		})
 	}
 
 	for {
 		select {
 		case <-a.closeCtx.Done():
 			return
-		case <-ticker.Chan():
-			err := a.autoRotateCertAuthorities(ctx)
-			if err != nil {
-				if trace.IsCompareFailed(err) {
-					log.Debugf("Cert authority has been updated concurrently: %v.", err)
-				} else {
-					log.Errorf("Failed to perform cert rotation check: %v.", err)
-				}
-			}
-		case <-heartbeatCheckTicker.Next():
-			nodes, err := a.GetNodes(ctx, apidefaults.Namespace)
-			if err != nil {
-				log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
+		case tick := <-ticker.Next():
+			switch tick.Key {
+			case rotationCheckKey:
+				go func() {
+					if err := a.autoRotateCertAuthorities(a.closeCtx); err != nil {
+						if trace.IsCompareFailed(err) {
+							log.Debugf("Cert authority has been updated concurrently: %v.", err)
+						} else {
+							log.Errorf("Failed to perform cert rotation check: %v.", err)
+						}
+					}
+				}()
+			case heartbeatCheckKey:
+				go func() {
+					req := &proto.ListUnifiedResourcesRequest{Kinds: []string{types.KindNode}, SortBy: types.SortBy{Field: types.ResourceKind}}
+
+					for {
+						_, next, err := a.UnifiedResourceCache.IterateUnifiedResources(a.closeCtx,
+							func(rwl types.ResourceWithLabels) (bool, error) {
+								srv, ok := rwl.(types.Server)
+								if !ok {
+									return false, nil
+								}
+								if services.NodeHasMissedKeepAlives(srv) {
+									missedKeepAliveCount++
+								}
+								return false, nil
+							},
+							req,
+						)
+						if err != nil {
+							log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
+							return
+						}
+
+						req.StartKey = next
+						if req.StartKey == "" {
+							break
+						}
+					}
+
+					// Update prometheus gauge
+					heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
+				}()
+			case metricsKey:
+				go a.updateAgentMetrics()
+			case releaseCheckKey:
+				go a.syncReleaseAlerts(a.closeCtx, true)
+			case localReleaseCheckKey:
+				go a.syncReleaseAlerts(a.closeCtx, false)
+			case instancePeriodicsKey:
+				go a.doInstancePeriodics(a.closeCtx)
+			case desktopCheckKey:
+				go a.syncDesktopsLimitAlert(a.closeCtx)
+			case dynamicLabelsCheckKey:
+				go a.syncDynamicLabelsAlert(a.closeCtx)
+			case upgradeWindowCheckKey:
+				go a.periodicSyncUpgradeWindowStartHour()
+			case roleCountKey:
+				go a.tallyRoles(a.closeCtx)
 			}
-			for _, node := range nodes {
-				if services.NodeHasMissedKeepAlives(node) {
-					missedKeepAliveCount++
-				}
-			}
-			// Update prometheus gauge
-			heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
-		case <-promTicker.Next():
-			a.updateAgentMetrics()
-		case <-releaseCheck.Next():
-			a.syncReleaseAlerts(ctx, true)
-		case <-localReleaseCheck.Next():
-			a.syncReleaseAlerts(ctx, false)
-		case <-instancePeriodics.Next():
-			// instance periodics are rate-limited and may be time-consuming in large
-			// clusters, so launch them in the background.
-			go a.doInstancePeriodics(ctx)
-		case <-ossDesktopsCheck:
-			a.syncDesktopsLimitAlert(ctx)
-		case <-dynamicLabelsCheck.Next():
-			a.syncDynamicLabelsAlert(ctx)
 		}
 	}
 }
 
+func (a *Server) tallyRoles(ctx context.Context) {
+	var count = 0
+	log.Debug("tallying roles")
+	defer func() {
+		log.Debugf("tallying roles completed, role_count=%d", count)
+	}()
+
+	req := &proto.ListRolesRequest{Limit: 20}
+
+	readLimiter := time.NewTicker(20 * time.Millisecond)
+	defer readLimiter.Stop()
+
+	for {
+		resp, err := a.Cache.ListRoles(ctx, req)
+		if err != nil {
+			return
+		}
+
+		count += len(resp.Roles)
+		req.StartKey = resp.NextKey
+
+		if req.StartKey == "" {
+			break
+		}
+
+		select {
+		case <-readLimiter.C:
+		case <-ctx.Done():
+			return
+		}
+	}
+
+	roleCount.Set(float64(count))
+}
+
 func (a *Server) doInstancePeriodics(ctx context.Context) {
 	const slowRate = time.Millisecond * 200 // 5 reads per second
 	const fastRate = time.Millisecond * 5   // 200 reads per second
diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go
index 61d9da851bc16..ae5258cf97630 100644
--- a/lib/inventory/controller.go
+++ b/lib/inventory/controller.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/gravitational/trace"
+	"github.com/jonboulle/clockwork"
 	log "github.com/sirupsen/logrus"
 
 	"github.com/gravitational/teleport/api/client"
@@ -262,12 +263,14 @@ func (c *Controller) RegisterControlStream(stream client.UpstreamInventoryContro
 	// as much as possible. this is intended to mitigate load spikes on auth restart, and is reasonably
 	// safe to do since the instance resource is not directly relied upon for use of any particular teleport
 	// service.
-	ticker := interval.NewMulti(interval.SubInterval[intervalKey]{
-		Key:              instanceHeartbeatKey,
-		VariableDuration: c.instanceHBVariableDuration,
-		FirstDuration:    fullJitter(c.instanceHBVariableDuration.Duration()),
-		Jitter:           seventhJitter,
-	})
+	ticker := interval.NewMulti(
+		clockwork.NewRealClock(),
+		interval.SubInterval[intervalKey]{
+			Key:              instanceHeartbeatKey,
+			VariableDuration: c.instanceHBVariableDuration,
+			FirstDuration:    fullJitter(c.instanceHBVariableDuration.Duration()),
+			Jitter:           seventhJitter,
+		})
 	handle := newUpstreamHandle(stream, hello, ticker)
 	c.store.Insert(handle)
 	go c.handleControlStream(handle)
diff --git a/lib/utils/interval/multi.go b/lib/utils/interval/multi.go
index 9932203ea6076..f3c1fae80d79f 100644
--- a/lib/utils/interval/multi.go
+++ b/lib/utils/interval/multi.go
@@ -23,6 +23,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/jonboulle/clockwork"
+
 	"github.com/gravitational/teleport/api/utils/retryutils"
 )
 
@@ -39,6 +41,7 @@ import (
 // but it is still a potential source of bugs/confusion when transitioning to using this type from one
 // of the single-interval alternatives.
 type MultiInterval[T comparable] struct {
+	clock clockwork.Clock
 	subs  []subIntervalEntry[T]
 	push  chan subIntervalEntry[T]
 	ch    chan Tick[T]
@@ -125,12 +128,17 @@ func (s *subIntervalEntry[T]) increment() {
 
 // NewMulti creates a new multi-interval instance. This function panics on non-positive
 // interval durations (equivalent to time.NewTicker) or if no sub-intervals are provided.
-func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] {
+func NewMulti[T comparable](clock clockwork.Clock, intervals ...SubInterval[T]) *MultiInterval[T] {
 	if len(intervals) == 0 {
 		panic(errors.New("empty sub-interval set for interval.NewMulti"))
 	}
 
+	if clock == nil {
+		clock = clockwork.NewRealClock()
+	}
+
 	interval := &MultiInterval[T]{
+		clock: clock,
 		subs:  make([]subIntervalEntry[T], 0, len(intervals)),
 		push:  make(chan subIntervalEntry[T]),
 		ch:    make(chan Tick[T], 1),
@@ -140,7 +148,7 @@ func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] {
 	}
 
 	// check and initialize our sub-intervals.
-	now := time.Now()
+	now := clock.Now()
 	for _, sub := range intervals {
 		if sub.Duration <= 0 && (sub.VariableDuration == nil || sub.VariableDuration.Duration() <= 0) {
 			panic(errors.New("non-positive sub interval for interval.NewMulti"))
@@ -156,7 +164,7 @@ func NewMulti[T comparable](intervals ...SubInterval[T]) *MultiInterval[T] {
 
 	// start the timer in this goroutine to improve
 	// consistency of first tick.
-	timer := time.NewTimer(d)
+	timer := clock.NewTimer(d)
 
 	go interval.run(timer, key)
 
@@ -173,7 +181,7 @@ func (i *MultiInterval[T]) Push(sub SubInterval[T]) {
 		SubInterval: sub,
 	}
 	// we initialize here in order to improve consistency of start time
-	entry.init(time.Now())
+	entry.init(i.clock.Now())
 	select {
 	case i.push <- entry:
 	case <-i.done:
@@ -257,7 +265,7 @@ func (i *MultiInterval[T]) pushEntry(entry subIntervalEntry[T]) {
 	i.subs = append(i.subs, entry)
 }
 
-func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
+func (i *MultiInterval[T]) run(timer clockwork.Timer, key T) {
 	defer timer.Stop()
 	var pending pendingTicks[T]
 
@@ -276,7 +284,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
 		}
 
 		select {
-		case t := <-timer.C:
+		case t := <-timer.Chan():
 			// increment the sub-interval for the current key
 			i.increment(key)
 
@@ -292,7 +300,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
 			timer.Reset(d)
 
 		case resetKey := <-i.reset:
-			now := time.Now()
+			now := i.clock.Now()
 
 			// reset the sub-interval for the target key
 			i.resetEntry(now, resetKey)
@@ -307,14 +315,14 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
 
 			// stop and drain timer
 			if !timer.Stop() {
-				<-timer.C
+				<-timer.Chan()
 			}
 
 			// apply the new duration
 			timer.Reset(d)
 
 		case fireKey := <-i.fire:
-			now := time.Now()
+			now := i.clock.Now()
 
 			// reset the sub-interval for the key we are firing
 			i.resetEntry(now, fireKey)
@@ -329,13 +337,13 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
 			// stop and drain timer.
 			if !timer.Stop() {
-				<-timer.C
+				<-timer.Chan()
 			}
 
 			// re-set the timer
 			timer.Reset(d)
 
 		case entry := <-i.push:
-			now := time.Now()
+			now := i.clock.Now()
 
 			// add the new sub-interval entry
 			i.pushEntry(entry)
@@ -351,7 +359,7 @@ func (i *MultiInterval[T]) run(timer *time.Timer, key T) {
 
 			// stop and drain timer
 			if !timer.Stop() {
-				<-timer.C
+				<-timer.Chan()
 			}
 
 			// apply the new duration
diff --git a/lib/utils/interval/multi_test.go b/lib/utils/interval/multi_test.go
index 37cf3a10b1614..3ef8b1f17ad56 100644
--- a/lib/utils/interval/multi_test.go
+++ b/lib/utils/interval/multi_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/jonboulle/clockwork"
 	"github.com/stretchr/testify/require"
 )
 
@@ -47,10 +48,12 @@ func TestMultiIntervalReset(t *testing.T) {
 	resetTimer := time.NewTimer(duration / 3)
 	defer resetTimer.Stop()
 
-	interval := NewMulti[string](SubInterval[string]{
-		Key:      "key",
-		Duration: duration,
-	})
+	interval := NewMulti[string](
+		clockwork.NewRealClock(),
+		SubInterval[string]{
+			Key:      "key",
+			Duration: duration,
+		})
 	defer interval.Stop()
 
 	start := time.Now()
@@ -92,6 +95,7 @@ func TestMultiIntervalReset(t *testing.T) {
 func TestMultiIntervalBasics(t *testing.T) {
 	t.Parallel()
 	interval := NewMulti[string](
+		clockwork.NewRealClock(),
 		SubInterval[string]{
 			Key:      "fast",
 			Duration: time.Millisecond * 8,
@@ -151,6 +155,7 @@ func TestMultiIntervalVariableDuration(t *testing.T) {
 	bar.counter.Store(1)
 
 	interval := NewMulti[string](
+		clockwork.NewRealClock(),
 		SubInterval[string]{
 			Key:              "foo",
 			VariableDuration: foo,
@@ -216,6 +221,7 @@ func TestMultiIntervalVariableDuration(t *testing.T) {
 func TestMultiIntervalPush(t *testing.T) {
 	t.Parallel()
 	interval := NewMulti[string](
+		clockwork.NewRealClock(),
 		SubInterval[string]{
 			Key:      "foo",
 			Duration: time.Millisecond * 6,
@@ -289,6 +295,7 @@ func TestMultiIntervalFireNow(t *testing.T) {
 	// set up one sub-interval that fires frequently, and another that will never
 	// fire during this test unless we trigger with FireNow.
 	interval := NewMulti[string](
+		clockwork.NewRealClock(),
 		SubInterval[string]{
 			Key:      "slow",
 			Duration: time.Hour,