Skip to content

Commit

Permalink
Add metric to expose cluster role count (#47731)
Browse files Browse the repository at this point in the history
A new `teleport_roles_total` metric is added to count the number of roles
in the cluster. The metric is intentionally updated infrequently
to avoid putting any additional strain on the backend. Additionally,
the reads performed when calculating the metric are rate limited
to prevent any bursts in reads when the metric timer does fire.

The auth periodic tickers have also been consolidated into a single
multi-interval instead of one ticker per operation. To prevent the
sub-intervals from impacting each other, each operation is moved
to its own goroutine once its ticker fires.
  • Loading branch information
rosstimothy committed Oct 22, 2024
1 parent 37463f4 commit 28070d1
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 128 deletions.
5 changes: 4 additions & 1 deletion integration/hsm/hsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ func TestHSMRevert(t *testing.T) {
clock.Advance(2 * defaults.HighResPollingPeriod)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
alerts, err = auth1.process.GetAuthServer().GetClusterAlerts(ctx, types.GetClusterAlertsRequest{})
require.NoError(t, err)
assert.NoError(t, err)
assert.Empty(t, alerts)

// Keep advancing the clock to make sure the rotation ticker gets fired
clock.Advance(2 * defaults.HighResPollingPeriod)
}, 5*time.Second, 100*time.Millisecond)
}
311 changes: 206 additions & 105 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,14 @@ var (
},
)

// roleCount is a gauge holding the number of roles that exist in the
// cluster. It is set by tallyRoles once a complete pass over the role
// list has finished.
roleCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: "roles_total",
Help: "Number of roles that exist in the cluster",
},
)

registeredAgents = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Expand Down Expand Up @@ -806,6 +814,7 @@ var (
accessRequestsCreatedMetric,
registeredAgentsInstallMethod,
userCertificatesGeneratedMetric,
roleCount,
}
)

Expand Down Expand Up @@ -1269,87 +1278,121 @@ func (a *Server) periodicSyncUpgradeWindowStartHour() {
}
}

// periodicIntervalKey is used to uniquely identify the subintervals registered with
// the interval.MultiInterval instance that we use for managing periodic operations.
type periodicIntervalKey int

// Keys for the periodic operations. Values start at 1, so the zero value of
// periodicIntervalKey does not match any key declared here.
const (
	heartbeatCheckKey periodicIntervalKey = 1 + iota
	rotationCheckKey
	metricsKey
	releaseCheckKey
	localReleaseCheckKey
	instancePeriodicsKey
	dynamicLabelsCheckKey
	desktopCheckKey
	upgradeWindowCheckKey
	roleCountKey
	// notificationsCleanupKey is appended last so that the values of the
	// keys above are unchanged.
	notificationsCleanupKey
)

// runPeriodicOperations runs some periodic bookkeeping operations
// performed by auth server
func (a *Server) runPeriodicOperations() {
ctx := context.TODO()
firstReleaseCheck := utils.FullJitter(time.Hour * 6)

// this environment variable is "unstable" since it will be deprecated
// by an upcoming tctl command. currently exists for testing purposes only.
if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
firstReleaseCheck = utils.HalfJitter(time.Second * 10)
}

// run periodic functions with a semi-random period
// to avoid contention on the database in case if there are multiple
// auth servers running - so they don't compete trying
// to update the same resources.
r := insecurerand.New(insecurerand.NewSource(a.GetClock().Now().UnixNano()))
period := defaults.HighResPollingPeriod + time.Duration(r.Intn(int(defaults.HighResPollingPeriod/time.Second)))*time.Second
log.Debugf("Ticking with period: %v.", period)
a.lock.RLock()
ticker := a.clock.NewTicker(period)
a.lock.RUnlock()
// Create a ticker with jitter
heartbeatCheckTicker := interval.New(interval.Config{
Duration: apidefaults.ServerKeepAliveTTL() * 2,
Jitter: retryutils.NewSeventhJitter(),
})
promTicker := interval.New(interval.Config{
FirstDuration: 5 * time.Second,
Duration: defaults.PrometheusScrapeInterval,
Jitter: retryutils.NewSeventhJitter(),
})
missedKeepAliveCount := 0
defer ticker.Stop()
defer heartbeatCheckTicker.Stop()
defer promTicker.Stop()

