Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v16] Add metric to expose cluster role count #47812

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion integration/hsm/hsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ func TestHSMRevert(t *testing.T) {
clock.Advance(2 * defaults.HighResPollingPeriod)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
alerts, err = auth1.process.GetAuthServer().GetClusterAlerts(ctx, types.GetClusterAlertsRequest{})
require.NoError(t, err)
assert.NoError(t, err)
assert.Empty(t, alerts)

// Keep advancing the clock to make sure the rotation ticker gets fired
clock.Advance(2 * defaults.HighResPollingPeriod)
}, 5*time.Second, 100*time.Millisecond)
}
305 changes: 200 additions & 105 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,14 @@ var (
},
)

roleCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Name: "roles_total",
Help: "Number of roles that exist in the cluster",
},
)

registeredAgents = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: teleport.MetricNamespace,
Expand Down Expand Up @@ -806,6 +814,7 @@ var (
accessRequestsCreatedMetric,
registeredAgentsInstallMethod,
userCertificatesGeneratedMetric,
roleCount,
}
)

Expand Down Expand Up @@ -1269,87 +1278,115 @@ func (a *Server) periodicSyncUpgradeWindowStartHour() {
}
}

// periodicIntervalKey is used to uniquely identify the subintervals registered with
// the interval.MultiInterval instance that we use for managing periodics operations.

type periodicIntervalKey int

const (
heartbeatCheckKey periodicIntervalKey = 1 + iota
rotationCheckKey
metricsKey
releaseCheckKey
localReleaseCheckKey
instancePeriodicsKey
dynamicLabelsCheckKey
desktopCheckKey
upgradeWindowCheckKey
roleCountKey
)

// runPeriodicOperations runs some periodic bookkeeping operations
// performed by auth server
func (a *Server) runPeriodicOperations() {
ctx := context.TODO()
firstReleaseCheck := utils.FullJitter(time.Hour * 6)

// this environment variable is "unstable" since it will be deprecated
// by an upcoming tctl command. currently exists for testing purposes only.
if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
firstReleaseCheck = utils.HalfJitter(time.Second * 10)
}

// run periodic functions with a semi-random period
// to avoid contention on the database in case if there are multiple
// auth servers running - so they don't compete trying
// to update the same resources.
r := insecurerand.New(insecurerand.NewSource(a.GetClock().Now().UnixNano()))
period := defaults.HighResPollingPeriod + time.Duration(r.Intn(int(defaults.HighResPollingPeriod/time.Second)))*time.Second
log.Debugf("Ticking with period: %v.", period)
a.lock.RLock()
ticker := a.clock.NewTicker(period)
a.lock.RUnlock()
// Create a ticker with jitter
heartbeatCheckTicker := interval.New(interval.Config{
Duration: apidefaults.ServerKeepAliveTTL() * 2,
Jitter: retryutils.NewSeventhJitter(),
})
promTicker := interval.New(interval.Config{
FirstDuration: 5 * time.Second,
Duration: defaults.PrometheusScrapeInterval,
Jitter: retryutils.NewSeventhJitter(),
})
missedKeepAliveCount := 0
defer ticker.Stop()
defer heartbeatCheckTicker.Stop()
defer promTicker.Stop()

firstReleaseCheck := utils.FullJitter(time.Hour * 6)
ticker := interval.NewMulti(
a.GetClock(),
interval.SubInterval[periodicIntervalKey]{
Key: rotationCheckKey,
Duration: period,
},
interval.SubInterval[periodicIntervalKey]{
Key: metricsKey,
Duration: defaults.PrometheusScrapeInterval,
FirstDuration: 5 * time.Second,
Jitter: retryutils.NewSeventhJitter(),
},
interval.SubInterval[periodicIntervalKey]{
Key: instancePeriodicsKey,
Duration: 9 * time.Minute,
FirstDuration: utils.HalfJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
},
interval.SubInterval[periodicIntervalKey]{
Key: roleCountKey,
Duration: 12 * time.Hour,
FirstDuration: utils.FullJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
},
)

// this environment variable is "unstable" since it will be deprecated
// by an upcoming tctl command. currently exists for testing purposes only.
if os.Getenv("TELEPORT_UNSTABLE_VC_SYNC_ON_START") == "yes" {
firstReleaseCheck = utils.HalfJitter(time.Second * 10)
}
defer ticker.Stop()

