From 2b7e177b2f851d6fd575c82de8cac4f38ce45545 Mon Sep 17 00:00:00 2001 From: Edoardo Spadolini Date: Thu, 19 Dec 2024 16:54:19 +0100 Subject: [PATCH] [v17] Variable rate heartbeats (#50430) * Convert lib/inventory to use slog (#50315) * Variable rate heartbeats (#49562) * Allow resetting the inventory delay * Upsert-only SSH heartbeats * Variable rate SSH heartbeats * Variable rate heartbeats for non-SSH resources * Add a helper method to create the HB delays --------- Co-authored-by: rosstimothy <39066650+rosstimothy@users.noreply.github.com> --- lib/inventory/controller.go | 427 +++++++++++++++-------- lib/inventory/controller_test.go | 31 +- lib/inventory/internal/delay/delay.go | 20 +- lib/inventory/inventory.go | 14 +- lib/inventory/metadata/metadata_other.go | 9 +- 5 files changed, 346 insertions(+), 155 deletions(-) diff --git a/lib/inventory/controller.go b/lib/inventory/controller.go index 4cae69f1b404e..218781f8b035c 100644 --- a/lib/inventory/controller.go +++ b/lib/inventory/controller.go @@ -20,12 +20,12 @@ package inventory import ( "context" + "log/slog" "os" "strings" "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" @@ -34,9 +34,11 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/inventory/internal/delay" + "github.com/gravitational/teleport/lib/services" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/interval" + logutils "github.com/gravitational/teleport/lib/utils/log" ) // Auth is an interface representing the subset of the auth API that must be made available @@ -113,11 +115,13 @@ const ( keepAliveKubeTick = "keep-alive-kube-tick" ) -// instanceHBStepSize is the step size used for the variable instance hearbteat duration. This value is -// basically arbitrary. It was selected because it produces a scaling curve that makes a fairly reasonable -// tradeoff between heartbeat availability and load scaling. See test coverage in the 'interval' package -// for a demonstration of the relationship between step sizes and interval/duration scaling. -const instanceHBStepSize = 1024 +// heartbeatStepSize is the step size used for the variable heartbeat intervals. +// This value is basically arbitrary. It was selected because it produces a +// scaling curve that makes a fairly reasonable tradeoff between heartbeat +// availability and load scaling. See test coverage in the 'interval' package +// for a demonstration of the relationship between step sizes and +// interval/duration scaling. +const heartbeatStepSize = 1024 type controllerOptions struct { serverKeepAlive time.Duration @@ -216,6 +220,10 @@ type Controller struct { instanceTTL time.Duration instanceHBEnabled bool instanceHBVariableDuration *interval.VariableDuration + sshHBVariableDuration *interval.VariableDuration + appHBVariableDuration *interval.VariableDuration + dbHBVariableDuration *interval.VariableDuration + kubeHBVariableDuration *interval.VariableDuration maxKeepAliveErrs int usageReporter usagereporter.UsageReporter testEvents chan testEvent @@ -236,18 +244,55 @@ func NewController(auth Auth, usageReporter usagereporter.UsageReporter, opts .. instanceHBVariableDuration := interval.NewVariableDuration(interval.VariableDurationConfig{ MinDuration: options.instanceHBInterval, MaxDuration: apidefaults.MaxInstanceHeartbeatInterval, - Step: instanceHBStepSize, + Step: heartbeatStepSize, }) + var ( + sshHBVariableDuration *interval.VariableDuration + appHBVariableDuration *interval.VariableDuration + dbHBVariableDuration *interval.VariableDuration + kubeHBVariableDuration *interval.VariableDuration + ) + serverTTL := apidefaults.ServerAnnounceTTL + if !variableRateHeartbeatsDisabledEnv() { + // by default, heartbeats will scale from 1.5 to 6 minutes, and will + // have a TTL of 15 minutes + serverTTL = apidefaults.ServerAnnounceTTL * 3 / 2 + sshHBVariableDuration = interval.NewVariableDuration(interval.VariableDurationConfig{ + MinDuration: options.serverKeepAlive, + MaxDuration: options.serverKeepAlive * 4, + Step: heartbeatStepSize, + }) + appHBVariableDuration = interval.NewVariableDuration(interval.VariableDurationConfig{ + MinDuration: options.serverKeepAlive, + MaxDuration: options.serverKeepAlive * 4, + Step: heartbeatStepSize, + }) + dbHBVariableDuration = interval.NewVariableDuration(interval.VariableDurationConfig{ + MinDuration: options.serverKeepAlive, + MaxDuration: options.serverKeepAlive * 4, + Step: heartbeatStepSize, + }) + kubeHBVariableDuration = interval.NewVariableDuration(interval.VariableDurationConfig{ + MinDuration: options.serverKeepAlive, + MaxDuration: options.serverKeepAlive * 4, + Step: heartbeatStepSize, + }) + } + ctx, cancel := context.WithCancel(context.Background()) return &Controller{ store: NewStore(), serviceCounter: &serviceCounter{}, serverKeepAlive: options.serverKeepAlive, - serverTTL: apidefaults.ServerAnnounceTTL, + serverTTL: serverTTL, instanceTTL: apidefaults.InstanceHeartbeatTTL, instanceHBEnabled: !instanceHeartbeatsDisabledEnv(), instanceHBVariableDuration: instanceHBVariableDuration, + sshHBVariableDuration: sshHBVariableDuration, + appHBVariableDuration: appHBVariableDuration, + dbHBVariableDuration: dbHBVariableDuration, + kubeHBVariableDuration: kubeHBVariableDuration, maxKeepAliveErrs: options.maxKeepAliveErrs, auth: auth, authID: options.authID, @@ -348,27 +393,36 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { defer func() { if handle.goodbye.GetDeleteResources() { - log.WithFields(log.Fields{ - "apps": len(handle.appServers), - "dbs": len(handle.databaseServers), - "kube": len(handle.kubernetesServers), - "server_id": handle.Hello().ServerID, - }).Debug("Cleaning up resources in response to instance termination") + slog.DebugContext(c.closeContext, "Cleaning up resources in response to instance termination", + "apps", len(handle.appServers), + "dbs", len(handle.databaseServers), + "kube", len(handle.kubernetesServers), + "server_id", handle.Hello().ServerID, + ) for _, app := range handle.appServers { if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) { - log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to remove app server on termination", + "app_server", handle.Hello().ServerID, + "error", err, + ) } } for _, db := range handle.databaseServers { if err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, db.resource.GetHostID(), db.resource.GetName()); err != nil && !trace.IsNotFound(err) { - log.Warnf("Failed to remove db server %q on termination: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to remove db server on termination", + "db_server", handle.Hello().ServerID, + "error", err, + ) } } for _, kube := range handle.kubernetesServers { if err := c.auth.DeleteKubernetesServer(c.closeContext, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) { - log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to remove kube server on termination", + "kube_server", handle.Hello().ServerID, + "error", err, + ) } } } @@ -382,23 +436,36 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { if handle.sshServer != nil { c.onDisconnectFunc(constants.KeepAliveNode, 1) + if c.sshHBVariableDuration != nil { + c.sshHBVariableDuration.Dec() + } + handle.sshServer = nil } if len(handle.appServers) > 0 { c.onDisconnectFunc(constants.KeepAliveApp, len(handle.appServers)) + if c.appHBVariableDuration != nil { + c.appHBVariableDuration.Add(-len(handle.appServers)) + } + clear(handle.appServers) } if len(handle.databaseServers) > 0 { c.onDisconnectFunc(constants.KeepAliveDatabase, len(handle.databaseServers)) + if c.dbHBVariableDuration != nil { + c.dbHBVariableDuration.Add(-len(handle.databaseServers)) + } + clear(handle.databaseServers) } if len(handle.kubernetesServers) > 0 { c.onDisconnectFunc(constants.KeepAliveKube, len(handle.kubernetesServers)) + if c.kubeHBVariableDuration != nil { + c.kubeHBVariableDuration.Add(-len(handle.kubernetesServers)) + } + clear(handle.kubernetesServers) } - clear(handle.appServers) - clear(handle.databaseServers) - clear(handle.kubernetesServers) c.testEvent(handlerClose) }() @@ -407,52 +474,75 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) { case msg := <-handle.Recv(): switch m := msg.(type) { case proto.UpstreamInventoryHello: - log.Warnf("Unexpected upstream hello on control stream of server %q.", handle.Hello().ServerID) + slog.WarnContext(c.closeContext, "Unexpected upstream hello on control stream of server", "server_id", handle.Hello().ServerID) handle.CloseWithError(trace.BadParameter("unexpected upstream hello")) return case proto.UpstreamInventoryAgentMetadata: c.handleAgentMetadata(handle, m) case proto.InventoryHeartbeat: - if err := c.handleHeartbeatMsg(handle, m); err != nil { - handle.CloseWithError(err) - return + // XXX: when adding new services to the heartbeat logic, make + // sure to also update the 'icsServiceToMetricName' mapping in + // auth/grpcserver.go in order to ensure that metrics start + // counting the control stream as a registered keepalive stream + // for that service. + + if m.SSHServer != nil { + // we initialize sshKeepAliveDelay before calling + // handleSSHServerHB unlike the other heartbeat types + // because handleSSHServerHB needs the delay to reset it + // after an announce, including the first one + if sshKeepAliveDelay == nil { + sshKeepAliveDelay = c.createKeepAliveDelay(c.sshHBVariableDuration) + } + + if err := c.handleSSHServerHB(handle, m.SSHServer, sshKeepAliveDelay); err != nil { + handle.CloseWithError(trace.Wrap(err)) + return + } } - // we initialize delays lazily here, depending on the protocol - if sshKeepAliveDelay == nil && m.SSHServer != nil { - sshKeepAliveDelay = delay.New(delay.Params{ - FirstInterval: retryutils.HalfJitter(c.serverKeepAlive), - FixedInterval: c.serverKeepAlive, - Jitter: retryutils.SeventhJitter, - }) - } - if appKeepAliveDelay == nil && m.AppServer != nil { - appKeepAliveDelay = delay.New(delay.Params{ - FirstInterval: retryutils.HalfJitter(c.serverKeepAlive), - FixedInterval: c.serverKeepAlive, - Jitter: retryutils.SeventhJitter, - }) + if m.AppServer != nil { + if err := c.handleAppServerHB(handle, m.AppServer); err != nil { + handle.CloseWithError(err) + return + } + + if appKeepAliveDelay == nil { + appKeepAliveDelay = c.createKeepAliveDelay(c.appHBVariableDuration) + } } - if dbKeepAliveDelay == nil && m.DatabaseServer != nil { - dbKeepAliveDelay = delay.New(delay.Params{ - FirstInterval: retryutils.HalfJitter(c.serverKeepAlive), - FixedInterval: c.serverKeepAlive, - Jitter: retryutils.SeventhJitter, - }) + + if m.DatabaseServer != nil { + if err := c.handleDatabaseServerHB(handle, m.DatabaseServer); err != nil { + handle.CloseWithError(err) + return + } + + if dbKeepAliveDelay == nil { + dbKeepAliveDelay = c.createKeepAliveDelay(c.dbHBVariableDuration) + } } - if kubeKeepAliveDelay == nil && m.KubernetesServer != nil { - kubeKeepAliveDelay = delay.New(delay.Params{ - FirstInterval: retryutils.HalfJitter(c.serverKeepAlive), - FixedInterval: c.serverKeepAlive, - Jitter: retryutils.SeventhJitter, - }) + + if m.KubernetesServer != nil { + if err := c.handleKubernetesServerHB(handle, m.KubernetesServer); err != nil { + handle.CloseWithError(err) + return + } + + if kubeKeepAliveDelay == nil { + kubeKeepAliveDelay = c.createKeepAliveDelay(c.kubeHBVariableDuration) + } } + case proto.UpstreamInventoryPong: c.handlePong(handle, m) case proto.UpstreamInventoryGoodbye: handle.goodbye = m default: - log.Warnf("Unexpected upstream message type %T on control stream of server %q.", m, handle.Hello().ServerID) + slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream", + "message_type", logutils.TypeAttr(m), + "server_id", handle.Hello().ServerID, + ) handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m)) return } @@ -521,6 +611,12 @@ func instanceHeartbeatsDisabledEnv() bool { return os.Getenv("TELEPORT_UNSTABLE_DISABLE_INSTANCE_HB") == "yes" } +// variableRateHeartbeatsDisabledEnv checks if variable rate heartbeats have +// been explicitly disabled via environment variable. +func variableRateHeartbeatsDisabledEnv() bool { + return os.Getenv("TELEPORT_UNSTABLE_DISABLE_VARIABLE_RATE_HEARTBEATS") == "yes" +} + func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Time) error { if !c.instanceHBEnabled { return nil @@ -543,7 +639,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim instance, err := tracker.nextHeartbeat(now, handle.Hello(), c.authID) if err != nil { - log.Warnf("Failed to construct next heartbeat value for instance %q: %v (this is a bug)", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to construct next heartbeat value for instance (this is a bug)", + "server_id", handle.Hello().ServerID, + "error", err, + ) return trace.Wrap(err) } @@ -556,7 +655,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim }) if err != nil { - log.Warnf("Failed to hb instance %q: %v", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to hb instance", + "server_id", handle.Hello().ServerID, + "error", err, + ) c.testEvent(instanceHeartbeatErr) if !tracker.retryHeartbeat { // suppress failure and retry exactly once @@ -578,7 +680,9 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInventoryPong) { pending, ok := handle.pings[msg.ID] if !ok { - log.Warnf("Unexpected upstream pong from server %q (id=%d).", handle.Hello().ServerID, msg.ID) + slog.WarnContext(c.closeContext, "Unexpected upstream pong", + "server_id", handle.Hello().ServerID, + "pong_id", msg.ID) return } pending.rspC <- pingResponse{ @@ -605,39 +709,7 @@ func (c *Controller) handlePingRequest(handle *upstreamHandle, req pingRequest) return nil } -func (c *Controller) handleHeartbeatMsg(handle *upstreamHandle, hb proto.InventoryHeartbeat) error { - // XXX: when adding new services to the heartbeat logic, make sure to also update the - // 'icsServiceToMetricName' mapping in auth/grpcserver.go in order to ensure that metrics - // start counting the control stream as a registered keepalive stream for that service. - - if hb.SSHServer != nil { - if err := c.handleSSHServerHB(handle, hb.SSHServer); err != nil { - return trace.Wrap(err) - } - } - - if hb.AppServer != nil { - if err := c.handleAppServerHB(handle, hb.AppServer); err != nil { - return trace.Wrap(err) - } - } - - if hb.DatabaseServer != nil { - if err := c.handleDatabaseServerHB(handle, hb.DatabaseServer); err != nil { - return trace.Wrap(err) - } - } - - if hb.KubernetesServer != nil { - if err := c.handleKubernetesServerHB(handle, hb.KubernetesServer); err != nil { - return trace.Wrap(err) - } - } - - return nil -} - -func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.ServerV2) error { +func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.ServerV2, sshDelay *delay.Delay) error { // the auth layer verifies that a stream's hello message matches the identity and capabilities of the // client cert. after that point it is our responsibility to ensure that heartbeated information is // consistent with the identity and capabilities claimed in the initial hello. @@ -654,28 +726,48 @@ func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types. sshServer.SetAddr(utils.ReplaceLocalhost(sshServer.GetAddr(), handle.PeerAddr())) } + sshServer.SetExpiry(time.Now().Add(c.serverTTL).UTC()) + if handle.sshServer == nil { c.onConnectFunc(constants.KeepAliveNode) - handle.sshServer = &heartBeatInfo[*types.ServerV2]{} + if c.sshHBVariableDuration != nil { + c.sshHBVariableDuration.Inc() + } + handle.sshServer = &heartBeatInfo[*types.ServerV2]{ + resource: sshServer, + } + } else if handle.sshServer.keepAliveErrs == 0 && services.CompareServers(handle.sshServer.resource, sshServer) < services.Different { + // if we have successfully upserted this exact server the last time + // (except for the expiry), we don't need to upsert it again right now + return nil + } else { + handle.sshServer.resource = sshServer } - now := time.Now() - - sshServer.SetExpiry(now.Add(c.serverTTL).UTC()) - - lease, err := c.auth.UpsertNode(c.closeContext, sshServer) - if err == nil { + if _, err := c.auth.UpsertNode(c.closeContext, handle.sshServer.resource); err == nil { c.testEvent(sshUpsertOk) - // store the new lease and reset retry state - handle.sshServer.lease = lease + // reset the error status + handle.sshServer.keepAliveErrs = 0 handle.sshServer.retryUpsert = false + + sshDelay.Reset() } else { c.testEvent(sshUpsertErr) - log.Warnf("Failed to upsert ssh server %q on heartbeat: %v.", handle.Hello().ServerID, err) - - // blank old lease if any and set retry state. next time handleKeepAlive is called - // we will attempt to upsert the server again. - handle.sshServer.lease = nil + slog.WarnContext(c.closeContext, "Failed to announce SSH server", + "server_id", handle.Hello().ServerID, + "error", err, + ) + + // we use keepAliveErrs as a general upsert error count for SSH, + // retryUpsert as a flag to signify that we MUST succeed the very next + // upsert: if we're here it means that we have a new resource to upsert + // and we have failed to do so once, so if we fail again we are going to + // fall too far behind and we should let the instance go and connect to + // a healthier auth server + handle.sshServer.keepAliveErrs++ + if handle.sshServer.retryUpsert || handle.sshServer.keepAliveErrs > c.maxKeepAliveErrs { + return trace.Wrap(err, "failed to announce SSH server") + } handle.sshServer.retryUpsert = true } handle.sshServer.resource = sshServer @@ -701,6 +793,9 @@ func (c *Controller) handleAppServerHB(handle *upstreamHandle, appServer *types. if _, ok := handle.appServers[appKey]; !ok { c.onConnectFunc(constants.KeepAliveApp) + if c.appHBVariableDuration != nil { + c.appHBVariableDuration.Inc() + } handle.appServers[appKey] = &heartBeatInfo[*types.AppServerV3]{} } @@ -718,7 +813,10 @@ func (c *Controller) handleAppServerHB(handle *upstreamHandle, appServer *types. srv.resource = appServer } else { c.testEvent(appUpsertErr) - log.Warnf("Failed to upsert app server %q on heartbeat: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert app server on heartbeat", + "server_id", handle.Hello().ServerID, + "error", err, + ) // blank old lease if any and set retry state. next time handleKeepAlive is called // we will attempt to upsert the server again. @@ -749,6 +847,9 @@ func (c *Controller) handleDatabaseServerHB(handle *upstreamHandle, databaseServ if _, ok := handle.databaseServers[dbKey]; !ok { c.onConnectFunc(constants.KeepAliveDatabase) + if c.dbHBVariableDuration != nil { + c.dbHBVariableDuration.Inc() + } handle.databaseServers[dbKey] = &heartBeatInfo[*types.DatabaseServerV3]{} } @@ -766,7 +867,10 @@ func (c *Controller) handleDatabaseServerHB(handle *upstreamHandle, databaseServ srv.resource = databaseServer } else { c.testEvent(dbUpsertErr) - log.Warnf("Failed to upsert database server %q on heartbeat: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert database server on heartbeat", + "server_id", handle.Hello().ServerID, + "error", err, + ) // blank old lease if any and set retry state. next time handleKeepAlive is called // we will attempt to upsert the server again. @@ -797,6 +901,9 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes if _, ok := handle.kubernetesServers[kubeKey]; !ok { c.onConnectFunc(constants.KeepAliveKube) + if c.kubeHBVariableDuration != nil { + c.kubeHBVariableDuration.Inc() + } handle.kubernetesServers[kubeKey] = &heartBeatInfo[*types.KubernetesServerV3]{} } @@ -814,7 +921,10 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes srv.resource = kubernetesServer } else { c.testEvent(kubeUpsertErr) - log.Warnf("Failed to upsert kubernetes server %q on heartbeat: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on heartbeat", + "server_id", handle.Hello().ServerID, + "error", err, + ) // blank old lease if any and set retry state. next time handleKeepAlive is called // we will attempt to upsert the server again. @@ -861,11 +971,19 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e srv.keepAliveErrs++ handle.appServers[name] = srv shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs - log.Warnf("Failed to keep alive app server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove) + slog.WarnContext(c.closeContext, "Failed to keep alive app server", + "server_id", handle.Hello().ServerID, + "error", err, + "error_count", srv.keepAliveErrs, + "should_remove", shouldRemove, + ) if shouldRemove { c.testEvent(appKeepAliveDel) c.onDisconnectFunc(constants.KeepAliveApp, 1) + if c.appHBVariableDuration != nil { + c.appHBVariableDuration.Dec() + } delete(handle.appServers, name) } } else { @@ -877,7 +995,10 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e lease, err := c.auth.UpsertApplicationServer(c.closeContext, srv.resource) if err != nil { c.testEvent(appUpsertRetryErr) - log.Warnf("Failed to upsert app server %q on retry: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert app server on retry", + "server_id", handle.Hello().ServerID, + "error", err, + ) // since this is retry-specific logic, an error here means that upsert failed twice in // a row. Missing upserts is more problematic than missing keepalives so we don't bother // attempting a third time. @@ -904,11 +1025,19 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti srv.keepAliveErrs++ handle.databaseServers[name] = srv shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs - log.Warnf("Failed to keep alive database server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove) + slog.WarnContext(c.closeContext, "Failed to keep alive database server", + "server_id", handle.Hello().ServerID, + "error", err, + "error_count", srv.keepAliveErrs, + "should_remove", shouldRemove, + ) if shouldRemove { c.testEvent(dbKeepAliveDel) c.onDisconnectFunc(constants.KeepAliveDatabase, 1) + if c.dbHBVariableDuration != nil { + c.dbHBVariableDuration.Dec() + } delete(handle.databaseServers, name) } } else { @@ -920,7 +1049,10 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti lease, err := c.auth.UpsertDatabaseServer(c.closeContext, srv.resource) if err != nil { c.testEvent(dbUpsertRetryErr) - log.Warnf("Failed to upsert database server %q on retry: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert database server on retry", + "server_id", handle.Hello().ServerID, + "error", err, + ) // since this is retry-specific logic, an error here means that upsert failed twice in // a row. Missing upserts is more problematic than missing keepalives so we don't bother // attempting a third time. @@ -947,11 +1079,19 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time. srv.keepAliveErrs++ handle.kubernetesServers[name] = srv shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs - log.Warnf("Failed to keep alive kubernetes server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove) + slog.WarnContext(c.closeContext, "Failed to keep alive kubernetes server", + "server_id", handle.Hello().ServerID, + "error", err, + "error_count", srv.keepAliveErrs, + "should_remove", shouldRemove, + ) if shouldRemove { c.testEvent(kubeKeepAliveDel) c.onDisconnectFunc(constants.KeepAliveKube, 1) + if c.kubeHBVariableDuration != nil { + c.kubeHBVariableDuration.Dec() + } delete(handle.kubernetesServers, name) } } else { @@ -963,7 +1103,10 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time. lease, err := c.auth.UpsertKubernetesServer(c.closeContext, srv.resource) if err != nil { c.testEvent(kubeUpsertRetryErr) - log.Warnf("Failed to upsert kubernetes server %q on retry: %v.", handle.Hello().ServerID, err) + slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on retry.", + "server_id", handle.Hello().ServerID, + "error", err, + ) // since this is retry-specific logic, an error here means that upsert failed twice in // a row. Missing upserts is more problematic than missing keepalives so we don'resource bother // attempting a third time. @@ -984,42 +1127,54 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e return nil } - if handle.sshServer.lease != nil { - lease := *handle.sshServer.lease - lease.Expires = now.Add(c.serverTTL).UTC() - if err := c.auth.KeepAliveServer(c.closeContext, lease); err != nil { - c.testEvent(sshKeepAliveErr) - handle.sshServer.keepAliveErrs++ - shouldClose := handle.sshServer.keepAliveErrs > c.maxKeepAliveErrs - - log.Warnf("Failed to keep alive ssh server %q: %v (count=%d, closing=%v).", handle.Hello().ServerID, err, handle.sshServer.keepAliveErrs, shouldClose) - - if shouldClose { - return trace.Errorf("failed to keep alive ssh server: %v", err) - } + if _, err := c.auth.UpsertNode(c.closeContext, handle.sshServer.resource); err == nil { + if handle.sshServer.retryUpsert { + c.testEvent(sshUpsertRetryOk) } else { - handle.sshServer.keepAliveErrs = 0 c.testEvent(sshKeepAliveOk) } - } else if handle.sshServer.retryUpsert { - handle.sshServer.resource.SetExpiry(time.Now().Add(c.serverTTL).UTC()) - lease, err := c.auth.UpsertNode(c.closeContext, handle.sshServer.resource) - if err != nil { + handle.sshServer.keepAliveErrs = 0 + handle.sshServer.retryUpsert = false + } else { + if handle.sshServer.retryUpsert { c.testEvent(sshUpsertRetryErr) - log.Warnf("Failed to upsert ssh server %q on retry: %v.", handle.Hello().ServerID, err) - // since this is retry-specific logic, an error here means that upsert failed twice in - // a row. Missing upserts is more problematic than missing keepalives so we don'resource bother - // attempting a third time. - return trace.Errorf("failed to upsert ssh server on retry: %v", err) + slog.WarnContext(c.closeContext, "Failed to upsert SSH server on retry", + "server_id", handle.Hello().ServerID, + "error", err, + ) + // retryUpsert is set when we get a new resource and we fail to + // upsert it; if we're here it means that we have failed to upsert + // it _again_, so we have fallen quite far behind + return trace.Wrap(err, "failed to upsert SSH server on retry") + } + + c.testEvent(sshKeepAliveErr) + handle.sshServer.keepAliveErrs++ + closing := handle.sshServer.keepAliveErrs > c.maxKeepAliveErrs + slog.WarnContext(c.closeContext, "Failed to upsert SSH server on keepalive", + "server_id", handle.Hello().ServerID, + "error", err, + "count", handle.sshServer.keepAliveErrs, + "closing", closing, + ) + + if closing { + return trace.Wrap(err, "failed to keep alive SSH server") } - c.testEvent(sshUpsertRetryOk) - handle.sshServer.lease = lease - handle.sshServer.retryUpsert = false } return nil } +func (c *Controller) createKeepAliveDelay(variableDuration *interval.VariableDuration) *delay.Delay { + return delay.New(delay.Params{ + FirstInterval: retryutils.HalfJitter(c.serverKeepAlive), + FixedInterval: c.serverKeepAlive, + VariableInterval: variableDuration, + Jitter: retryutils.SeventhJitter, + }) +} + // Close terminates all control streams registered with this controller. Control streams // registered after Close() is called are closed immediately. func (c *Controller) Close() error { diff --git a/lib/inventory/controller_test.go b/lib/inventory/controller_test.go index f6d188e02f2c2..f3b6f68d8cf58 100644 --- a/lib/inventory/controller_test.go +++ b/lib/inventory/controller_test.go @@ -225,14 +225,13 @@ func TestSSHServerBasics(t *testing.T) { // set up to induce some failures, but not enough to cause the control // stream to be closed. auth.mu.Lock() - auth.failUpserts = 1 - auth.failKeepAlives = 2 + auth.failUpserts = 2 auth.mu.Unlock() // keepalive should fail twice, but since the upsert is already known // to have succeeded, we should not see an upsert failure yet. awaitEvents(t, events, - expect(sshKeepAliveErr, sshKeepAliveErr), + expect(sshKeepAliveErr, sshKeepAliveErr, sshKeepAliveOk), deny(sshUpsertErr, handlerClose), ) @@ -248,6 +247,32 @@ func TestSSHServerBasics(t *testing.T) { }) require.NoError(t, err) + // this explicit upsert will not happen since the server is the same, but + // keepalives should work + awaitEvents(t, events, + expect(sshKeepAliveOk), + deny(sshKeepAliveErr, sshUpsertErr, sshUpsertRetryOk, handlerClose), + ) + + err = downstream.Send(ctx, proto.InventoryHeartbeat{ + SSHServer: &types.ServerV2{ + Metadata: types.Metadata{ + Name: serverID, + Labels: map[string]string{ + "changed": "changed", + }, + }, + Spec: types.ServerSpecV2{ + Addr: zeroAddr, + }, + }, + }) + require.NoError(t, err) + + auth.mu.Lock() + auth.failUpserts = 1 + auth.mu.Unlock() + // we should now see an upsert failure, but no additional // keepalive failures, and the upsert should succeed on retry. awaitEvents(t, events, diff --git a/lib/inventory/internal/delay/delay.go b/lib/inventory/internal/delay/delay.go index 7a5ac8a06d74a..bb94478daf875 100644 --- a/lib/inventory/internal/delay/delay.go +++ b/lib/inventory/internal/delay/delay.go @@ -74,7 +74,7 @@ type Delay struct { } // Elapsed returns the channel on which the ticks are delivered. This method can -// be called on a nil delay, resulting in a nil channel. The [*Delay.Advance] +// be called on a nil delay, resulting in a nil channel. The [Delay.Advance] // method must be called after receiving a tick from the channel. // // select { @@ -102,7 +102,7 @@ func (i *Delay) interval() time.Duration { } // Advance sets up the next tick of the delay. Must be called after receiving -// from the [*Delay.Elapsed] channel; specifically, to maintain compatibility +// from the [Delay.Elapsed] channel; specifically, to maintain compatibility // with [clockwork.Clock], it must only be called with a drained timer channel. // For consistency, the value passed to Advance should be the value received // from the Elapsed channel (passing the current time will also work, but will @@ -111,8 +111,20 @@ func (i *Delay) Advance(now time.Time) { i.timer.Reset(i.interval() - i.clock.Since(now)) } -// Stop stops the delay. Only needed for [clockwork.Clock] compatibility. Can be -// called on a nil delay, as a no-op. The delay should not be used afterwards. +// Reset restarts the ticker from the current time. Must only be called while +// the timer is running (i.e. it must not be called between receiving from +// [Delay.Elapsed] and calling [Delay.Advance]). +func (i *Delay) Reset() { + // the drain is for Go earlier than 1.23 and for [clockwork.Clock] + if !i.timer.Stop() { + <-i.timer.Chan() + } + i.timer.Reset(i.interval()) +} + +// Stop stops the delay. Only needed for Go 1.22 and [clockwork.Clock] +// compatibility. Can be called on a nil delay, as a no-op. The delay should not +// be used afterwards. func (i *Delay) Stop() { if i == nil { return diff --git a/lib/inventory/inventory.go b/lib/inventory/inventory.go index 2e4f38e1e39d0..2cae348402472 100644 --- a/lib/inventory/inventory.go +++ b/lib/inventory/inventory.go @@ -22,12 +22,12 @@ import ( "context" "errors" "io" + "log/slog" "sync" "sync/atomic" "time" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" @@ -149,7 +149,7 @@ func (h *downstreamHandle) autoEmitMetadata() { md, err := h.metadataGetter(h.CloseContext()) if err != nil { if !errors.Is(err, context.Canceled) { - log.Warnf("Failed to get agent metadata: %v", err) + slog.WarnContext(h.CloseContext(), "Failed to get agent metadata", "error", err) } return } @@ -174,7 +174,7 @@ func (h *downstreamHandle) autoEmitMetadata() { // Send metadata. if err := sender.Send(h.CloseContext(), msg); err != nil && !errors.Is(err, context.Canceled) { - log.Warnf("Failed to send agent metadata: %v", err) + slog.WarnContext(h.CloseContext(), "Failed to send agent metadata", "error", err) } // Block for the duration of the stream. @@ -195,7 +195,7 @@ func (h *downstreamHandle) run(fn DownstreamCreateFunc, hello proto.UpstreamInve return } - log.Debugf("Re-attempt control stream acquisition in ~%s.", retry.Duration()) + slog.DebugContext(h.closeContext, "Re-attempt control stream acquisition", "backoff", retry.Duration()) select { case <-retry.After(): retry.Inc() @@ -209,14 +209,14 @@ func (h *downstreamHandle) tryRun(fn DownstreamCreateFunc, hello proto.UpstreamI stream, err := fn(h.CloseContext()) if err != nil { if !h.closing() { - log.Warnf("Failed to create inventory control stream: %v.", err) + slog.WarnContext(h.CloseContext(), "Failed to create inventory control stream", "error", err) } return } if err := h.handleStream(stream, hello); err != nil { if !h.closing() { - log.Warnf("Inventory control stream failed: %v", err) + slog.WarnContext(h.CloseContext(), "Inventory control stream failed", "error", err) } return } @@ -284,7 +284,7 @@ func (h *downstreamHandle) handlePing(sender DownstreamSender, msg proto.Downstr h.mu.Lock() defer h.mu.Unlock() if len(h.pingHandlers) == 0 { - log.Warnf("Got ping with no handlers registered (id=%d).", msg.ID) + slog.WarnContext(h.closeContext, "Got ping with no handlers registered", "ping_id", msg.ID) return } for _, handler := range h.pingHandlers { diff --git a/lib/inventory/metadata/metadata_other.go b/lib/inventory/metadata/metadata_other.go index eb60f3082876c..6451f3b04cd03 100644 --- a/lib/inventory/metadata/metadata_other.go +++ b/lib/inventory/metadata/metadata_other.go @@ -22,19 +22,18 @@ package metadata import ( - "runtime" - - log "github.com/sirupsen/logrus" + "context" + "log/slog" ) // fetchOSVersion returns "" if not on linux and not on darwin. func (c *fetchConfig) fetchOSVersion() string { - log.Warningf("fetchOSVersion is not implemented for %s", runtime.GOOS) + slog.WarnContext(context.Background(), "fetchOSVersion is not implemented") return "" } // fetchGlibcVersion returns "" if not on linux and not on darwin. func (c *fetchConfig) fetchGlibcVersion() string { - log.Warningf("fetchGlibcVersion is not implemented for %s", runtime.GOOS) + slog.WarnContext(context.Background(), "fetchGlibcVersion is not implemented") return "" }