firstReleaseCheck := utils.FullJitter(time.Hour * 6)
ticker := interval.NewMulti(
a.GetClock(),
interval.SubInterval[periodicIntervalKey]{
Key: rotationCheckKey,
Duration: period,
},
interval.SubInterval[periodicIntervalKey]{
Key: metricsKey,
Duration: defaults.PrometheusScrapeInterval,
FirstDuration: 5 * time.Second,
Jitter: retryutils.NewSeventhJitter(),
},
interval.SubInterval[periodicIntervalKey]{
Key: instancePeriodicsKey,
Duration: 9 * time.Minute,
FirstDuration: utils.HalfJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
},
interval.SubInterval[periodicIntervalKey]{
Key: notificationsCleanupKey,
Duration: 48 * time.Hour,
FirstDuration: utils.FullJitter(time.Hour),
Jitter: retryutils.NewSeventhJitter(),
},
interval.SubInterval[periodicIntervalKey]{
Key: roleCountKey,
Duration: 12 * time.Hour,
FirstDuration: utils.FullJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
},
)

// this environment variable is "unstable" since it will be deprecated
// by an upcoming tctl command. currently exists for testing purposes only.
if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
firstReleaseCheck = utils.HalfJitter(time.Second * 10)
}
defer ticker.Stop()

// note the use of FullJitter for the releases check interval. this lets us ensure
// that frequent restarts don't prevent checks from happening despite the infrequent
// effective check rate.
releaseCheck := interval.New(interval.Config{
Duration: time.Hour * 24,
FirstDuration: firstReleaseCheck,
Jitter: retryutils.NewFullJitter(),
})
defer releaseCheck.Stop()

// more frequent release check that just re-calculates alerts based on previously
// pulled versioning info.
localReleaseCheck := interval.New(interval.Config{
Duration: time.Minute * 10,
FirstDuration: utils.HalfJitter(time.Second * 10),
Jitter: retryutils.NewHalfJitter(),
})
defer localReleaseCheck.Stop()
missedKeepAliveCount := 0

instancePeriodics := interval.New(interval.Config{
Duration: time.Minute * 9,
FirstDuration: utils.HalfJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
})
defer instancePeriodics.Stop()
// Prevent some periodic operations from running for dashboard tenants.
if !services.IsDashboard(*modules.GetModules().Features().ToProto()) {
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: dynamicLabelsCheckKey,
Duration: dynamicLabelCheckPeriod,
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewSeventhJitter(),
})
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: heartbeatCheckKey,
Duration: apidefaults.ServerKeepAliveTTL() * 2,
Jitter: retryutils.NewSeventhJitter(),
})
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: releaseCheckKey,
Duration: 24 * time.Hour,
FirstDuration: firstReleaseCheck,
// note the use of FullJitter for the releases check interval. this lets us ensure
// that frequent restarts don't prevent checks from happening despite the infrequent
// effective check rate.
Jitter: retryutils.NewFullJitter(),
})
// more frequent release check that just re-calculates alerts based on previously
// pulled versioning info.
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: localReleaseCheckKey,
Duration: 10 * time.Minute,
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewHalfJitter(),
})
}

var ossDesktopsCheck <-chan time.Time
if modules.GetModules().IsOSSBuild() {
ossDesktopsCheck = interval.New(interval.Config{
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: desktopCheckKey,
Duration: OSSDesktopsCheckPeriod,
FirstDuration: utils.HalfJitter(time.Second * 10),
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewHalfJitter(),
}).Next()
} else if err := a.DeleteClusterAlert(ctx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
})
} else if err := a.DeleteClusterAlert(a.closeCtx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
log.Warnf("Can't delete OSS non-AD desktops limit alert: %v", err)
}

dynamicLabelsCheck := interval.New(interval.Config{
Duration: dynamicLabelCheckPeriod,
FirstDuration: utils.HalfJitter(time.Second * 10),
Jitter: retryutils.NewSeventhJitter(),
})
defer dynamicLabelsCheck.Stop()

// isolate the schedule of potentially long-running refreshRemoteClusters() from other tasks
go func() {
// reasonably small interval to ensure that users observe clusters as online within 1 minute of adding them.
Expand All @@ -1364,68 +1407,126 @@ func (a *Server) runPeriodicOperations() {
case <-a.closeCtx.Done():
return
case <-remoteClustersRefresh.Next():
a.refreshRemoteClusters(ctx, r)
a.refreshRemoteClusters(a.closeCtx, r)
}
}
}()

// cloud auth servers need to periodically sync the upgrade window
// from the cloud db.
if modules.GetModules().Features().Cloud {
go a.periodicSyncUpgradeWindowStartHour()
}

// disable periodics that are not required for cloud dashboard tenants
if services.IsDashboard(*modules.GetModules().Features().ToProto()) {
releaseCheck.Stop()
localReleaseCheck.Stop()
heartbeatCheckTicker.Stop()
dynamicLabelsCheck.Stop()
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: upgradeWindowCheckKey,
Duration: 3 * time.Minute,
FirstDuration: utils.FullJitter(30 * time.Second),
Jitter: retryutils.NewSeventhJitter(),
})
}