// note the use of FullJitter for the releases check interval. this lets us ensure
// that frequent restarts don't prevent checks from happening despite the infrequent
// effective check rate.
releaseCheck := interval.New(interval.Config{
Duration: time.Hour * 24,
FirstDuration: firstReleaseCheck,
Jitter: retryutils.NewFullJitter(),
})
defer releaseCheck.Stop()

// more frequent release check that just re-calculates alerts based on previously
// pulled versioning info.
localReleaseCheck := interval.New(interval.Config{
Duration: time.Minute * 10,
FirstDuration: utils.HalfJitter(time.Second * 10),
Jitter: retryutils.NewHalfJitter(),
})
defer localReleaseCheck.Stop()
missedKeepAliveCount := 0

instancePeriodics := interval.New(interval.Config{
Duration: time.Minute * 9,
FirstDuration: utils.HalfJitter(time.Minute),
Jitter: retryutils.NewSeventhJitter(),
})
defer instancePeriodics.Stop()
// Prevent some periodic operations from running for dashboard tenants.
if !services.IsDashboard(*modules.GetModules().Features().ToProto()) {
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: dynamicLabelsCheckKey,
Duration: dynamicLabelCheckPeriod,
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewSeventhJitter(),
})
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: heartbeatCheckKey,
Duration: apidefaults.ServerKeepAliveTTL() * 2,
Jitter: retryutils.NewSeventhJitter(),
})
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: releaseCheckKey,
Duration: 24 * time.Hour,
FirstDuration: firstReleaseCheck,
// note the use of FullJitter for the releases check interval. this lets us ensure
// that frequent restarts don't prevent checks from happening despite the infrequent
// effective check rate.
Jitter: retryutils.NewFullJitter(),
})
// more frequent release check that just re-calculates alerts based on previously
// pulled versioning info.
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: localReleaseCheckKey,
Duration: 10 * time.Minute,
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewHalfJitter(),
})
}

var ossDesktopsCheck <-chan time.Time
if modules.GetModules().IsOSSBuild() {
ossDesktopsCheck = interval.New(interval.Config{
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: desktopCheckKey,
Duration: OSSDesktopsCheckPeriod,
FirstDuration: utils.HalfJitter(time.Second * 10),
FirstDuration: utils.HalfJitter(10 * time.Second),
Jitter: retryutils.NewHalfJitter(),
}).Next()
} else if err := a.DeleteClusterAlert(ctx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
})
} else if err := a.DeleteClusterAlert(a.closeCtx, OSSDesktopsAlertID); err != nil && !trace.IsNotFound(err) {
log.Warnf("Can't delete OSS non-AD desktops limit alert: %v", err)
}

dynamicLabelsCheck := interval.New(interval.Config{
Duration: dynamicLabelCheckPeriod,
FirstDuration: utils.HalfJitter(time.Second * 10),
Jitter: retryutils.NewSeventhJitter(),
})
defer dynamicLabelsCheck.Stop()

// isolate the schedule of potentially long-running refreshRemoteClusters() from other tasks
go func() {
// reasonably small interval to ensure that users observe clusters as online within 1 minute of adding them.
Expand All @@ -1364,68 +1401,126 @@ func (a *Server) runPeriodicOperations() {
case <-a.closeCtx.Done():
return
case <-remoteClustersRefresh.Next():
a.refreshRemoteClusters(ctx, r)
a.refreshRemoteClusters(a.closeCtx, r)
}
}
}()

// cloud auth servers need to periodically sync the upgrade window
// from the cloud db.
if modules.GetModules().Features().Cloud {
go a.periodicSyncUpgradeWindowStartHour()
}

// disable periodics that are not required for cloud dashboard tenants
if services.IsDashboard(*modules.GetModules().Features().ToProto()) {
releaseCheck.Stop()
localReleaseCheck.Stop()
heartbeatCheckTicker.Stop()
dynamicLabelsCheck.Stop()
ticker.Push(interval.SubInterval[periodicIntervalKey]{
Key: upgradeWindowCheckKey,
Duration: 3 * time.Minute,
FirstDuration: utils.FullJitter(30 * time.Second),
Jitter: retryutils.NewSeventhJitter(),
})
}

for {
select {
case <-a.closeCtx.Done():
return
case <-ticker.Chan():
err := a.autoRotateCertAuthorities(ctx)
if err != nil {
if trace.IsCompareFailed(err) {
log.Debugf("Cert authority has been updated concurrently: %v.", err)
} else {
log.Errorf("Failed to perform cert rotation check: %v.", err)
}
}
case <-heartbeatCheckTicker.Next():
nodes, err := a.GetNodes(ctx, apidefaults.Namespace)
if err != nil {
log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
}
for _, node := range nodes {
if services.NodeHasMissedKeepAlives(node) {
missedKeepAliveCount++
}
case tick := <-ticker.Next():
switch tick.Key {
case rotationCheckKey:
go func() {
if err := a.autoRotateCertAuthorities(a.closeCtx); err != nil {
if trace.IsCompareFailed(err) {
log.Debugf("Cert authority has been updated concurrently: %v.", err)
} else {
log.Errorf("Failed to perform cert rotation check: %v.", err)
}
}
}()
case heartbeatCheckKey:
go func() {
req := &proto.ListUnifiedResourcesRequest{Kinds: []string{types.KindNode}, SortBy: types.SortBy{Field: types.ResourceKind}}

for {
_, next, err := a.UnifiedResourceCache.IterateUnifiedResources(a.closeCtx,
func(rwl types.ResourceWithLabels) (bool, error) {
srv, ok := rwl.(types.Server)
if !ok {
return false, nil
}
if services.NodeHasMissedKeepAlives(srv) {
missedKeepAliveCount++
}
return false, nil
},
req,
)
if err != nil {
log.Errorf("Failed to load nodes for heartbeat metric calculation: %v", err)
return
}

req.StartKey = next
if req.StartKey == "" {
break
}
}

// Update prometheus gauge
heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
}()
case metricsKey:
go a.updateAgentMetrics()
case releaseCheckKey:
go a.syncReleaseAlerts(a.closeCtx, true)
case localReleaseCheckKey:
go a.syncReleaseAlerts(a.closeCtx, false)
case instancePeriodicsKey:
go a.doInstancePeriodics(a.closeCtx)
case desktopCheckKey:
go a.syncDesktopsLimitAlert(a.closeCtx)
case dynamicLabelsCheckKey:
go a.syncDynamicLabelsAlert(a.closeCtx)
case upgradeWindowCheckKey:
go a.periodicSyncUpgradeWindowStartHour()
case roleCountKey:
go a.tallyRoles(a.closeCtx)
}
// Update prometheus gauge
heartbeatsMissedByAuth.Set(float64(missedKeepAliveCount))
case <-promTicker.Next():
a.updateAgentMetrics()
case <-releaseCheck.Next():
a.syncReleaseAlerts(ctx, true)
case <-localReleaseCheck.Next():
a.syncReleaseAlerts(ctx, false)
case <-instancePeriodics.Next():
// instance periodics are rate-limited and may be time-consuming in large
// clusters, so launch them in the background.
go a.doInstancePeriodics(ctx)
case <-ossDesktopsCheck:
a.syncDesktopsLimitAlert(ctx)
case <-dynamicLabelsCheck.Next():
a.syncDynamicLabelsAlert(ctx)
}
}
}

func (a *Server) tallyRoles(ctx context.Context) {
var count = 0
log.Debug("tallying roles")
defer func() {
log.Debugf("tallying roles completed, role_count=%d", count)
}()

req := &proto.ListRolesRequest{Limit: 20}

readLimiter := time.NewTicker(20 * time.Millisecond)
defer readLimiter.Stop()

for {
resp, err := a.Cache.ListRoles(ctx, req)
if err != nil {
return
}

count += len(resp.Roles)
req.StartKey = resp.NextKey

if req.StartKey == "" {
break
}

select {
case <-readLimiter.C:
case <-ctx.Done():
return
}
}

roleCount.Set(float64(count))
}

func (a *Server) doInstancePeriodics(ctx context.Context) {
const slowRate = time.Millisecond * 200 // 5 reads per second
const fastRate = time.Millisecond * 5 // 200 reads per second
Expand Down
Loading
Loading