for {
select {
case <-a.closeCtx.Done():
return
case <-ticker.Chan():
err := a.autoRotateCertAuthorities(ctx)
if err != nil {
if trace.IsCompareFailed(err) {
log.Debugf("Cert authority has been updated concurrently: %v.", err)
} else {
log.Errorf("Failed to perform cert rotation check: %v.", err)
}
}
case <-heartbeatCheckTicker.Next():
nodes, err := a.GetNodes(ctx, apidefaults.Namespace)
if err != nil {
log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
}
for _, node := range nodes {
if services.NodeHasMissedKeepAlives(node) {
missedKeepAliveCount++
}
case tick := <-ticker.Next():
switch tick.Key {
case rotationCheckKey:
go func() {
if err := a.autoRotateCertAuthorities(a.closeCtx); err != nil {
if trace.IsCompareFailed(err) {
log.Debugf("Cert authority has been updated concurrently: %v.", err)
} else {
log.Errorf("Failed to perform cert rotation check: %v.", err)
}
}
}()
case heartbeatCheckKey:
go func() {
req := &proto.ListUnifiedResourcesRequest{Kinds: []string{types.KindNode}, SortBy: types.SortBy{Field: types.ResourceKind}}

for {
_, next, err := a.UnifiedResourceCache.IterateUnifiedResources(a.closeCtx,
func(rwl types.ResourceWithLabels) (bool, error) {
srv, ok := rwl.(types.Server)
if !ok {
return false, nil
}
if services.NodeHasMissedKeepAlives(srv) {
missedKeepAliveCount++
}
return false, nil
},
req,
)
if err != nil {
log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
return
}

req.StartKey = next
if req.StartKey == "" {
break
}
}

// Update prometheus gauge
heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
}()
case metricsKey:
go a.updateAgentMetrics()
case releaseCheckKey:
go a.syncReleaseAlerts(a.closeCtx, true)
case localReleaseCheckKey:
go a.syncReleaseAlerts(a.closeCtx, false)
case instancePeriodicsKey:
go a.doInstancePeriodics(a.closeCtx)
case desktopCheckKey:
go a.syncDesktopsLimitAlert(a.closeCtx)
case dynamicLabelsCheckKey:
go a.syncDynamicLabelsAlert(a.closeCtx)
case upgradeWindowCheckKey:
go a.periodicSyncUpgradeWindowStartHour()
case roleCountKey:
go a.tallyRoles(a.closeCtx)
}
// Update prometheus gauge
heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
case <-promTicker.Next():
a.updateAgentMetrics()
case <-releaseCheck.Next():
a.syncReleaseAlerts(ctx, true)
case <-localReleaseCheck.Next():
a.syncReleaseAlerts(ctx, false)
case <-instancePeriodics.Next():
// instance periodics are rate-limited and may be time-consuming in large
// clusters, so launch them in the background.
go a.doInstancePeriodics(ctx)
case <-ossDesktopsCheck:
a.syncDesktopsLimitAlert(ctx)
case <-dynamicLabelsCheck.Next():
a.syncDynamicLabelsAlert(ctx)
}
}
}

// tallyRoles counts the roles in the cluster and publishes the total via the
// roleCount gauge.
//
// Roles are read from a.Cache in pages of 20, and a 20ms ticker is awaited
// between pages so that a large role set does not produce a burst of reads.
// The gauge is only updated after every page has been read: if listing fails
// or ctx is done part way through, the previous gauge value is left untouched
// rather than being overwritten with a partial count.
func (a *Server) tallyRoles(ctx context.Context) {
	log.Debug("tallying roles")

	count := 0
	req := &proto.ListRolesRequest{Limit: 20}

	// Paces successive page reads; one page per tick.
	readLimiter := time.NewTicker(20 * time.Millisecond)
	defer readLimiter.Stop()

	for {
		resp, err := a.Cache.ListRoles(ctx, req)
		if err != nil {
			// Surface the failure instead of dropping it silently. A failure
			// seen after ctx is done is expected, so keep it at debug level.
			if ctx.Err() != nil {
				log.Debugf("tallying roles aborted: %v", err)
			} else {
				log.Warnf("Failed to list roles for role count metric calculation: %v", err)
			}
			return
		}

		count += len(resp.Roles)
		req.StartKey = resp.NextKey

		// An empty next key means the final page has been consumed.
		if req.StartKey == "" {
			break
		}

		select {
		case <-readLimiter.C:
		case <-ctx.Done():
			log.Debugf("tallying roles aborted: %v", ctx.Err())
			return
		}
	}

	roleCount.Set(float64(count))
	log.Debugf("tallying roles completed, role_count=%d", count)
}

func (a *Server) doInstancePeriodics(ctx context.Context) {
const slowRate = time.Millisecond * 200 // 5 reads per second
const fastRate = time.Millisecond * 5 // 200 reads per second
Expand Down
Loading

0 comments on commit 28070d1

Please sign in to comment.