diff --git a/integrations/lib/testing/integration/authservice.go b/integrations/lib/testing/integration/authservice.go
index 7f233e31f8c95..79415f05ecb3d 100644
--- a/integrations/lib/testing/integration/authservice.go
+++ b/integrations/lib/testing/integration/authservice.go
@@ -36,7 +36,7 @@ import (
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
-var regexpAuthStarting = regexp.MustCompile(`Auth service [^ ]+ is starting on [^ ]+:(\d+)`)
+var regexpAuthStarting = regexp.MustCompile(`Auth service.*listen_address:[^ ]+:(\d+)`)
 
 type AuthService struct {
 	mu sync.Mutex
diff --git a/integrations/lib/testing/integration/proxyservice.go b/integrations/lib/testing/integration/proxyservice.go
index 9c2d5701ef5b2..80a05c5140080 100644
--- a/integrations/lib/testing/integration/proxyservice.go
+++ b/integrations/lib/testing/integration/proxyservice.go
@@ -37,9 +37,9 @@ import (
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
-var regexpWebProxyStarting = regexp.MustCompile(`Web proxy service [^ ]+ is starting on [^ ]+:(\d+)`)
-var regexpSSHProxyStarting = regexp.MustCompile(`SSH proxy service [^ ]+ is starting on [^ ]+:(\d+)`)
-var regexpReverseTunnelStarting = regexp.MustCompile(`Reverse tunnel service [^ ]+ is starting on [^ ]+:(\d+)`)
+var regexpWebProxyStarting = regexp.MustCompile(`Starting web proxy service.*listen_address:[^ ]+:(\d+)`)
+var regexpSSHProxyStarting = regexp.MustCompile(`Starting SSH proxy service.*listen_address:[^ ]+:(\d+)`)
+var regexpReverseTunnelStarting = regexp.MustCompile(`Starting reverse tunnel server.*listen_address:[^ ]+:(\d+)`)
 
 type ProxyService struct {
 	mu sync.Mutex
diff --git a/integrations/lib/testing/integration/sshservice.go b/integrations/lib/testing/integration/sshservice.go
index 14ca201d1f875..241ccb4884e9b 100644
--- a/integrations/lib/testing/integration/sshservice.go
+++ b/integrations/lib/testing/integration/sshservice.go
@@ -36,7 +36,7 @@ import (
 	"github.com/gravitational/teleport/integrations/lib/logger"
 )
 
-var regexpSSHStarting = regexp.MustCompile(`Service [^ ]+ is starting on [^ ]+:(\d+)`)
+var regexpSSHStarting = regexp.MustCompile(`SSH Service is starting.*listen_address:[^ ]+:(\d+)`)
 
 type SSHService struct {
 	mu sync.Mutex
diff --git a/lib/service/awsoidc.go b/lib/service/awsoidc.go
index 846488b91a9f0..13958e44f5b0e 100644
--- a/lib/service/awsoidc.go
+++ b/lib/service/awsoidc.go
@@ -94,7 +94,7 @@ func (process *TeleportProcess) initAWSOIDCDeployServiceUpdater() error {
 		return trace.Wrap(err)
 	}
 
-	process.log.Infof("The new service has started successfully. Checking for deploy service updates every %v.", updateAWSOIDCDeployServiceInterval)
+	process.logger.InfoContext(process.ExitContext(), "The new service has started successfully.", "update_interval", updateAWSOIDCDeployServiceInterval)
 
 	return trace.Wrap(updater.Run(process.GracefulExitContext()))
 }
diff --git a/lib/service/connect.go b/lib/service/connect.go
index 632061c911b0a..8840948961bd2 100644
--- a/lib/service/connect.go
+++ b/lib/service/connect.go
@@ -23,6 +23,7 @@ import (
 	"crypto/tls"
 	"errors"
 	"fmt"
+	"log/slog"
 	"path/filepath"
 	"strings"
 	"time"
@@ -31,7 +32,6 @@ import (
 	"github.com/gravitational/roundtrip"
 	"github.com/gravitational/trace"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/sirupsen/logrus"
 	"golang.org/x/crypto/ssh"
 	"google.golang.org/grpc"
 
@@ -52,6 +52,7 @@ import (
 	"github.com/gravitational/teleport/lib/tlsca"
 	"github.com/gravitational/teleport/lib/utils"
 	"github.com/gravitational/teleport/lib/utils/interval"
+	logutils "github.com/gravitational/teleport/lib/utils/log"
 )
 
 // reconnectToAuthService continuously attempts to reconnect to the auth
@@ -85,11 +86,11 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*
 		case errors.As(connectErr, &invalidVersionErr{}):
 			return nil, trace.Wrap(connectErr)
 		case role == types.RoleNode && strings.Contains(connectErr.Error(), auth.TokenExpiredOrNotFound):
-			process.log.Error("Can not join the cluster as node, the token expired or not found. Regenerate the token and try again.")
+			process.logger.ErrorContext(process.ExitContext(), "Can not join the cluster as node, the token expired or not found. Regenerate the token and try again.")
 		default:
-			process.log.Errorf("%v failed to establish connection to cluster: %v.", role, connectErr)
+			process.logger.ErrorContext(process.ExitContext(), "Failed to establish connection to cluster.", "identity", role, "error", connectErr)
 			if process.Config.Version == defaults.TeleportConfigVersionV3 && process.Config.ProxyServer.IsEmpty() {
-				process.log.Errorf("Check to see if the config has auth_server pointing to a Teleport Proxy. If it does, use proxy_server instead of auth_server.")
+				process.logger.ErrorContext(process.ExitContext(), "Check to see if the config has auth_server pointing to a Teleport Proxy. If it does, use proxy_server instead of auth_server.")
 			}
 		}
 	}
@@ -104,10 +105,10 @@ func (process *TeleportProcess) reconnectToAuthService(role types.SystemRole) (*
 		// Wait in between attempts, but return if teleport is shutting down
 		select {
 		case t := <-retry.After():
-			process.log.Debugf("Retrying connection to auth server after waiting %v.", t.Sub(startedWait))
+			process.logger.DebugContext(process.ExitContext(), "Retrying connection to auth server.", "identity", role, "backoff", t.Sub(startedWait))
 			retry.Inc()
 		case <-process.ExitContext().Done():
-			process.log.Infof("%v stopping connection attempts, teleport is shutting down.", role)
+			process.logger.InfoContext(process.ExitContext(), "Stopping connection attempts, teleport is shutting down.", "identity", role)
 			return nil, ErrTeleportExited
 		}
 	}
@@ -139,7 +140,7 @@ func (process *TeleportProcess) authServerTooOld(resp *proto.PingResponse) error
 
 	if serverVersion.Major < teleportVersion.Major {
 		if process.Config.SkipVersionCheck {
-			process.log.Warnf("Teleport instance is too new. This instance is running v%d. The auth server is running v%d and only supports instances on v%d or v%d.", teleportVersion.Major, serverVersion.Major, serverVersion.Major, serverVersion.Major-1)
+			process.logger.WarnContext(process.ExitContext(), "This instance is too new. Using a newer major version than the Auth server is unsupported and may impair functionality.", "version", teleportVersion.Major, "auth_version", serverVersion.Major, "supported_versions", []int64{serverVersion.Major, serverVersion.Major - 1})
 			return nil
 		}
 		return trace.Wrap(invalidVersionErr{ClusterMajorVersion: serverVersion.Major, LocalMajorVersion: teleportVersion.Major})
@@ -155,7 +156,7 @@ func (process *TeleportProcess) connectToAuthService(role types.SystemRole, opts
 	if err != nil {
 		return nil, trace.Wrap(err)
 	}
-	process.log.Debugf("Connected client: %v", connector.ClientIdentity)
+	process.logger.DebugContext(process.ExitContext(), "Client successfully connected to cluster", "client_identity", logutils.StringerAttr(connector.ClientIdentity))
 	process.addConnector(connector)
 
 	return connector, nil
@@ -179,7 +180,7 @@ func (process *TeleportProcess) connect(role types.SystemRole, opts ...certOptio
 		c, err := process.firstTimeConnect(role)
 		return c, trace.Wrap(err)
 	}
-	process.log.Debugf("Connected state: %v.", state.Spec.Rotation.String())
+	process.logger.DebugContext(process.ExitContext(), "Got connected state.", "rotation_state", logutils.StringerAttr(&state.Spec.Rotation))
 
 	identity, err := process.GetIdentity(role)
 	if err != nil {
@@ -198,7 +199,7 @@ func (process *TeleportProcess) connect(role types.SystemRole, opts ...certOptio
 			ServerIdentity: identity,
 		}, nil
 	}
-	process.log.Infof("Connecting to the cluster %v with TLS client certificate.", identity.ClusterName)
+	process.logger.InfoContext(process.ExitContext(), "Connecting to the cluster with TLS client certificate.", "cluster", identity.ClusterName)
 	connector, err := process.getConnector(identity, identity)
 	if err != nil {
 		// In the event that a user is attempting to connect a machine to
@@ -208,7 +209,7 @@ func (process *TeleportProcess) connect(role types.SystemRole, opts ...certOptio
 		// made. So provide a more user friendly error as a hint of what
 		// they can do to resolve the issue.
 		if strings.Contains(err.Error(), "certificate signed by unknown authority") {
-			process.log.Errorf("Was this node already registered to a different cluster? To join this node to a new cluster, remove `%s` and try again", process.Config.DataDir)
+			process.logger.ErrorContext(process.ExitContext(), "Was this node already registered to a different cluster? To join this node to a new cluster, remove the data_dir and try again", "data_dir", process.Config.DataDir)
 		}
 		return nil, trace.Wrap(err)
 	}
@@ -290,7 +291,7 @@ type KeyPair struct {
 
 func (process *TeleportProcess) deleteKeyPair(role types.SystemRole, reason string) {
 	process.keyMutex.Lock()
 	defer process.keyMutex.Unlock()
-	process.log.Debugf("Deleted generated key pair %v %v.", role, reason)
+	process.logger.DebugContext(process.ExitContext(), "Deleted generated key pair.", "identity", role, "reason", reason)
 	delete(process.keyPairs, keyPairKey{role: role, reason: reason})
 }
@@ -301,10 +302,10 @@ func (process *TeleportProcess) generateKeyPair(role types.SystemRole, reason st
 	mapKey := keyPairKey{role: role, reason: reason}
 	keyPair, ok := process.keyPairs[mapKey]
 	if ok {
-		process.log.Debugf("Returning existing key pair for %v %v.", role, reason)
+		process.logger.DebugContext(process.ExitContext(), "Returning existing key pair.", "identity", role, "reason", reason)
 		return &keyPair, nil
 	}
-	process.log.Debugf("Generating new key pair for %v %v.", role, reason)
+	process.logger.DebugContext(process.ExitContext(), "Generating new key pair.", "identity", role, "reason", reason)
 	privPEM, pubSSH, err := native.GenerateKeyPair()
 	if err != nil {
 		return nil, trace.Wrap(err)
@@ -395,7 +396,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
 	if process.getLocalAuth() != nil {
 		// Auth service is on the same host, no need to go though the invitation
 		// procedure.
-		process.log.Debugf("This server has local Auth server started, using it to add role to the cluster.")
+		process.logger.DebugContext(process.ExitContext(), "This server has local Auth server started, using it to add role to the cluster.")
 		var systemRoles []types.SystemRole
 		if role == types.RoleInstance {
 			// normally this is taken from the join token, but if we're dealing with a local auth server, we
@@ -413,7 +414,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
 			return nil, trace.BadParameter("%v must join a cluster and needs a provisioning token", role)
 		}
 
-		process.log.Infof("Joining the cluster with a secure token.")
+		process.logger.InfoContext(process.ExitContext(), "Joining the cluster with a secure token.")
 		const reason = "first-time-connect"
 		keyPair, err := process.generateKeyPair(role, reason)
 		if err != nil {
@@ -471,7 +472,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
 		process.deleteKeyPair(role, reason)
 	}
 
-	process.log.Infof("%v has obtained credentials to connect to the cluster.", role)
+	process.logger.InfoContext(process.ExitContext(), "Successfully obtained credentials to connect to the cluster.", "identity", role)
 	var connector *Connector
 	if role == types.RoleAdmin || role == types.RoleAuth {
 		connector = &Connector{
@@ -495,7 +496,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
 	}
 
 	if err := process.storage.WriteIdentity(auth.IdentityCurrent, *identity); err != nil {
-		process.log.Warningf("Failed to write %v identity: %v.", role, err)
+		process.logger.WarnContext(process.ExitContext(), "Failed to write identity to storage.", "identity", role, "error", err)
 	}
 
 	if err := process.storage.WriteState(role, auth.StateV2{
@@ -505,7 +506,7 @@ func (process *TeleportProcess) firstTimeConnect(role types.SystemRole) (*Connec
 	}); err != nil {
 		return nil, trace.NewAggregate(err, connector.Close())
 	}
-	process.log.Infof("The process successfully wrote the credentials and state of %v to the disk.", role)
+	process.logger.InfoContext(process.ExitContext(), "The process successfully wrote the credentials and state to the disk.", "identity", role)
 
 	return connector, nil
 }
@@ -630,7 +631,7 @@ func (process *TeleportProcess) periodicSyncRotationState() error {
 	if _, err := process.WaitForEvent(process.GracefulExitContext(), TeleportReadyEvent); err != nil {
 		return nil
 	}
-	process.log.Infof("The new service has started successfully. Starting syncing rotation status with period %v.", process.Config.PollingPeriod)
+	process.logger.InfoContext(process.ExitContext(), "The new service has started successfully. Starting syncing rotation status.", "sync_interval", process.Config.PollingPeriod)
 
 	periodic := interval.New(interval.Config{
 		Duration: process.Config.RotationConnectionInterval,
@@ -645,7 +646,7 @@ func (process *TeleportProcess) periodicSyncRotationState() error {
 			return nil
 		}
 
-		process.log.Warningf("Sync rotation state cycle failed. Retrying in ~%v", process.Config.RotationConnectionInterval)
+		process.logger.WarnContext(process.ExitContext(), "Sync rotation state cycle failed.", "retry_interval", process.Config.RotationConnectionInterval)
 
 		select {
 		case <-periodic.Next():
@@ -706,11 +707,9 @@ func (process *TeleportProcess) syncRotationStateCycle() error {
 			}
 			ca, ok := event.Resource.(types.CertAuthority)
 			if !ok {
-				process.log.Debugf("Skipping event %v for %v", event.Type, event.Resource.GetName())
 				continue
 			}
 			if ca.GetType() != types.HostCA || ca.GetClusterName() != conn.ClientIdentity.ClusterName {
-				process.log.Debugf("Skipping event for %v %v", ca.GetType(), ca.GetClusterName())
 				continue
 			}
 			status, err := process.syncRotationStateAndBroadcast(conn)
@@ -742,21 +741,21 @@ func (process *TeleportProcess) syncRotationStateAndBroadcast(conn *Connector) (
 	status, err := process.syncRotationState(conn)
 	if err != nil {
 		if trace.IsConnectionProblem(err) {
-			process.log.Warningf("Connection problem: sync rotation state: %v.", err)
+			process.logger.WarnContext(process.ExitContext(), "Connection problem: sync rotation state.", "error", err)
 		} else {
-			process.log.Warningf("Failed to sync rotation state: %v.", err)
+			process.logger.WarnContext(process.ExitContext(), "Failed to sync rotation state.", "error", err)
 		}
 		return nil, trace.Wrap(err)
 	}
 
 	if status.phaseChanged || status.needsReload {
-		process.log.Debugf("Sync rotation state detected cert authority reload phase update.")
+		process.logger.DebugContext(process.ExitContext(), "Sync rotation state detected cert authority reload phase update.")
 	}
 	if status.phaseChanged {
 		process.BroadcastEvent(Event{Name: TeleportPhaseChangeEvent})
 	}
 	if status.needsReload {
-		process.log.Debugf("Triggering reload process.")
+		process.logger.DebugContext(process.ExitContext(), "Triggering reload process.")
 		process.BroadcastEvent(Event{Name: TeleportReloadEvent})
 	}
 	return status, nil
@@ -814,7 +813,7 @@ type rotationStatus struct {
 
 // checkServerIdentity returns a boolean that indicates the host certificate
 // needs to be regenerated.
-func checkServerIdentity(conn *Connector, additionalPrincipals []string, dnsNames []string, log logrus.FieldLogger) bool {
+func checkServerIdentity(ctx context.Context, conn *Connector, additionalPrincipals []string, dnsNames []string, logger *slog.Logger) bool {
 	var principalsChanged bool
 	var dnsNamesChanged bool
 
@@ -831,13 +830,11 @@ func checkServerIdentity(conn *Connector, additionalPrincipals []string, dnsName
 	// certificate need to be updated.
 	if len(additionalPrincipals) != 0 && !conn.ServerIdentity.HasPrincipals(principalsToCheck) {
 		principalsChanged = true
-		log.Debugf("Rotation in progress, adding %v to SSH principals %v.",
-			additionalPrincipals, conn.ServerIdentity.Cert.ValidPrincipals)
+		logger.DebugContext(ctx, "Rotation in progress, updating SSH principals.", "additional_principals", additionalPrincipals, "current_principals", conn.ServerIdentity.Cert.ValidPrincipals)
 	}
 	if len(dnsNames) != 0 && !conn.ServerIdentity.HasDNSNames(dnsNames) {
 		dnsNamesChanged = true
-		log.Debugf("Rotation in progress, adding %v to x590 DNS names in SAN %v.",
-			dnsNames, conn.ServerIdentity.XCert.DNSNames)
+		logger.DebugContext(ctx, "Rotation in progress, updating DNS names.", "additional_dns_names", dnsNames, "current_dns_names", conn.ServerIdentity.XCert.DNSNames)
 	}
 
 	return principalsChanged || dnsNamesChanged
@@ -855,7 +852,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState auth.StateV2,
 
 	// Check if any of the SSH principals or TLS DNS names have changed and the
 	// host credentials need to be regenerated.
-	regenerateCertificate := checkServerIdentity(conn, additionalPrincipals, dnsNames, process.log)
+	regenerateCertificate := checkServerIdentity(process.ExitContext(), conn, additionalPrincipals, dnsNames, process.logger)
 
 	// If the local state matches remote state and neither principals or DNS
 	// names changed, nothing to do. CA is in sync.
@@ -888,7 +885,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState auth.StateV2,
 		// rollback cycle.
 		case "", types.RotationStateStandby:
 			if regenerateCertificate {
-				process.log.Infof("Service %v has updated principals to %q, DNS Names to %q, going to request new principals and update.", id.Role, additionalPrincipals, dnsNames)
+				process.logger.InfoContext(process.ExitContext(), "Service has updated principals and DNS Names, going to request new principals and update.", "identity", id.Role, "principals", additionalPrincipals, "dns_names", dnsNames)
 				identity, err := process.reRegister(conn, additionalPrincipals, dnsNames, remote)
 				if err != nil {
 					return nil, trace.Wrap(err)
@@ -946,7 +943,7 @@ func (process *TeleportProcess) rotate(conn *Connector, localState auth.StateV2,
 			if err != nil {
 				return nil, trace.Wrap(err)
 			}
-			process.log.Debugf("Re-registered, received new identity %v.", identity)
+			process.logger.DebugContext(process.ExitContext(), "Re-registered, received new identity.", "identity", logutils.StringerAttr(identity))
 			err = writeStateAndIdentity(auth.IdentityReplacement, identity)
 			if err != nil {
 				return nil, trace.Wrap(err)
@@ -1001,7 +998,7 @@ func (process *TeleportProcess) getConnector(clientIdentity, serverIdentity *aut
 		// before acquiring their own client.
 		if conn := process.waitForInstanceConnector(); conn != nil && conn.Client != nil {
 			if conn.ClientIdentity.HasSystemRole(clientIdentity.ID.Role) {
-				process.log.Infof("Reusing Instance client for %s. additionalSystemRoles=%+v", clientIdentity.ID.Role, conn.ClientIdentity.SystemRoles)
+				process.logger.InfoContext(process.ExitContext(), "Reusing Instance client.", "identity", clientIdentity.ID.Role, "additional_system_roles", conn.ClientIdentity.SystemRoles)
 				return &Connector{
 					ClientIdentity: clientIdentity,
 					ServerIdentity: serverIdentity,
@@ -1009,10 +1006,10 @@ func (process *TeleportProcess) getConnector(clientIdentity, serverIdentity *aut
 					ReusedClient: true,
 				}, nil
 			} else {
-				process.log.Warnf("Unable to reuse Instance client for %s. additionalSystemRoles=%+v", clientIdentity.ID.Role, conn.ClientIdentity.SystemRoles)
+				process.logger.WarnContext(process.ExitContext(), "Unable to reuse Instance client.", "identity", clientIdentity.ID.Role, "additional_system_roles", conn.ClientIdentity.SystemRoles)
 			}
 		} else {
-			process.log.Warnf("Unable to reuse Instance client for %s. (not available)", clientIdentity.ID.Role)
+			process.logger.WarnContext(process.ExitContext(), "Instance client not available for reuse.", "identity", clientIdentity.ID.Role)
 		}
 	}
@@ -1030,7 +1027,7 @@ func (process *TeleportProcess) getConnector(clientIdentity, serverIdentity *aut
 	// Set cluster features and return successfully with a working connector.
 	process.setClusterFeatures(pingResponse.GetServerFeatures())
 	process.setAuthSubjectiveAddr(pingResponse.RemoteAddr)
-	process.log.Infof("%v: features loaded from auth server: %+v", clientIdentity.ID.Role, pingResponse.GetServerFeatures())
+	process.logger.InfoContext(process.ExitContext(), "features loaded from auth server", "identity", clientIdentity.ID.Role, "features", pingResponse.GetServerFeatures())
 
 	return &Connector{
 		ClientIdentity: clientIdentity,
@@ -1056,15 +1053,15 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client
 	}
 
 	authServers := process.Config.AuthServerAddresses()
-	connectToAuthServer := func(logger *logrus.Entry) (*auth.Client, *proto.PingResponse, error) {
-		logger.Debug("Attempting to connect to Auth Server directly.")
+	connectToAuthServer := func(logger *slog.Logger) (*auth.Client, *proto.PingResponse, error) {
+		logger.DebugContext(process.ExitContext(), "Attempting to connect to Auth Server directly.")
 		clt, pingResponse, err := process.newClientDirect(authServers, tlsConfig, identity.ID.Role)
 		if err != nil {
-			logger.Debug("Failed to connect to Auth Server directly.")
+			logger.DebugContext(process.ExitContext(), "Failed to connect to Auth Server directly.")
 			return nil, nil, err
 		}
 
-		logger.Debug("Connected to Auth Server with direct connection.")
+		logger.DebugContext(process.ExitContext(), "Connected to Auth Server with direct connection.")
 		return clt, pingResponse, nil
 	}
@@ -1072,7 +1069,7 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client
 	// for config v1 and v2, attempt to directly connect to the auth server and fall back to tunneling
 	case defaults.TeleportConfigVersionV1, defaults.TeleportConfigVersionV2:
 		// if we don't have a proxy address, try to connect to the auth server directly
-		logger := process.log.WithField("auth-addrs", utils.NetAddrsToStrings(authServers))
+		logger := process.logger.With("auth-addrs", utils.NetAddrsToStrings(authServers))
 
 		directClient, resp, directErr := connectToAuthServer(logger)
 		if directErr == nil {
@@ -1086,17 +1083,17 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client
 		// if that fails, attempt to connect to the auth server through a tunnel
-		logger.Debug("Attempting to discover reverse tunnel address.")
-		logger.Debug("Attempting to connect to Auth Server through tunnel.")
+		logger.DebugContext(process.ExitContext(), "Attempting to discover reverse tunnel address.")
+		logger.DebugContext(process.ExitContext(), "Attempting to connect to Auth Server through tunnel.")
 		tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig)
 		if err != nil {
-			process.log.Errorf("Node failed to establish connection to Teleport Proxy. We have tried the following endpoints:")
-			process.log.Errorf("- connecting to auth server directly: %v", directErr)
+			process.logger.ErrorContext(process.ExitContext(), "Node failed to establish connection to Teleport Proxy. We have tried the following endpoints:")
+			process.logger.ErrorContext(process.ExitContext(), "- connecting to auth server directly", "error", directErr)
 			if trace.IsConnectionProblem(err) && strings.Contains(err.Error(), "connection refused") {
 				err = trace.Wrap(err, "This is the alternative port we tried and it's not configured.")
 			}
-			process.log.Errorf("- connecting to auth server through tunnel: %v", err)
+			process.logger.ErrorContext(process.ExitContext(), "- connecting to auth server through tunnel", "error", err)
 			collectedErrs := trace.NewAggregate(directErr, err)
 			if utils.IsUntrustedCertErr(collectedErrs) {
 				collectedErrs = trace.Wrap(collectedErrs, utils.SelfSignedCertsMsg)
@@ -1104,28 +1101,28 @@ func (process *TeleportProcess) newClient(identity *auth.Identity) (*auth.Client
 			return nil, nil, trace.Wrap(collectedErrs, "Failed to connect to Auth Server directly or over tunnel, no methods remaining.")
 		}
 
-		logger.Debug("Connected to Auth Server through tunnel.")
+		logger.DebugContext(process.ExitContext(), "Connected to Auth Server through tunnel.")
 
 		return tunnelClient, pingResponse, nil
 
 	// for config v3, either tunnel to the given proxy server or directly connect to the given auth server
 	case defaults.TeleportConfigVersionV3:
 		proxyServer := process.Config.ProxyServer
 		if !proxyServer.IsEmpty() {
-			logger := process.log.WithField("proxy-server", proxyServer.String())
-			logger.Debug("Attempting to connect to Auth Server through tunnel.")
+			logger := process.logger.With("proxy-server", proxyServer.String())
+			logger.DebugContext(process.ExitContext(), "Attempting to connect to Auth Server through tunnel.")
 			tunnelClient, pingResponse, err := process.newClientThroughTunnel(tlsConfig, sshClientConfig)
 			if err != nil {
 				return nil, nil, trace.Errorf("Failed to connect to Proxy Server through tunnel: %v", err)
 			}
 
-			logger.Debug("Connected to Auth Server through tunnel.")
+			logger.DebugContext(process.ExitContext(), "Connected to Auth Server through tunnel.")
 
 			return tunnelClient, pingResponse, nil
 		}
 
 		// if we don't have a proxy address, try to connect to the auth server directly
-		logger := process.log.WithField("auth-server", utils.NetAddrsToStrings(authServers))
+		logger := process.logger.With("auth-server", utils.NetAddrsToStrings(authServers))
 
 		return connectToAuthServer(logger)
 	}
diff --git a/lib/service/db.go b/lib/service/db.go
index 0039c86e9ae99..16099136c29b2 100644
--- a/lib/service/db.go
+++ b/lib/service/db.go
@@ -47,10 +47,9 @@ func (process *TeleportProcess) initDatabases() {
 }
 
 func (process *TeleportProcess) initDatabaseService() (retErr error) {
-	log := process.log.WithField(trace.Component, teleport.Component(
-		teleport.ComponentDatabase, process.id))
+	logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentDatabase, process.id))
 
-	conn, err := process.WaitForConnector(DatabasesIdentityEvent, log)
+	conn, err := process.WaitForConnector(DatabasesIdentityEvent, logger)
 	if conn == nil {
 		return trace.Wrap(err)
 	}
@@ -88,7 +87,7 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
 	lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{
 		ResourceWatcherConfig: services.ResourceWatcherConfig{
 			Component: teleport.ComponentDatabase,
-			Log: log,
+			Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentDatabase, process.id)),
 			Client: conn.Client,
 		},
 	})
@@ -101,7 +100,7 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
 		ClusterName: clusterName,
 		AccessPoint: accessPoint,
 		LockWatcher: lockWatcher,
-		Logger: log,
+		Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentDatabase, process.id)),
 	})
 	if err != nil {
 		return trace.Wrap(err)
@@ -117,7 +116,7 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
 	}
 	defer func() {
 		if retErr != nil {
-			warnOnErr(asyncEmitter.Close(), process.log)
+			warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger)
 		}
 	}()
@@ -171,7 +170,7 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
 	}
 	defer func() {
 		if retErr != nil {
-			warnOnErr(dbService.Close(), process.log)
+			warnOnErr(process.ExitContext(), dbService.Close(), logger)
 		}
 	}()
@@ -206,25 +205,25 @@ func (process *TeleportProcess) initDatabaseService() (retErr error) {
 	process.OnExit("db.stop", func(payload interface{}) {
 		if dbService != nil {
 			if payload == nil {
-				log.Info("Shutting down immediately.")
-				warnOnErr(dbService.Close(), log)
+				logger.InfoContext(process.ExitContext(), "Shutting down immediately.")
+				warnOnErr(process.ExitContext(), dbService.Close(), logger)
 			} else {
-				log.Info("Shutting down gracefully.")
-				warnOnErr(dbService.Shutdown(payloadContext(payload, log)), log)
+				logger.InfoContext(process.ExitContext(), "Shutting down gracefully.")
+				warnOnErr(process.ExitContext(), dbService.Shutdown(payloadContext(payload)), logger)
 			}
 		}
 		if asyncEmitter != nil {
-			warnOnErr(asyncEmitter.Close(), log)
+			warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger)
 		}
 		if agentPool != nil {
 			agentPool.Stop()
 		}
-		warnOnErr(conn.Close(), log)
-		log.Info("Exited.")
+		warnOnErr(process.ExitContext(), conn.Close(), logger)
+		logger.InfoContext(process.ExitContext(), "Exited.")
 	})
 
 	process.BroadcastEvent(Event{Name: DatabasesReady, Payload: nil})
-	log.Infof("Database service has successfully started: %v.", databases)
+	logger.InfoContext(process.ExitContext(), "Database service has successfully started.", "database", databases)
 
 	// Block and wait while the server and agent pool are running.
 	if err := dbService.Wait(); err != nil {
diff --git a/lib/service/desktop.go b/lib/service/desktop.go
index 70cd6ac41ac90..19580b5d0cb02 100644
--- a/lib/service/desktop.go
+++ b/lib/service/desktop.go
@@ -20,12 +20,12 @@ package service
 
 import (
 	"crypto/tls"
+	"log/slog"
 	"net"
 	"net/http"
 	"strconv"
 
 	"github.com/gravitational/trace"
-	"github.com/sirupsen/logrus"
 
 	"github.com/gravitational/teleport"
 	"github.com/gravitational/teleport/api/types"
@@ -44,28 +44,26 @@ import (
 )
 
 func (process *TeleportProcess) initWindowsDesktopService() {
-	log := process.log.WithFields(logrus.Fields{
-		trace.Component: teleport.Component(teleport.ComponentWindowsDesktop, process.id),
-	})
+	logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentWindowsDesktop, process.id))
 	process.RegisterWithAuthServer(types.RoleWindowsDesktop, WindowsDesktopIdentityEvent)
 	process.RegisterCriticalFunc("windows_desktop.init", func() error {
-		conn, err := process.WaitForConnector(WindowsDesktopIdentityEvent, log)
+		conn, err := process.WaitForConnector(WindowsDesktopIdentityEvent, logger)
 		if conn == nil {
 			return trace.Wrap(err)
 		}
-		if err := process.initWindowsDesktopServiceRegistered(log, conn); err != nil {
-			warnOnErr(conn.Close(), log)
+		if err := process.initWindowsDesktopServiceRegistered(logger, conn); err != nil {
+			warnOnErr(process.ExitContext(), conn.Close(), logger)
 			return trace.Wrap(err)
 		}
 		return nil
 	})
 }
 
-func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.Entry, conn *Connector) (retErr error) {
+func (process *TeleportProcess) initWindowsDesktopServiceRegistered(logger *slog.Logger, conn *Connector) (retErr error) {
 	defer func() {
 		if err := process.closeImportedDescriptors(teleport.ComponentWindowsDesktop); err != nil {
-			log.WithError(err).Warn("Failed closing imported file descriptors.")
+			logger.WarnContext(process.ExitContext(), "Failed closing imported file descriptors.", "error", err)
 		}
 	}()
 	cfg := process.Config
@@ -98,14 +96,14 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
 
 	// Start a local listener and let proxies dial in.
 	case !useTunnel && !cfg.WindowsDesktop.ListenAddr.IsEmpty():
-		log.Info("Using local listener and registering directly with auth server")
+		logger.InfoContext(process.ExitContext(), "Using local listener and registering directly with auth server")
 		listener, err = process.importOrCreateListener(ListenerWindowsDesktop, cfg.WindowsDesktop.ListenAddr.Addr)
 		if err != nil {
 			return trace.Wrap(err)
 		}
 		defer func() {
 			if retErr != nil {
-				warnOnErr(listener.Close(), log)
+				warnOnErr(process.ExitContext(), listener.Close(), logger)
 			}
 		}()
@@ -139,13 +137,13 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
 				agentPool.Stop()
 			}
 		}()
-		log.Info("Using a reverse tunnel to register and handle proxy connections")
+		logger.InfoContext(process.ExitContext(), "Using a reverse tunnel to register and handle proxy connections")
 	}
 
 	lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{
 		ResourceWatcherConfig: services.ResourceWatcherConfig{
 			Component: teleport.ComponentWindowsDesktop,
-			Log: log,
+			Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentWindowsDesktop, process.id)),
 			Clock: cfg.Clock,
 			Client: conn.Client,
 		},
 	})
@@ -160,7 +158,7 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus.
ClusterName: clusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentWindowsDesktop, process.id)), // Device authorization breaks browser-based access. DeviceAuthorization: authz.DeviceAuthorizationOpts{ DisableGlobalMode: true, @@ -183,7 +181,7 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. if info.ServerName != "" { clusterName, err = apiutils.DecodeClusterName(info.ServerName) if err != nil && !trace.IsNotFound(err) { - log.Debugf("Ignoring unsupported cluster name %q.", info.ServerName) + logger.DebugContext(process.ExitContext(), "Ignoring unsupported cluster name.", "cluster_name", info.ServerName) } } pool, _, err := auth.DefaultClientCertPool(accessPoint, clusterName) @@ -213,7 +211,7 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. srv, err := desktop.NewWindowsService(desktop.WindowsServiceConfig{ DataDir: process.Config.DataDir, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentWindowsDesktop, process.id)), Clock: process.Clock, Authorizer: authorizer, Emitter: conn.Client, @@ -244,14 +242,14 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. } defer func() { if retErr != nil { - warnOnErr(srv.Close(), log) + warnOnErr(process.ExitContext(), srv.Close(), logger) } }() process.RegisterCriticalFunc("windows_desktop.serve", func() error { if useTunnel { - log.Info("Starting Windows desktop service via proxy reverse tunnel.") + logger.InfoContext(process.ExitContext(), "Starting Windows desktop service via proxy reverse tunnel.") } else { - log.Infof("Starting Windows desktop service on %v.", listener.Addr()) + logger.InfoContext(process.ExitContext(), "Starting Windows desktop service.", "listen_address", listener.Addr()) } process.BroadcastEvent(Event{Name: WindowsDesktopReady, Payload: nil}) @@ -286,16 +284,16 @@ func (process *TeleportProcess) initWindowsDesktopServiceRegistered(log *logrus. // Cleanup, when process is exiting. process.OnExit("windows_desktop.shutdown", func(payload interface{}) { // Fast shutdown. - warnOnErr(srv.Close(), log) + warnOnErr(process.ExitContext(), srv.Close(), logger) agentPool.Stop() if payload != nil { // Graceful shutdown. 
agentPool.Wait() } - warnOnErr(listener.Close(), log) - warnOnErr(conn.Close(), log) + warnOnErr(process.ExitContext(), listener.Close(), logger) + warnOnErr(process.ExitContext(), conn.Close(), logger) - log.Info("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") }) return nil } diff --git a/lib/service/discovery.go b/lib/service/discovery.go index ca2398790e465..6fc03adadc24c 100644 --- a/lib/service/discovery.go +++ b/lib/service/discovery.go @@ -20,11 +20,11 @@ package service import ( "context" + "log/slog" "os" "time" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/types" @@ -44,10 +44,9 @@ func (process *TeleportProcess) initDiscovery() { } func (process *TeleportProcess) initDiscoveryService() error { - log := process.log.WithField(trace.Component, teleport.Component( - teleport.ComponentDiscovery, process.id)) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentDiscovery, process.id)) - conn, err := process.WaitForConnector(DiscoveryIdentityEvent, log) + conn, err := process.WaitForConnector(DiscoveryIdentityEvent, logger) if conn == nil { return trace.Wrap(err) } @@ -76,7 +75,7 @@ func (process *TeleportProcess) initDiscoveryService() error { process.ExitContext(), process.Config, process.getInstanceClient(), - log, + logger, ) if err != nil { return trace.Wrap(err, "failed to build access graph configuration") @@ -107,15 +106,15 @@ func (process *TeleportProcess) initDiscoveryService() error { } process.OnExit("discovery.stop", func(payload interface{}) { - log.Info("Shutting down.") + logger.InfoContext(process.ExitContext(), "Shutting down.") if discoveryService != nil { discoveryService.Stop() } if asyncEmitter != nil { - warnOnErr(asyncEmitter.Close(), process.log) + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) } - warnOnErr(conn.Close(), log) - log.Info("Exited.") + warnOnErr(process.ExitContext(), conn.Close(), logger) + logger.InfoContext(process.ExitContext(), "Exited.") }) process.BroadcastEvent(Event{Name: DiscoveryReady, Payload: nil}) @@ -123,7 +122,7 @@ func (process *TeleportProcess) initDiscoveryService() error { if err := discoveryService.Start(); err != nil { return trace.Wrap(err) } - log.Infof("Discovery service has successfully started") + logger.InfoContext(process.ExitContext(), "Discovery service has successfully started") if err := discoveryService.Wait(); err != nil { return trace.Wrap(err) @@ -143,7 +142,7 @@ func (process *TeleportProcess) integrationOnlyCredentials() bool { // buildAccessGraphFromTAGOrFallbackToAuth builds the AccessGraphConfig from the Teleport Agent configuration or falls back to the Auth server's configuration. // If the AccessGraph configuration is not enabled locally, it will fall back to the Auth server's configuration. -func buildAccessGraphFromTAGOrFallbackToAuth(ctx context.Context, config *servicecfg.Config, client auth.ClientI, logger logrus.FieldLogger) (discovery.AccessGraphConfig, error) { +func buildAccessGraphFromTAGOrFallbackToAuth(ctx context.Context, config *servicecfg.Config, client auth.ClientI, logger *slog.Logger) (discovery.AccessGraphConfig, error) { var ( accessGraphCAData []byte err error @@ -164,13 +163,13 @@ func buildAccessGraphFromTAGOrFallbackToAuth(ctx context.Context, config *servic CA: accessGraphCAData, } if !accessGraphCfg.Enabled { - logger.Debug("Access graph is disabled or not configured. 
Falling back to the Auth server's access graph configuration.") + logger.DebugContext(ctx, "Access graph is disabled or not configured. Falling back to the Auth server's access graph configuration.") ctx, cancel := context.WithTimeout(ctx, 5*time.Second) rsp, err := client.GetClusterAccessGraphConfig(ctx) cancel() switch { case trace.IsNotImplemented(err): - logger.Debug("Auth server does not support access graph's GetClusterAccessGraphConfig RPC") + logger.DebugContext(ctx, "Auth server does not support access graph's GetClusterAccessGraphConfig RPC") case err != nil: return discovery.AccessGraphConfig{}, trace.Wrap(err) default: diff --git a/lib/service/discovery_test.go b/lib/service/discovery_test.go index 180e033d279aa..c858360a6e408 100644 --- a/lib/service/discovery_test.go +++ b/lib/service/discovery_test.go @@ -20,10 +20,10 @@ package service import ( "context" + "log/slog" "testing" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" clusterconfigpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/clusterconfig/v1" @@ -157,7 +157,7 @@ func TestTeleportProcess_initDiscoveryService(t *testing.T) { rsp: tt.rsp, err: tt.err, }, - logrus.StandardLogger(), + slog.Default(), ) tt.assertErr(t, err) require.Equal(t, tt.want, accessGraphCfg) diff --git a/lib/service/kubernetes.go b/lib/service/kubernetes.go index d211b46acc059..cc70afa9149cd 100644 --- a/lib/service/kubernetes.go +++ b/lib/service/kubernetes.go @@ -19,11 +19,11 @@ package service import ( + "log/slog" "net" "net/http" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" @@ -38,34 +38,31 @@ import ( ) func (process *TeleportProcess) initKubernetes() { - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentKube, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentKube, process.id)) process.RegisterWithAuthServer(types.RoleKube, KubeIdentityEvent) process.RegisterCriticalFunc("kube.init", func() error { - conn, err := process.WaitForConnector(KubeIdentityEvent, log) + conn, err := process.WaitForConnector(KubeIdentityEvent, logger) if conn == nil { return trace.Wrap(err) } if !process.getClusterFeatures().Kubernetes { - log.Warn("Warning: Kubernetes service not intialized because Teleport Auth Server is not licensed for Kubernetes Access. ", - "Please contact the cluster administrator to enable it.") + logger.WarnContext(process.ExitContext(), "Warning: Kubernetes service not initialized because Teleport Auth Server is not licensed for Kubernetes Access. 
Please contact the cluster administrator to enable it.") return nil } - if err := process.initKubernetesService(log, conn); err != nil { - warnOnErr(conn.Close(), log) + if err := process.initKubernetesService(logger, conn); err != nil { + warnOnErr(process.ExitContext(), conn.Close(), logger) return trace.Wrap(err) } return nil }) } -func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *Connector) (retErr error) { +func (process *TeleportProcess) initKubernetesService(logger *slog.Logger, conn *Connector) (retErr error) { // clean up unused descriptors passed for proxy, but not used by it defer func() { if err := process.closeImportedDescriptors(teleport.ComponentKube); err != nil { - log.WithError(err).Warn("Failed closing imported file descriptors.") + logger.WarnContext(process.ExitContext(), "Failed closing imported file descriptors.", "error", err) } }() cfg := process.Config @@ -106,14 +103,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C // Start a local listener and let proxies dial in. case !conn.UseTunnel() && !cfg.Kube.ListenAddr.IsEmpty(): - log.Debug("Turning on Kubernetes service listening address.") + logger.DebugContext(process.ExitContext(), "Turning on Kubernetes service listening address.") listener, err = process.importOrCreateListener(ListenerKube, cfg.Kube.ListenAddr.Addr) if err != nil { return trace.Wrap(err) } defer func() { if retErr != nil { - warnOnErr(listener.Close(), log) + warnOnErr(process.ExitContext(), listener.Close(), logger) } }() @@ -147,14 +144,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C agentPool.Stop() } }() - log.Info("Started reverse tunnel client.") + logger.InfoContext(process.ExitContext(), "Started reverse tunnel client.") } var dynLabels *labels.Dynamic if len(cfg.Kube.DynamicLabels) != 0 { dynLabels, err = labels.NewDynamic(process.ExitContext(), &labels.DynamicConfig{ Labels: cfg.Kube.DynamicLabels, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentKube, process.id)), }) if err != nil { return trace.Wrap(err) @@ -171,7 +168,7 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentKube, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentKube, process.id)), Client: conn.Client, }, }) @@ -184,7 +181,7 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C ClusterName: teleportClusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentKube, process.id)), }) if err != nil { return trace.Wrap(err) @@ -238,7 +235,7 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C StaticLabels: cfg.Kube.StaticLabels, DynamicLabels: dynLabels, CloudLabels: process.cloudLabels, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentKube, process.id)), PROXYProtocolMode: multiplexer.PROXYProtocolOff, // Kube service doesn't need to process unsigned PROXY headers. 
}) if err != nil { @@ -246,14 +243,14 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C } defer func() { if retErr != nil { - warnOnErr(kubeServer.Close(), log) + warnOnErr(process.ExitContext(), kubeServer.Close(), logger) } }() process.RegisterCriticalFunc("kube.serve", func() error { if conn.UseTunnel() { - log.Info("Starting Kube service via proxy reverse tunnel.") + logger.InfoContext(process.ExitContext(), "Starting Kube service via proxy reverse tunnel.") } else { - log.Infof("Starting Kube service on %v.", listener.Addr()) + logger.InfoContext(process.ExitContext(), "Starting Kube service.", "listen_address", listener.Addr()) } process.BroadcastEvent(Event{Name: KubernetesReady, Payload: nil}) err := kubeServer.Serve(listener) @@ -271,25 +268,25 @@ func (process *TeleportProcess) initKubernetesService(log *logrus.Entry, conn *C // Clean up items in reverse order from their initialization. if payload != nil { // Graceful shutdown. - warnOnErr(kubeServer.Shutdown(payloadContext(payload, log)), log) + warnOnErr(process.ExitContext(), kubeServer.Shutdown(payloadContext(payload)), logger) agentPool.Stop() agentPool.Wait() } else { // Fast shutdown. - warnOnErr(kubeServer.Close(), log) + warnOnErr(process.ExitContext(), kubeServer.Close(), logger) agentPool.Stop() } if asyncEmitter != nil { - warnOnErr(asyncEmitter.Close(), log) + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) } - warnOnErr(listener.Close(), log) - warnOnErr(conn.Close(), log) + warnOnErr(process.ExitContext(), listener.Close(), logger) + warnOnErr(process.ExitContext(), conn.Close(), logger) if dynLabels != nil { dynLabels.Close() } - log.Info("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") }) return nil } diff --git a/lib/service/service.go b/lib/service/service.go index 0546e9cf45089..bf17efa1351ae 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -146,6 +146,7 @@ import ( usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/cert" + logutils "github.com/gravitational/teleport/lib/utils/log" vc "github.com/gravitational/teleport/lib/versioncontrol" uw "github.com/gravitational/teleport/lib/versioncontrol/upgradewindow" "github.com/gravitational/teleport/lib/web" @@ -551,16 +552,16 @@ func (process *TeleportProcess) addConnector(connector *Connector) { // if conn == nil { // return trace.Wrap(err) // } -func (process *TeleportProcess) WaitForConnector(identityEvent string, log logrus.FieldLogger) (*Connector, error) { +func (process *TeleportProcess) WaitForConnector(identityEvent string, log *slog.Logger) (*Connector, error) { event, err := process.WaitForEvent(process.ExitContext(), identityEvent) if err != nil { if log != nil { - log.Debugf("Process is exiting.") + log.DebugContext(process.ExitContext(), "Process is exiting.") } return nil, nil } if log != nil { - log.Debugf("Received event %q.", event.Name) + log.DebugContext(process.ExitContext(), "Received event.", "event", event.Name) } conn, ok := (event.Payload).(*Connector) @@ -651,7 +652,7 @@ func (process *TeleportProcess) GetIdentity(role types.SystemRole) (i *auth.Iden if err != nil { return nil, trace.Wrap(err) } - process.log.Infof("Found static identity %v in the config file, writing to disk.", &id) + process.logger.InfoContext(process.ExitContext(), "Found static identity in the config file, writing to disk.", "identity", logutils.StringerAttr(&id)) if 
err = process.storage.WriteIdentity(auth.IdentityCurrent, *i); err != nil { return nil, trace.Wrap(err) } @@ -735,7 +736,7 @@ func waitAndReload(ctx context.Context, cfg servicecfg.Config, srv Process, newT cfg.Log.Infof("Started in-process service reload.") fileDescriptors, err := srv.ExportFileDescriptors() if err != nil { - warnOnErr(srv.Close(), cfg.Log) + warnOnErr(ctx, srv.Close(), cfg.Logger) return nil, trace.Wrap(err) } newCfg := cfg @@ -745,12 +746,12 @@ func waitAndReload(ctx context.Context, cfg servicecfg.Config, srv Process, newT newCfg.PIDFile = "" newSrv, err := newTeleport(&newCfg) if err != nil { - warnOnErr(srv.Close(), cfg.Log) + warnOnErr(ctx, srv.Close(), cfg.Logger) return nil, trace.Wrap(err, "failed to create a new service") } cfg.Log.Infof("Created new process.") if err := newSrv.Start(); err != nil { - warnOnErr(srv.Close(), cfg.Log) + warnOnErr(ctx, srv.Close(), cfg.Logger) return nil, trace.Wrap(err, "failed to start a new service") } @@ -764,8 +765,8 @@ func waitAndReload(ctx context.Context, cfg servicecfg.Config, srv Process, newT startCancel() }() if _, err := newSrv.WaitForEvent(startTimeoutCtx, TeleportReadyEvent); err != nil { - warnOnErr(newSrv.Close(), cfg.Log) - warnOnErr(srv.Close(), cfg.Log) + warnOnErr(ctx, newSrv.Close(), cfg.Logger) + warnOnErr(ctx, srv.Close(), cfg.Logger) return nil, trace.BadParameter("the new service has failed to start") } cfg.Log.Infof("New service has started successfully.") @@ -1037,19 +1038,19 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { }) process.inventoryHandle.RegisterPingHandler(func(sender inventory.DownstreamSender, ping proto.DownstreamInventoryPing) { - process.log.Infof("Handling incoming inventory ping (id=%d).", ping.ID) + process.logger.InfoContext(process.ExitContext(), "Handling incoming inventory ping.", "id", ping.ID) err := sender.Send(process.ExitContext(), proto.UpstreamInventoryPong{ ID: ping.ID, }) if err != nil { - process.log.Warnf("Failed to respond to inventory ping (id=%d): %v", ping.ID, err) + process.logger.WarnContext(process.ExitContext(), "Failed to respond to inventory ping.", "id", ping.ID, "error", err) } }) // if an external upgrader is defined, we need to set up an appropriate upgrade window exporter. if upgraderKind != "" { if process.Config.Auth.Enabled || process.Config.Proxy.Enabled { - process.log.Warnf("Use of external upgraders on control-plane instances is not recommended.") + process.logger.WarnContext(process.ExitContext(), "Use of external upgraders on control-plane instances is not recommended.") } driver, err := uw.NewDriver(upgraderKind) @@ -1071,7 +1072,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { exporter.Close() }) - process.log.Infof("Configured upgrade window exporter for external upgrader. 
kind=%s", upgraderKind) + process.logger.InfoContext(process.ExitContext(), "Configured upgrade window exporter for external upgrader.", "kind", upgraderKind) } if process.Config.Proxy.Enabled { @@ -1085,7 +1086,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { return nil, trace.Wrap(err) } } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentDiagnostic), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentDiagnostic), process.logger) } if cfg.Tracing.Enabled { @@ -1152,7 +1153,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { } serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentAuth), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentAuth), process.logger) } // initInstance initializes the pseudo-service "Instance" that is active for all teleport @@ -1168,7 +1169,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { } serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentNode), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentNode), process.logger) } if cfg.Proxy.Enabled { @@ -1177,14 +1178,14 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { } serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentProxy), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentProxy), process.logger) } if cfg.Kube.Enabled { process.initKubernetes() serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentKube), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentKube), process.logger) } // If this process is proxying applications, start application access server. 
@@ -1192,7 +1193,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { process.initApps() serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentApp), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentApp), process.logger) } if process.shouldInitDatabases() { @@ -1200,23 +1201,23 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { serviceStarted = true } else { if process.Config.Databases.Enabled { - process.log.Warn("Database service is enabled with empty configuration, skipping initialization") + process.logger.WarnContext(process.ExitContext(), "Database service is enabled with empty configuration, skipping initialization") } - warnOnErr(process.closeImportedDescriptors(teleport.ComponentDatabase), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentDatabase), process.logger) } if cfg.Metrics.Enabled { process.initMetricsService() serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentMetrics), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentMetrics), process.logger) } if cfg.WindowsDesktop.Enabled { process.initWindowsDesktopService() serviceStarted = true } else { - warnOnErr(process.closeImportedDescriptors(teleport.ComponentWindowsDesktop), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentWindowsDesktop), process.logger) } if process.shouldInitDiscovery() { @@ -1224,9 +1225,9 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) { serviceStarted = true } else { if process.Config.Discovery.Enabled { - process.log.Warn("Discovery service is enabled with empty configuration, skipping initialization") + process.logger.WarnContext(process.ExitContext(), "Discovery service is enabled with empty configuration, skipping initialization") } - warnOnErr(process.closeImportedDescriptors(teleport.ComponentDiscovery), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentDiscovery), process.logger) } if process.enterpriseServicesEnabledWithCommunityBuild() { @@ -1294,9 +1295,9 @@ func (process *TeleportProcess) notifyParent() { signalPipe, err := process.importSignalPipe() if err != nil { if !trace.IsNotFound(err) { - process.log.Warningf("Failed to import signal pipe") + process.logger.WarnContext(process.ExitContext(), "Failed to import signal pipe") } - process.log.Debugf("No signal pipe to import, must be first Teleport process.") + process.logger.DebugContext(process.ExitContext(), "No signal pipe to import, must be first Teleport process.") return } defer signalPipe.Close() @@ -1305,16 +1306,16 @@ func (process *TeleportProcess) notifyParent() { defer cancel() if _, err := process.WaitForEvent(ctx, TeleportReadyEvent); err != nil { - process.log.Errorf("Timeout waiting for a forked process to start: %v. Initiating self-shutdown.", ctx.Err()) + process.logger.ErrorContext(process.ExitContext(), "Timeout waiting for a forked process to start. 
Initiating self-shutdown.", "error", ctx.Err()) if err := process.Close(); err != nil { - process.log.Warningf("Failed to shutdown process: %v.", err) + process.logger.WarnContext(process.ExitContext(), "Failed to shutdown process.", "error", err) } return } - process.log.Infof("New service has started successfully.") + process.logger.InfoContext(process.ExitContext(), "New service has started successfully.") if err := process.writeToSignalPipe(signalPipe, fmt.Sprintf("Process %v has started.", os.Getpid())); err != nil { - process.log.Warningf("Failed to write to signal pipe: %v", err) + process.logger.WarnContext(process.ExitContext(), "Failed to write to signal pipe", "error", err) // despite the failure, it's ok to proceed, // it could mean that the parent process has crashed and the pipe // is no longer valid. @@ -1533,7 +1534,7 @@ func (process *TeleportProcess) initAuthExternalAuditLog(auditConfig types.Clust return nil, trace.Wrap(err) } if externalAuditStorage.IsUsed() && (len(loggers) > 0 || uri.Scheme != teleport.ComponentAthena) { - process.log.Infof("Skipping events URI %s because External Audit Storage is enabled", eventsURI) + process.logger.InfoContext(ctx, "Skipping events URI because External Audit Storage is enabled", "events_uri", eventsURI) continue } switch uri.Scheme { @@ -1703,18 +1704,18 @@ func (process *TeleportProcess) initAuthService() error { // this is for teleconsole process.auditLog = events.NewDiscardAuditLog() - warningMessage := "Warning: Teleport audit and session recording have been " + + const warningMessage = "Warning: Teleport audit and session recording have been " + "turned off. This is dangerous, you will not be able to view audit events " + "or save and playback recorded sessions." - process.log.Warn(warningMessage) + process.logger.WarnContext(process.ExitContext(), warningMessage) emitter, streamer = events.NewDiscardEmitter(), events.NewDiscardStreamer() } else { // check if session recording has been disabled. note, we will continue // logging audit events, we just won't record sessions. if cfg.Auth.SessionRecordingConfig.GetMode() == types.RecordOff { - warningMessage := "Warning: Teleport session recording have been turned off. " + + const warningMessage = "Warning: Teleport session recording have been turned off. " + "This is dangerous, you will not be able to save and playback sessions." 
- process.log.Warn(warningMessage) + process.logger.WarnContext(process.ExitContext(), warningMessage) } if cfg.FIPS { @@ -1902,14 +1903,12 @@ func (process *TeleportProcess) initAuthService() error { return trace.Wrap(err) } - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentAuth, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentAuth, process.id)) lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentAuth, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentAuth, process.id)), Client: authServer.Services, }, }) @@ -1961,13 +1960,13 @@ func (process *TeleportProcess) initAuthService() error { authServer.SetGlobalNotificationCache(globalNotificationCache) if embedderClient != nil { - log.Debugf("Starting embedding watcher") + logger.DebugContext(process.ExitContext(), "Starting embedding watcher") embeddingProcessor := ai.NewEmbeddingProcessor(&ai.EmbeddingProcessorConfig{ AIClient: embedderClient, EmbeddingsRetriever: embeddingsRetriever, EmbeddingSrv: authServer, NodeSrv: authServer.UnifiedResourceCache, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentAuth, process.id)), Jitter: retryutils.NewFullJitter(), }) @@ -1986,10 +1985,10 @@ func (process *TeleportProcess) initAuthService() error { // // Ref: e/tool/teleport/process/process.go if !modules.GetModules().Features().Assist { - log.Debug("Skipping start of embedding processor: Assist feature not enabled for license") + logger.DebugContext(process.ExitContext(), "Skipping start of embedding processor: Assist feature not enabled for license") return nil } - log.Debugf("Starting embedding processor") + logger.DebugContext(process.ExitContext(), "Starting embedding processor") return embeddingProcessor.Run(process.GracefulExitContext(), embeddingInitialDelay, embeddingPeriod) }) } @@ -2034,7 +2033,7 @@ func (process *TeleportProcess) initAuthService() error { AccessPoint: authServer, MFAAuthenticator: authServer, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentAuth, process.id)), // Auth Server does explicit device authorization. // Various Auth APIs must allow access to unauthorized devices, otherwise it // is not possible to acquire device-aware certificates in the first place. 
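// Illustrative sketch only, not part of the diff: it mirrors the pattern used in the
// hunks above, where a component-scoped *slog.Logger is derived once via With(...) and
// reused for contextual logging, while a logrus entry is still built for dependencies
// that have not migrated off logrus.FieldLogger yet. The "component" key, the component
// value, and the example attribute values below are placeholders, not Teleport constants.
package main

import (
	"context"
	"log/slog"
	"os"

	"github.com/sirupsen/logrus"
)

func main() {
	// Component-scoped structured logger, analogous to
	// process.logger.With(trace.Component, teleport.Component(...)).
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil)).With("component", "auth:1")

	ctx := context.Background()
	logger.InfoContext(ctx, "Auth service is starting.",
		"version", "0.0.0-example", "listen_address", "0.0.0.0:3025")

	// Legacy logrus entry kept in parallel for APIs that still accept logrus,
	// analogous to process.log.WithField(trace.Component, ...).
	legacy := logrus.New().WithField("component", "auth:1")
	legacy.Debug("Starting embedding watcher")
}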
@@ -2083,7 +2082,7 @@ func (process *TeleportProcess) initAuthService() error { } listener, err := process.importOrCreateListener(ListenerAuth, cfg.Auth.ListenAddr.Addr) if err != nil { - log.Errorf("PID: %v Failed to bind to address %v: %v, exiting.", os.Getpid(), cfg.Auth.ListenAddr.Addr, err) + logger.ErrorContext(process.ExitContext(), "Failed to bind to address, exiting.", "pid", os.Getpid(), "listen_address", cfg.Auth.ListenAddr.Addr, "error", err) return trace.Wrap(err) } @@ -2092,17 +2091,18 @@ func (process *TeleportProcess) initAuthService() error { authAddr := listener.Addr().String() // clean up unused descriptors passed for proxy, but not used by it - warnOnErr(process.closeImportedDescriptors(teleport.ComponentAuth), log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentAuth), logger) if cfg.Auth.PROXYProtocolMode == multiplexer.PROXYProtocolOn { - log.Info("Starting Auth service with external PROXY protocol support.") + logger.InfoContext(process.ExitContext(), "Starting Auth service with external PROXY protocol support.") } if cfg.Auth.PROXYProtocolMode == multiplexer.PROXYProtocolUnspecified { - log.Warn("'proxy_protocol' unspecified. " + + const warning = "'proxy_protocol' unspecified. " + "Starting Auth service with external PROXY protocol support, " + "but IP pinned connection affected by PROXY headers will not be allowed. " + "Set 'proxy_protocol: on' in 'auth_service' config if Auth service runs behind L4 load balancer with enabled " + - "PROXY protocol, or set 'proxy_protocol: off' otherwise") + "PROXY protocol, or set 'proxy_protocol: off' otherwise" + logger.WarnContext(process.ExitContext(), warning) } muxCAGetter := func(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error) { @@ -2137,14 +2137,14 @@ func (process *TeleportProcess) initAuthService() error { return trace.Wrap(err) } process.RegisterCriticalFunc("auth.tls", func() error { - log.Infof("Auth service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, authAddr) + logger.InfoContext(process.ExitContext(), "Auth service is starting.", "version", teleport.Version, "git_ref", teleport.Gitref, "listen_address", authAddr) // since tlsServer.Serve is a blocking call, we emit this even right before // the service has started process.BroadcastEvent(Event{Name: AuthTLSReady, Payload: nil}) err := tlsServer.Serve() if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Warningf("TLS server exited with error: %v.", err) + logger.WarnContext(process.ExitContext(), "TLS server exited with error.", "error", err) } return nil }) @@ -2184,12 +2184,12 @@ func (process *TeleportProcess) initAuthService() error { if net.ParseIP(host).IsUnspecified() { ip, err := utils.GuessHostIP() if err != nil { - log.Warn(err) + logger.WarnContext(process.ExitContext(), "failed guessing the host ip address", "error", err) } else { authAddr = net.JoinHostPort(ip.String(), port) } } - log.Warnf("Configuration setting auth_service/advertise_ip is not set. 
guessing %v.", authAddr) + logger.WarnContext(process.ExitContext(), "Configuration setting auth_service/advertise_ip is not set, using inferred address", "address", authAddr) } heartbeat, err := srv.NewHeartbeat(srv.HeartbeatConfig{ @@ -2214,7 +2214,7 @@ func (process *TeleportProcess) initAuthService() error { state, err := process.storage.GetState(process.GracefulExitContext(), types.RoleAdmin) if err != nil { if !trace.IsNotFound(err) { - log.Warningf("Failed to get rotation state: %v.", err) + logger.WarnContext(process.ExitContext(), "Failed to get rotation state.", "error", err) return nil, trace.Wrap(err) } } else { @@ -2244,42 +2244,41 @@ func (process *TeleportProcess) initAuthService() error { // the http server would have not started tracking the listeners // and http.Shutdown will do nothing. if mux != nil { - warnOnErr(mux.Close(), log) + warnOnErr(process.ExitContext(), mux.Close(), logger) } if listener != nil { - warnOnErr(listener.Close(), log) + warnOnErr(process.ExitContext(), listener.Close(), logger) } if payload == nil { - log.Info("Shutting down immediately.") - warnOnErr(tlsServer.Close(), log) + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") + warnOnErr(process.ExitContext(), tlsServer.Close(), logger) } else { - ctx := payloadContext(payload, log) - log.Info("Shutting down immediately (auth service does not currently support graceful shutdown).") + ctx := payloadContext(payload) + logger.InfoContext(ctx, "Shutting down immediately (auth service does not currently support graceful shutdown).") // NOTE: Graceful shutdown of auth.TLSServer is disabled right now, because we don't // have a good model for performing it. In particular, watchers and other gRPC streams // are a problem. Even if we distinguish between user-created and server-created streams // (as is done with ssh connections), we don't have a way to distinguish "service accounts" // such as access workflow plugins from normal users. Without this, a graceful shutdown // of the auth server basically never exits. 
- warnOnErr(tlsServer.Close(), log) + warnOnErr(ctx, tlsServer.Close(), logger) if g, ok := authServer.Services.UsageReporter.(usagereporter.GracefulStopper); ok { if err := g.GracefulStop(ctx); err != nil { - log.WithError(err).Warn("Error while gracefully stopping usage reporter.") + logger.WarnContext(ctx, "Error while gracefully stopping usage reporter.", "error", err) } } } - log.Info("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") }) return nil } -func payloadContext(payload interface{}, log logrus.FieldLogger) context.Context { - ctx, ok := payload.(context.Context) - if ok { +func payloadContext(payload any) context.Context { + if ctx, ok := payload.(context.Context); ok { return ctx } - log.Errorf("Expected context, got %T.", payload) + return context.TODO() } @@ -2522,18 +2521,16 @@ func (process *TeleportProcess) initInstance() error { } process.RegisterWithAuthServer(types.RoleInstance, InstanceIdentityEvent) - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentInstance, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentInstance, process.id)) process.RegisterCriticalFunc("instance.init", func() error { - conn, err := process.WaitForConnector(InstanceIdentityEvent, log) + conn, err := process.WaitForConnector(InstanceIdentityEvent, logger) if conn == nil { return trace.Wrap(err) } process.setInstanceConnector(conn) - log.Infof("Successfully registered instance client.") + logger.InfoContext(process.ExitContext(), "Successfully registered instance client.") process.BroadcastEvent(Event{Name: InstanceReady, Payload: nil}) return nil }) @@ -2545,9 +2542,7 @@ func (process *TeleportProcess) initInstance() error { func (process *TeleportProcess) initSSH() error { process.RegisterWithAuthServer(types.RoleNode, SSHIdentityEvent) - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentNode, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentNode, process.id)) proxyGetter := reversetunnel.NewConnectedProxyGetter() @@ -2558,12 +2553,12 @@ func (process *TeleportProcess) initSSH() error { // being signaled to restart. var restartingOnGracefulShutdown bool - conn, err := process.WaitForConnector(SSHIdentityEvent, log) + conn, err := process.WaitForConnector(SSHIdentityEvent, logger) if conn == nil { return trace.Wrap(err) } - defer func() { warnOnErr(conn.Close(), log) }() + defer func() { warnOnErr(process.ExitContext(), conn.Close(), logger) }() cfg := process.Config @@ -2604,7 +2599,7 @@ func (process *TeleportProcess) initSSH() error { if err != nil { return trace.Wrap(err) } - defer func() { warnOnErr(ebpf.Close(restartingOnGracefulShutdown), log) }() + defer func() { warnOnErr(process.ExitContext(), ebpf.Close(restartingOnGracefulShutdown), logger) }() // make sure the default namespace is used if ns := cfg.SSH.Namespace; ns != "" && ns != apidefaults.Namespace { @@ -2622,8 +2617,9 @@ func (process *TeleportProcess) initSSH() error { } if auditd.IsLoginUIDSet() { - log.Warnf("Login UID is set, but it shouldn't be. Incorrect login UID breaks session ID when using auditd. " + - "Please make sure that Teleport runs as a daemon and any parent process doesn't set the login UID.") + const warn = "Login UID is set, but it shouldn't be. Incorrect login UID breaks session ID when using auditd. 
" + + "Please make sure that Teleport runs as a daemon and any parent process doesn't set the login UID." + logger.WarnContext(process.ExitContext(), warn) } // Provide helpful log message if listen_addr or public_addr are not being @@ -2634,10 +2630,10 @@ func (process *TeleportProcess) initSSH() error { // joining cluster directly or through a tunnel). if conn.UseTunnel() { if !cfg.SSH.Addr.IsEmpty() { - log.Info("Connected to cluster over tunnel connection, ignoring listen_addr setting.") + logger.InfoContext(process.ExitContext(), "Connected to cluster over tunnel connection, ignoring listen_addr setting.") } if len(cfg.SSH.PublicAddrs) > 0 { - log.Info("Connected to cluster over tunnel connection, ignoring public_addr setting.") + logger.InfoContext(process.ExitContext(), "Connected to cluster over tunnel connection, ignoring public_addr setting.") } } if !conn.UseTunnel() && cfg.SSH.Addr.IsEmpty() { @@ -2650,12 +2646,12 @@ func (process *TeleportProcess) initSSH() error { if err != nil { return trace.Wrap(err) } - defer func() { warnOnErr(asyncEmitter.Close(), log) }() + defer func() { warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) }() lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentNode, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentNode, process.id)), Client: conn.Client, }, }) @@ -2677,7 +2673,7 @@ func (process *TeleportProcess) initSSH() error { LockEnforcer: lockWatcher, Emitter: &events.StreamerAndEmitter{Emitter: asyncEmitter, Streamer: conn.Client}, Component: teleport.ComponentNode, - Logger: process.log.WithField(trace.Component, "sessionctrl"), + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentNode, process.id)).WithField(trace.Component, "sessionctrl"), TracerProvider: process.TracingProvider, ServerID: serverID, }) @@ -2726,12 +2722,12 @@ func (process *TeleportProcess) initSSH() error { if err != nil { return trace.Wrap(err) } - defer func() { warnOnErr(s.Close(), log) }() + defer func() { warnOnErr(process.ExitContext(), s.Close(), logger) }() var resumableServer *resumption.SSHServerWrapper if os.Getenv("TELEPORT_UNSTABLE_DISABLE_SSH_RESUMPTION") == "" { resumableServer = resumption.NewSSHServerWrapper(resumption.SSHServerWrapperConfig{ - Log: log.WithField(trace.Component, teleport.Component(teleport.ComponentNode, resumption.Component)), + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentNode, resumption.Component)), SSHServer: s.HandleConnection, HostID: serverID, @@ -2740,7 +2736,7 @@ func (process *TeleportProcess) initSSH() error { go func() { if err := resumableServer.HandoverCleanup(process.GracefulExitContext()); err != nil { - log.WithError(err).Warn("Failed to clean up handover sockets.") + logger.WarnContext(process.ExitContext(), "Failed to clean up handover sockets.", "error", err) } }() } @@ -2752,9 +2748,9 @@ func (process *TeleportProcess) initSSH() error { return trace.Wrap(err) } // clean up unused descriptors passed for proxy, but not used by it - warnOnErr(process.closeImportedDescriptors(teleport.ComponentNode), log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentNode), logger) - log.Infof("Service %s:%s is starting on %v %v.", teleport.Version, teleport.Gitref, cfg.SSH.Addr.Addr, process.Config.CachePolicy) + 
logger.InfoContext(process.ExitContext(), "SSH Service is starting.", "version", teleport.Version, "git_ref", teleport.Gitref, "listen_address", cfg.SSH.Addr.Addr, "cache_policy", process.Config.CachePolicy) preDetect := resumption.PreDetectFixedSSHVersion(sshutils.SSHVersionPrefix) if resumableServer != nil { @@ -2818,7 +2814,7 @@ func (process *TeleportProcess) initSSH() error { if err != nil { return trace.Wrap(err) } - log.Infof("Service is starting in tunnel mode.") + logger.InfoContext(process.ExitContext(), "Service is starting in tunnel mode.") } // Broadcast that the node has started. @@ -2835,20 +2831,20 @@ func (process *TeleportProcess) initSSH() error { } if event.Payload == nil { - log.Infof("Shutting down immediately.") - warnOnErr(s.Close(), log) + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") + warnOnErr(process.ExitContext(), s.Close(), logger) } else { - log.Infof("Shutting down gracefully.") - ctx := payloadContext(event.Payload, log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + ctx := payloadContext(event.Payload) restartingOnGracefulShutdown = services.IsProcessReloading(ctx) || services.HasProcessForked(ctx) - warnOnErr(s.Shutdown(ctx), log) + warnOnErr(ctx, s.Shutdown(ctx), logger) } s.Wait() agentPool.Stop() agentPool.Wait() - log.Infof("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") return nil }) @@ -2865,7 +2861,7 @@ func (process *TeleportProcess) RegisterWithAuthServer(role types.SystemRole, ev if role.IsLocalService() && !process.instanceRoleExpected(role) { // if you hit this error, your probably forgot to call SetExpectedInstanceRole inside of // the registerExpectedServices function. - process.log.Errorf("Register called for unexpected instance role %q (this is a bug).", role) + process.logger.ErrorContext(process.ExitContext(), "Register called for unexpected instance role (this is a bug).", "role", role) } connector, err := process.reconnectToAuthService(role) @@ -2880,7 +2876,7 @@ func (process *TeleportProcess) RegisterWithAuthServer(role types.SystemRole, ev // waitForInstanceConnector waits for the instance connector to be ready, // logging a warning if this is taking longer than expected. -func waitForInstanceConnector(process *TeleportProcess, log *logrus.Entry) (*Connector, error) { +func waitForInstanceConnector(process *TeleportProcess, log *slog.Logger) (*Connector, error) { type r struct { c *Connector err error @@ -2902,8 +2898,7 @@ func waitForInstanceConnector(process *TeleportProcess, log *logrus.Entry) (*Con } return result.c, nil case <-t.C: - log.Warn("The Instance connector is still not available, process-wide services " + - "such as session uploading will not function") + log.WarnContext(process.ExitContext(), "The Instance connector is still not available, process-wide services such as session uploading will not function") } } } @@ -2912,9 +2907,8 @@ func waitForInstanceConnector(process *TeleportProcess, log *logrus.Entry) (*Con // (data/log/upload/streaming/default/) func (process *TeleportProcess) initUploaderService() error { component := teleport.Component(teleport.ComponentUpload, process.id) - log := process.log.WithFields(logrus.Fields{ - trace.Component: component, - }) + + logger := process.logger.With(trace.Component, component) var clusterName string @@ -2932,7 +2926,7 @@ func (process *TeleportProcess) initUploaderService() error { // so as a special case we can stop early if auth happens to be // the only service running in this process. 
if srs := process.getInstanceRoles(); len(srs) == 1 && srs[0] == types.RoleAuth { - log.Debug("this process only runs the auth service, no separate upload completer will run") + logger.DebugContext(process.ExitContext(), "this process only runs the auth service, no separate upload completer will run") return nil } @@ -2943,8 +2937,8 @@ func (process *TeleportProcess) initUploaderService() error { } clusterName = cn.GetClusterName() } else { - log.Debug("auth is not running in-process, waiting for instance connector") - conn, err := waitForInstanceConnector(process, log) + logger.DebugContext(process.ExitContext(), "auth is not running in-process, waiting for instance connector") + conn, err := waitForInstanceConnector(process, logger) if err != nil { return trace.Wrap(err) } @@ -2955,7 +2949,7 @@ func (process *TeleportProcess) initUploaderService() error { clusterName = conn.ServerIdentity.ClusterName } - log.Info("starting upload completer service") + logger.InfoContext(process.ExitContext(), "starting upload completer service") // create folder for uploads uid, gid, err := adminCreds() @@ -2971,14 +2965,14 @@ func (process *TeleportProcess) initUploaderService() error { for _, path := range paths { for i := 1; i < len(path); i++ { dir := filepath.Join(path[:i+1]...) - log.Infof("Creating directory %v.", dir) + logger.InfoContext(process.ExitContext(), "Creating directory.", "directory", dir) err := os.Mkdir(dir, 0o755) err = trace.ConvertSystemError(err) if err != nil && !trace.IsAlreadyExists(err) { return trace.Wrap(err) } if uid != nil && gid != nil { - log.Infof("Setting directory %v owner to %v:%v.", dir, *uid, *gid) + logger.InfoContext(process.ExitContext(), "Setting directory owner.", "directory", dir, "uid", *uid, "gid", *gid) err := os.Lchown(dir, *uid, *gid) if err != nil { return trace.ConvertSystemError(err) @@ -3003,16 +2997,16 @@ func (process *TeleportProcess) initUploaderService() error { process.RegisterFunc("fileuploader.service", func() error { err := fileUploader.Serve(process.ExitContext()) if err != nil { - log.WithError(err).Errorf("File uploader server exited with error.") + logger.ErrorContext(process.ExitContext(), "File uploader server exited with error.", "error", err) } return nil }) process.OnExit("fileuploader.shutdown", func(payload interface{}) { - log.Infof("File uploader is shutting down.") + logger.InfoContext(process.ExitContext(), "File uploader is shutting down.") fileUploader.Close() - log.Infof("File uploader has shut down.") + logger.InfoContext(process.ExitContext(), "File uploader has shut down.") }) // upload completer scans for uploads that have been initiated, but not completed @@ -3036,15 +3030,15 @@ func (process *TeleportProcess) initUploaderService() error { process.RegisterFunc("fileuploadcompleter.service", func() error { if err := uploadCompleter.Serve(process.ExitContext()); err != nil { - log.WithError(err).Errorf("File uploader server exited with error.") + logger.ErrorContext(process.ExitContext(), "File uploader server exited with error.", "error", err) } return nil }) process.OnExit("fileuploadcompleter.shutdown", func(payload interface{}) { - log.Infof("File upload completer is shutting down.") + logger.InfoContext(process.ExitContext(), "File upload completer is shutting down.") uploadCompleter.Close() - log.Infof("File upload completer has shut down.") + logger.InfoContext(process.ExitContext(), "File upload completer has shut down.") }) return nil @@ -3056,15 +3050,13 @@ func (process *TeleportProcess)
initMetricsService() error { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentMetrics, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentMetrics, process.id)) listener, err := process.importOrCreateListener(ListenerMetrics, process.Config.Metrics.ListenAddr.Addr) if err != nil { return trace.Wrap(err) } - warnOnErr(process.closeImportedDescriptors(teleport.ComponentMetrics), log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentMetrics), logger) tlsConfig := &tls.Config{} if process.Config.Metrics.MTLS { @@ -3112,30 +3104,29 @@ func (process *TeleportProcess) initMetricsService() error { ReadHeaderTimeout: defaults.ReadHeadersTimeout, WriteTimeout: apidefaults.DefaultIOTimeout, IdleTimeout: apidefaults.DefaultIdleTimeout, - ErrorLog: utils.NewStdlogger(log.Error, teleport.ComponentMetrics), TLSConfig: tlsConfig, } - log.Infof("Starting metrics service on %v.", process.Config.Metrics.ListenAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting metrics service.", "listen_address", process.Config.Metrics.ListenAddr.Addr) process.RegisterFunc("metrics.service", func() error { err := server.Serve(listener) if err != nil && err != http.ErrServerClosed { - log.Warningf("Metrics server exited with error: %v.", err) + logger.WarnContext(process.ExitContext(), "Metrics server exited with error.", "error", err) } return nil }) process.OnExit("metrics.shutdown", func(payload interface{}) { if payload == nil { - log.Infof("Shutting down immediately.") - warnOnErr(server.Close(), log) + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") + warnOnErr(process.ExitContext(), server.Close(), logger) } else { - log.Infof("Shutting down gracefully.") - ctx := payloadContext(payload, log) - warnOnErr(server.Shutdown(ctx), log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + ctx := payloadContext(payload) + warnOnErr(process.ExitContext(), server.Shutdown(ctx), logger) } - log.Infof("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") }) process.BroadcastEvent(Event{Name: MetricsReady, Payload: nil}) @@ -3155,7 +3146,7 @@ func (process *TeleportProcess) initDiagnosticService() error { } if process.Config.Debug { - process.log.Infof("Adding diagnostic debugging handlers. To connect with profiler, use `go tool pprof %v`.", process.Config.DiagnosticAddr.Addr) + process.logger.InfoContext(process.ExitContext(), "Adding diagnostic debugging handlers. To connect with profiler, use `go tool pprof <listen_address>`.", "listen_address", process.Config.DiagnosticAddr.Addr) mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -3168,9 +3159,7 @@ func (process *TeleportProcess) initDiagnosticService() error { roundtrip.ReplyJSON(w, http.StatusOK, map[string]interface{}{"status": "ok"}) }) - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentDiagnostic, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentDiagnostic, process.id)) // Create a state machine that will process and update the internal state of // Teleport based off Events. 
Use this state machine to return return the @@ -3194,7 +3183,7 @@ func (process *TeleportProcess) initDiagnosticService() error { case e := <-eventCh: ps.update(e) case <-ctx.Done(): - log.Debugf("Teleport is exiting, returning.") + logger.DebugContext(process.ExitContext(), "Teleport is exiting, returning.") return nil } } @@ -3227,7 +3216,7 @@ func (process *TeleportProcess) initDiagnosticService() error { if err != nil { return trace.Wrap(err) } - warnOnErr(process.closeImportedDescriptors(teleport.ComponentDiagnostic), log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(teleport.ComponentDiagnostic), logger) server := &http.Server{ Handler: mux, @@ -3235,37 +3224,36 @@ func (process *TeleportProcess) initDiagnosticService() error { ReadHeaderTimeout: defaults.ReadHeadersTimeout, WriteTimeout: apidefaults.DefaultIOTimeout, IdleTimeout: apidefaults.DefaultIdleTimeout, - ErrorLog: utils.NewStdlogger(log.Error, teleport.ComponentDiagnostic), } - log.Infof("Starting diagnostic service on %v.", process.Config.DiagnosticAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting diagnostic service.", "listen_address", process.Config.DiagnosticAddr.Addr) process.RegisterFunc("diagnostic.service", func() error { err := server.Serve(listener) if err != nil && err != http.ErrServerClosed { - log.Warningf("Diagnostic server exited with error: %v.", err) + logger.WarnContext(process.ExitContext(), "Diagnostic server exited with error.", "error", err) } return nil }) process.OnExit("diagnostic.shutdown", func(payload interface{}) { if payload == nil { - log.Infof("Shutting down immediately.") - warnOnErr(server.Close(), log) + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") + warnOnErr(process.ExitContext(), server.Close(), logger) } else { - log.Infof("Shutting down gracefully.") - ctx := payloadContext(payload, log) - warnOnErr(server.Shutdown(ctx), log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + ctx := payloadContext(payload) + warnOnErr(process.ExitContext(), server.Shutdown(ctx), logger) } - log.Infof("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") }) return nil } func (process *TeleportProcess) initTracingService() error { - log := process.log.WithField(trace.Component, teleport.Component(teleport.ComponentTracing, process.id)) - log.Info("Initializing tracing provider and exporter.") + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentTracing, process.id)) + logger.InfoContext(process.ExitContext(), "Initializing tracing provider and exporter.") attrs := []attribute.KeyValue{ attribute.String(tracing.ProcessIDKey, process.id), @@ -3277,7 +3265,7 @@ func (process *TeleportProcess) initTracingService() error { if err != nil { return trace.Wrap(err) } - traceConf.Logger = log + traceConf.Logger = process.log.WithField(trace.Component, teleport.Component(teleport.ComponentTracing, process.id)) provider, err := tracing.NewTraceProvider(process.ExitContext(), *traceConf) if err != nil { @@ -3287,16 +3275,16 @@ func (process *TeleportProcess) initTracingService() error { process.OnExit("tracing.shutdown", func(payload interface{}) { if payload == nil { - log.Info("Shutting down immediately.") + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - warnOnErr(provider.Shutdown(ctx), log) + warnOnErr(process.ExitContext(), provider.Shutdown(ctx), logger) } else { - 
log.Infof("Shutting down gracefully.") - ctx := payloadContext(payload, log) - warnOnErr(provider.Shutdown(ctx), log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + ctx := payloadContext(payload) + warnOnErr(process.ExitContext(), provider.Shutdown(ctx), logger) } - process.log.Info("Exited.") + process.logger.InfoContext(process.ExitContext(), "Exited.") }) process.BroadcastEvent(Event{Name: TracingReady, Payload: nil}) @@ -3434,13 +3422,13 @@ func (process *TeleportProcess) initProxy() error { } process.RegisterWithAuthServer(types.RoleProxy, ProxyIdentityEvent) process.RegisterCriticalFunc("proxy.init", func() error { - conn, err := process.WaitForConnector(ProxyIdentityEvent, process.log) + conn, err := process.WaitForConnector(ProxyIdentityEvent, process.logger) if conn == nil { return trace.Wrap(err) } if err := process.initProxyEndpoint(conn); err != nil { - warnOnErr(conn.Close(), process.log) + warnOnErr(process.ExitContext(), conn.Close(), process.logger) return trace.Wrap(err) } @@ -3572,7 +3560,7 @@ func (l *dbListeners) Close() { // setupProxyListeners sets up web proxy listeners based on the configuration func (process *TeleportProcess) setupProxyListeners(networkingConfig types.ClusterNetworkingConfig, accessPoint auth.ProxyAccessPoint, clusterName string) (*proxyListeners, error) { cfg := process.Config - process.log.Debugf("Setup Proxy: Web Proxy Address: %v, Reverse Tunnel Proxy Address: %v", cfg.Proxy.WebAddr.Addr, cfg.Proxy.ReverseTunnelListenAddr.Addr) + process.logger.DebugContext(process.ExitContext(), "Setting up Proxy listeners", "web_address", cfg.Proxy.WebAddr.Addr, "tunnel_address", cfg.Proxy.ReverseTunnelListenAddr.Addr) var err error var listeners proxyListeners @@ -3608,7 +3596,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust } if cfg.Proxy.Kube.Enabled && !cfg.Proxy.Kube.ListenAddr.IsEmpty() { - process.log.Debugf("Setup Proxy: turning on Kubernetes proxy.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: turning on Kubernetes proxy.", "kube_address", cfg.Proxy.Kube.ListenAddr.Addr) listener, err := process.importOrCreateListener(ListenerProxyKube, cfg.Proxy.Kube.ListenAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3618,7 +3606,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust if !cfg.Proxy.DisableDatabaseProxy { if !cfg.Proxy.MySQLAddr.IsEmpty() { - process.log.Debugf("Setup Proxy: MySQL proxy address: %v.", cfg.Proxy.MySQLAddr.Addr) + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: turning on MySQL proxy.", "mysql_address", cfg.Proxy.MySQLAddr.Addr) listener, err := process.importOrCreateListener(ListenerProxyMySQL, cfg.Proxy.MySQLAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3627,7 +3615,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust } if !cfg.Proxy.MongoAddr.IsEmpty() { - process.log.Debugf("Setup Proxy: Mongo proxy address: %v.", cfg.Proxy.MongoAddr.Addr) + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: turning on Mongo proxy.", "mongo_address", cfg.Proxy.MongoAddr.Addr) listener, err := process.importOrCreateListener(ListenerProxyMongo, cfg.Proxy.MongoAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3636,7 +3624,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust } if !cfg.Proxy.PostgresAddr.IsEmpty() { - process.log.Debugf("Setup Proxy: Postgres proxy address: %v.", 
cfg.Proxy.PostgresAddr.Addr) + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: turning on Postgres proxy.", "postgres_address", cfg.Proxy.PostgresAddr.Addr) listener, err := process.importOrCreateListener(ListenerProxyPostgres, cfg.Proxy.PostgresAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3648,7 +3636,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust tunnelStrategy, err := networkingConfig.GetTunnelStrategyType() if err != nil { - process.log.WithError(err).Warn("Failed to get tunnel strategy. Falling back to agent mesh strategy.") + process.logger.WarnContext(process.ExitContext(), "Failed to get tunnel strategy. Falling back to agent mesh strategy.", "error", err) tunnelStrategy = types.AgentMesh } @@ -3673,10 +3661,10 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust switch { case cfg.Proxy.DisableWebService && cfg.Proxy.DisableReverseTunnel: - process.log.Debugf("Setup Proxy: Reverse tunnel proxy and web proxy are disabled.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: Reverse tunnel proxy and web proxy are disabled.") return &listeners, nil case cfg.Proxy.ReverseTunnelListenAddr == cfg.Proxy.WebAddr && !cfg.Proxy.DisableTLS: - process.log.Debugf("Setup Proxy: Reverse tunnel proxy and web proxy listen on the same port, multiplexing is on.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: Reverse tunnel proxy and web proxy listen on the same port, multiplexing is on.") listener, err := process.importOrCreateListener(ListenerProxyTunnelAndWeb, cfg.Proxy.WebAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3706,7 +3694,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust }() return &listeners, nil case cfg.Proxy.PROXYProtocolMode != multiplexer.PROXYProtocolOff && !cfg.Proxy.DisableWebService && !cfg.Proxy.DisableTLS: - process.log.Debug("Setup Proxy: PROXY protocol is enabled for web service, multiplexing is on.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: PROXY protocol is enabled for web service, multiplexing is on.") listener, err := process.importOrCreateListener(ListenerProxyWeb, cfg.Proxy.WebAddr.Addr) if err != nil { return nil, trace.Wrap(err) @@ -3738,7 +3726,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust }() return &listeners, nil default: - process.log.Debug("Setup Proxy: Proxy and reverse tunnel are listening on separate ports.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: Proxy and reverse tunnel are listening on separate ports.") if !cfg.Proxy.DisableReverseTunnel && !cfg.Proxy.ReverseTunnelListenAddr.IsEmpty() { if cfg.Proxy.DisableWebService { listeners.reverseTunnel, err = process.importOrCreateListener(ListenerProxyTunnel, cfg.Proxy.ReverseTunnelListenAddr.Addr) @@ -3763,7 +3751,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust // only done by tests and not exposed via file config), the web // listener is multiplexing both web and db client connections. 
if !cfg.Proxy.DisableDatabaseProxy && !cfg.Proxy.DisableTLS { - process.log.Debug("Setup Proxy: Multiplexing web and database proxy on the same port.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: Multiplexing web and database proxy on the same port.") listeners.mux, err = multiplexer.New(multiplexer.Config{ PROXYProtocolMode: cfg.Proxy.PROXYProtocolMode, Listener: listener, @@ -3784,7 +3772,7 @@ func (process *TeleportProcess) setupProxyListeners(networkingConfig types.Clust } }() } else { - process.log.Debug("Setup Proxy: TLS is disabled, multiplexing is off.") + process.logger.DebugContext(process.ExitContext(), "Setup Proxy: TLS is disabled, multiplexing is off.") listeners.web = listener } } @@ -3819,7 +3807,7 @@ func (process *TeleportProcess) initMinimalReverseTunnelListener(cfg *servicecfg listeners.reverseTunnel = listeners.reverseTunnelMux.SSH() go func() { if err := listeners.reverseTunnelMux.Serve(); err != nil { - process.log.WithError(err).Debug("Minimal reverse tunnel mux exited with error") + process.logger.DebugContext(process.ExitContext(), "Minimal reverse tunnel mux exited with error", "error", err) } }() listeners.minimalWeb = listeners.reverseTunnelMux.TLS() @@ -3838,7 +3826,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { // clean up unused descriptors passed for proxy, but not used by it defer func() { if err := process.closeImportedDescriptors(teleport.ComponentProxy); err != nil { - process.log.Warnf("Failed closing imported file descriptors: %v", err) + process.logger.WarnContext(process.ExitContext(), "Failed closing imported file descriptors", "error", err) } }() var err error @@ -3885,9 +3873,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { proxySSHAddr.Addr = listeners.ssh.Addr().String() } - log := process.log.WithFields(logrus.Fields{ - trace.Component: teleport.Component(teleport.ComponentReverseTunnelServer, process.id), - }) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)) // asyncEmitter makes sure that sessions do not block // in case if connections are slow @@ -4019,9 +4005,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.reversetunnel.server", func() error { - log.Infof("Starting %s:%s on %v using %v", teleport.Version, teleport.Gitref, cfg.Proxy.ReverseTunnelListenAddr.Addr, process.Config.CachePolicy) + logger.InfoContext(process.ExitContext(), "Starting reverse tunnel server", "version", teleport.Version, "git_ref", teleport.Gitref, "listen_address", cfg.Proxy.ReverseTunnelListenAddr.Addr, "cache_policy", process.Config.CachePolicy) if err := tsrv.Start(); err != nil { - log.Error(err) + logger.ErrorContext(process.ExitContext(), "Failed starting reverse tunnel server", "error", err) return trace.Wrap(err) } @@ -4179,11 +4165,11 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { listeners.db.tls = listeners.tls.DB() process.RegisterCriticalFunc("proxy.tls", func() error { - log.Infof("TLS multiplexer is starting on %v.", cfg.Proxy.WebAddr.Addr) + logger.InfoContext(process.ExitContext(), "TLS multiplexer is starting.", "listen_address", cfg.Proxy.WebAddr.Addr) if err := listeners.tls.Serve(); !trace.IsConnectionProblem(err) { - log.WithError(err).Warn("TLS multiplexer error.") + logger.WarnContext(process.ExitContext(), "TLS multiplexer error.", "error", err) } - log.Info("TLS 
multiplexer exited.") + logger.InfoContext(process.ExitContext(), "TLS multiplexer exited.") return nil }) } @@ -4200,7 +4186,6 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { // will break some application access use-cases. ReadHeaderTimeout: defaults.ReadHeadersTimeout, IdleTimeout: apidefaults.DefaultIdleTimeout, - ErrorLog: utils.NewStdlogger(log.Error, teleport.ComponentProxy), ConnState: ingress.HTTPConnStateReporter(ingress.Web, ingressReporter), ConnContext: func(ctx context.Context, c net.Conn) context.Context { ctx = authz.ContextWithConn(ctx, c) @@ -4208,30 +4193,30 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { }, }, Handler: webHandler, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), }) if err != nil { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.web", func() error { - log.Infof("Web proxy service %s:%s is starting on %v.", teleport.Version, teleport.Gitref, cfg.Proxy.WebAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting web proxy service.", "version", teleport.Version, "git_ref", teleport.Gitref, "listen_address", cfg.Proxy.WebAddr.Addr) defer webHandler.Close() process.BroadcastEvent(Event{Name: ProxyWebServerReady, Payload: webHandler}) if err := webServer.Serve(listeners.web); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) { - log.Warningf("Error while serving web requests: %v", err) + logger.WarnContext(process.ExitContext(), "Error while serving web requests", "error", err) } - log.Info("Exited.") + logger.InfoContext(process.ExitContext(), "Exited.") return nil }) if listeners.reverseTunnelMux != nil { - if minimalWebServer, err = process.initMinimalReverseTunnel(listeners, tlsConfigWeb, cfg, webConfig, log); err != nil { + if minimalWebServer, err = process.initMinimalReverseTunnel(listeners, tlsConfigWeb, cfg, webConfig, process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id))); err != nil { return trace.Wrap(err) } } } else { - log.Info("Web UI is disabled.") + logger.InfoContext(process.ExitContext(), "Web UI is disabled.") } // Register ALPN handler that will be accepting connections for plain @@ -4256,7 +4241,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { Listener: listeners.proxyPeer, TLSConfig: serverTLSConfig, ClusterDialer: clusterdial.NewClusterDialer(tsrv), - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), ClusterName: clusterName, }) if err != nil { @@ -4265,11 +4250,11 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { process.RegisterCriticalFunc("proxy.peer", func() error { if _, err := process.WaitForEvent(process.ExitContext(), ProxyReverseTunnelReady); err != nil { - log.Debugf("Process exiting: failed to start peer proxy service waiting for reverse tunnel server") + logger.DebugContext(process.ExitContext(), "Process exiting: failed to start peer proxy service waiting for reverse tunnel server") return nil } - log.Infof("Peer proxy service is starting on %s", listeners.proxyPeer.Addr().String()) + logger.InfoContext(process.ExitContext(), "Starting peer proxy service", "listen_address", logutils.StringerAttr(listeners.proxyPeer.Addr())) err := proxyServer.Serve() if err != nil { return trace.Wrap(err) @@ -4287,7 +4272,7 @@ func (process 
*TeleportProcess) initProxyEndpoint(conn *Connector) error { staticLabels[types.ProxyGroupGenerationLabel] = strconv.FormatUint(cfg.Proxy.ProxyGroupGeneration, 10) } if len(staticLabels) > 0 { - log.Infof("Enabling proxy group labels: group ID = %q, generation = %v.", cfg.Proxy.ProxyGroupID, cfg.Proxy.ProxyGroupGeneration) + logger.InfoContext(process.ExitContext(), "Enabling proxy group labels.", "group_id", cfg.Proxy.ProxyGroupID, "generation", cfg.Proxy.ProxyGroupGeneration) } sshProxy, err := regular.New( @@ -4331,7 +4316,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { ClusterName: clusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), }) if err != nil { return trace.Wrap(err) } @@ -4350,7 +4335,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { tlscfg.InsecureSkipVerify = true tlscfg.ClientAuth = tls.RequireAnyClientCert } - tlscfg.GetConfigForClient = auth.WithClusterCAs(tlscfg, accessPoint, clusterName, log) + tlscfg.GetConfigForClient = auth.WithClusterCAs(tlscfg, accessPoint, clusterName, process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id))) creds, err := auth.NewTransportCredentials(auth.TransportCredentialsConfig{ TransportCredentials: credentials.NewTLS(tlscfg), @@ -4411,19 +4396,19 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { if cfg.Proxy.SSHAddr.Addr != "" { sshListenerAddr = cfg.Proxy.SSHAddr.Addr } - log.Infof("SSH proxy service %s:%s is starting on %v", teleport.Version, teleport.Gitref, sshListenerAddr) + logger.InfoContext(process.ExitContext(), "Starting SSH proxy service", "version", teleport.Version, "git_ref", teleport.Gitref, "listen_address", sshListenerAddr) // start ssh server go func() { if err := sshProxy.Serve(proxyLimiter.WrapListener(listeners.ssh)); err != nil && !utils.IsOKNetworkError(err) { - log.WithError(err).Error("SSH proxy server terminated unexpectedly") + logger.ErrorContext(process.ExitContext(), "SSH proxy server terminated unexpectedly", "error", err) } }() // start grpc server go func() { if err := sshGRPCServer.Serve(proxyLimiter.WrapListener(listeners.sshGRPC)); err != nil && !utils.IsOKNetworkError(err) && !errors.Is(err, grpc.ErrServerStopped) { - log.WithError(err).Error("SSH gRPC server terminated unexpectedly") + logger.ErrorContext(process.ExitContext(), "SSH gRPC server terminated unexpectedly", "error", err) } }() @@ -4472,7 +4457,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { ClusterName: clusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), }) if err != nil { return trace.Wrap(err) } @@ -4494,7 +4479,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { kubeServerWatcher, err := services.NewKubeServerWatcher(process.ExitContext(), services.KubeServerWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: component, - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), Client: accessPoint, }, }) @@ -4534,7 +4519,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { AccessPoint: accessPoint, GetRotation: 
process.GetRotation, OnHeartbeat: process.OnHeartbeat(component), - Log: log, + Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), IngressReporter: ingressReporter, KubernetesServersWatcher: kubeServerWatcher, PROXYProtocolMode: cfg.Proxy.PROXYProtocolMode, @@ -4543,15 +4528,13 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { return trace.Wrap(err) } process.RegisterCriticalFunc("proxy.kube", func() error { - log := logrus.WithFields(logrus.Fields{ - trace.Component: component, - }) + logger := process.logger.With(trace.Component, component) kubeListenAddr := listeners.kube.Addr().String() if cfg.Proxy.Kube.ListenAddr.Addr != "" { kubeListenAddr = cfg.Proxy.Kube.ListenAddr.Addr } - log.Infof("Starting Kube proxy on %v.", kubeListenAddr) + logger.InfoContext(process.ExitContext(), "Starting Kube proxy.", "listen_address", kubeListenAddr) var mopts []kubeproxy.ServeOption if cfg.Testing.KubeMultiplexerIgnoreSelfConnections { @@ -4560,7 +4543,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { err := kubeServer.Serve(listeners.kube, mopts...) if err != nil && err != http.ErrServerClosed { - log.Warningf("Kube TLS server exited with error: %v.", err) + logger.WarnContext(process.ExitContext(), "Kube TLS server exited with error.", "error", err) } return nil }) @@ -4575,7 +4558,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { ClusterName: clusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentReverseTunnelServer, process.id)), }) if err != nil { return trace.Wrap(err) @@ -4645,30 +4628,30 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { }) } - log := process.log.WithField(trace.Component, teleport.Component(teleport.ComponentDatabase)) + logger := process.logger.With(trace.Component, teleport.Component(teleport.ComponentDatabase)) if listeners.db.postgres != nil { process.RegisterCriticalFunc("proxy.db.postgres", func() error { - log.Infof("Starting Database Postgres proxy server on %v.", listeners.db.postgres.Addr()) + logger.InfoContext(process.ExitContext(), "Starting Database Postgres proxy server.", "listen_address", listeners.db.postgres.Addr()) if err := dbProxyServer.ServePostgres(listeners.db.postgres); err != nil { - log.WithError(err).Warn("Postgres proxy server exited with error.") + logger.WarnContext(process.ExitContext(), "Postgres proxy server exited with error.", "error", err) } return nil }) } if listeners.db.mysql != nil { process.RegisterCriticalFunc("proxy.db.mysql", func() error { - log.Infof("Starting MySQL proxy server on %v.", cfg.Proxy.MySQLAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting MySQL proxy server.", "listen_address", cfg.Proxy.MySQLAddr.Addr) if err := dbProxyServer.ServeMySQL(listeners.db.mysql); err != nil { - log.WithError(err).Warn("MySQL proxy server exited with error.") + logger.WarnContext(process.ExitContext(), "MySQL proxy server exited with error.", "error", err) } return nil }) } if listeners.db.tls != nil { process.RegisterCriticalFunc("proxy.db.tls", func() error { - log.Infof("Starting Database TLS proxy server on %v.", cfg.Proxy.WebAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting Database TLS proxy server.", "listen_address", cfg.Proxy.WebAddr.Addr) if err := dbProxyServer.ServeTLS(listeners.db.tls); err != nil { - 
log.WithError(err).Warn("Database TLS proxy server exited with error.") + logger.WarnContext(process.ExitContext(), "Database TLS proxy server exited with error.", "error", err) } return nil }) @@ -4676,9 +4659,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { if listeners.db.mongo != nil { process.RegisterCriticalFunc("proxy.db.mongo", func() error { - log.Infof("Starting Database Mongo proxy server on %v.", cfg.Proxy.MongoAddr.Addr) + logger.InfoContext(process.ExitContext(), "Starting Database Mongo proxy server.", "listen_address", cfg.Proxy.MongoAddr.Addr) if err := dbProxyServer.ServeMongo(listeners.db.mongo, tlsConfigWeb.Clone()); err != nil { - log.WithError(err).Warn("Database Mongo proxy server exited with error.") + logger.WarnContext(process.ExitContext(), "Database Mongo proxy server exited with error.", "error", err) } return nil }) @@ -4743,9 +4726,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { alpnHandlerForWeb.Set(alpnServer.MakeConnectionHandler(alpnTLSConfigForWeb)) process.RegisterCriticalFunc("proxy.tls.alpn.sni.proxy", func() error { - log.Infof("Starting TLS ALPN SNI proxy server on %v.", listeners.alpn.Addr()) + logger.InfoContext(process.ExitContext(), "Starting TLS ALPN SNI proxy server on.", "listen_address", logutils.StringerAttr(listeners.alpn.Addr())) if err := alpnServer.Serve(process.ExitContext()); err != nil { - log.WithError(err).Warn("TLS ALPN SNI proxy proxy server exited with error.") + logger.WarnContext(process.ExitContext(), "TLS ALPN SNI proxy proxy server exited with error.", "error", err) } return nil }) @@ -4764,9 +4747,9 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { } process.RegisterCriticalFunc("proxy.tls.alpn.sni.proxy.reverseTunnel", func() error { - log.Infof("Starting TLS ALPN SNI reverse tunnel proxy server on %v.", listeners.reverseTunnelALPN.Addr()) + logger.InfoContext(process.ExitContext(), "Starting TLS ALPN SNI reverse tunnel proxy server.", "listen_address", listeners.reverseTunnelALPN.Addr()) if err := reverseTunnelALPNServer.Serve(process.ExitContext()); err != nil { - log.WithError(err).Warn("TLS ALPN SNI proxy proxy on reverse tunnel server exited with error.") + logger.WarnContext(process.ExitContext(), "TLS ALPN SNI proxy proxy on reverse tunnel server exited with error.", "error", err) } return nil }) @@ -4780,27 +4763,27 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { // halfway through a shutdown, and double closing a listener is fine. 
listeners.Close() if payload == nil { - log.Infof("Shutting down immediately.") + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") if tsrv != nil { - warnOnErr(tsrv.Close(), log) + warnOnErr(process.ExitContext(), tsrv.Close(), logger) } - warnOnErr(rcWatcher.Close(), log) + warnOnErr(process.ExitContext(), rcWatcher.Close(), logger) if proxyServer != nil { - warnOnErr(proxyServer.Close(), log) + warnOnErr(process.ExitContext(), proxyServer.Close(), logger) } if webServer != nil { - warnOnErr(webServer.Close(), log) + warnOnErr(process.ExitContext(), webServer.Close(), logger) } if minimalWebServer != nil { - warnOnErr(minimalWebServer.Close(), log) + warnOnErr(process.ExitContext(), minimalWebServer.Close(), logger) } if peerClient != nil { - warnOnErr(peerClient.Stop(), log) + warnOnErr(process.ExitContext(), peerClient.Stop(), logger) } - warnOnErr(sshProxy.Close(), log) + warnOnErr(process.ExitContext(), sshProxy.Close(), logger) sshGRPCServer.Stop() if kubeServer != nil { - warnOnErr(kubeServer.Close(), log) + warnOnErr(process.ExitContext(), kubeServer.Close(), logger) } if grpcServerPublic != nil { grpcServerPublic.Stop() @@ -4809,37 +4792,37 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { grpcServerMTLS.Stop() } if alpnServer != nil { - warnOnErr(alpnServer.Close(), log) + warnOnErr(process.ExitContext(), alpnServer.Close(), logger) } if reverseTunnelALPNServer != nil { - warnOnErr(reverseTunnelALPNServer.Close(), log) + warnOnErr(process.ExitContext(), reverseTunnelALPNServer.Close(), logger) } } else { - log.Infof("Shutting down gracefully.") - ctx := payloadContext(payload, log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + ctx := payloadContext(payload) if tsrv != nil { - warnOnErr(tsrv.DrainConnections(ctx), log) + warnOnErr(ctx, tsrv.DrainConnections(ctx), logger) } - warnOnErr(sshProxy.Shutdown(ctx), log) + warnOnErr(ctx, sshProxy.Shutdown(ctx), logger) sshGRPCServer.GracefulStop() if webServer != nil { - warnOnErr(webServer.Shutdown(ctx), log) + warnOnErr(ctx, webServer.Shutdown(ctx), logger) } if minimalWebServer != nil { - warnOnErr(minimalWebServer.Shutdown(ctx), log) + warnOnErr(ctx, minimalWebServer.Shutdown(ctx), logger) } if tsrv != nil { - warnOnErr(tsrv.Shutdown(ctx), log) + warnOnErr(ctx, tsrv.Shutdown(ctx), logger) } - warnOnErr(rcWatcher.Close(), log) + warnOnErr(ctx, rcWatcher.Close(), logger) if proxyServer != nil { - warnOnErr(proxyServer.Shutdown(), log) + warnOnErr(ctx, proxyServer.Shutdown(), logger) } if peerClient != nil { peerClient.Shutdown() } if kubeServer != nil { - warnOnErr(kubeServer.Shutdown(ctx), log) + warnOnErr(ctx, kubeServer.Shutdown(ctx), logger) } if grpcServerPublic != nil { grpcServerPublic.GracefulStop() @@ -4848,10 +4831,10 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { grpcServerMTLS.GracefulStop() } if alpnServer != nil { - warnOnErr(alpnServer.Close(), log) + warnOnErr(ctx, alpnServer.Close(), logger) } if reverseTunnelALPNServer != nil { - warnOnErr(reverseTunnelALPNServer.Close(), log) + warnOnErr(ctx, reverseTunnelALPNServer.Close(), logger) } // Explicitly deleting proxy heartbeats helps the behavior of @@ -4860,16 +4843,16 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { if services.ShouldDeleteServerHeartbeatsOnShutdown(ctx) { if err := conn.Client.DeleteProxy(ctx, process.Config.HostUUID); err != nil { if !trace.IsNotFound(err) { - log.WithError(err).Warn("Failed to delete heartbeat.") 
+ logger.WarnContext(ctx, "Failed to delete heartbeat.", "error", err) } else { - log.WithError(err).Debug("Failed to delete heartbeat.") + logger.DebugContext(ctx, "Failed to delete heartbeat.", "error", err) } } } } - warnOnErr(asyncEmitter.Close(), log) - warnOnErr(conn.Close(), log) - log.Infof("Exited.") + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) + warnOnErr(process.ExitContext(), conn.Close(), logger) + logger.InfoContext(process.ExitContext(), "Exited.") }) return nil @@ -5183,7 +5166,7 @@ func (process *TeleportProcess) waitForAppDepend() { for _, event := range appDependEvents { _, err := process.WaitForEvent(process.ExitContext(), event) if err != nil { - process.log.Debugf("Process is exiting.") + process.logger.DebugContext(process.ExitContext(), "Process is exiting.") break } } @@ -5257,10 +5240,10 @@ func (process *TeleportProcess) initApps() { // Define logger to prefix log lines with the name of the component and PID. component := teleport.Component(teleport.ComponentApp, process.id) - log := process.log.WithField(trace.Component, component) + logger := process.logger.With(trace.Component, component) process.RegisterCriticalFunc("apps.start", func() error { - conn, err := process.WaitForConnector(AppsIdentityEvent, log) + conn, err := process.WaitForConnector(AppsIdentityEvent, logger) if conn == nil { return trace.Wrap(err) } @@ -5268,7 +5251,7 @@ func (process *TeleportProcess) initApps() { shouldSkipCleanup := false defer func() { if !shouldSkipCleanup { - warnOnErr(conn.Close(), log) + warnOnErr(process.ExitContext(), conn.Close(), logger) } }() @@ -5300,9 +5283,9 @@ func (process *TeleportProcess) initApps() { } // Block and wait for all dependencies to start before starting. - log.Debugf("Waiting for application service dependencies to start.") + logger.DebugContext(process.ExitContext(), "Waiting for application service dependencies to start.") process.waitForAppDepend() - log.Debugf("Application service dependencies have started, continuing.") + logger.DebugContext(process.ExitContext(), "Application service dependencies have started, continuing.") } clusterName := conn.ServerIdentity.ClusterName @@ -5380,7 +5363,7 @@ func (process *TeleportProcess) initApps() { lockWatcher, err := services.NewLockWatcher(process.ExitContext(), services.LockWatcherConfig{ ResourceWatcherConfig: services.ResourceWatcherConfig{ Component: teleport.ComponentApp, - Log: log, + Log: process.log.WithField(trace.Component, component), Client: conn.Client, }, }) @@ -5391,7 +5374,7 @@ func (process *TeleportProcess) initApps() { ClusterName: clusterName, AccessPoint: accessPoint, LockWatcher: lockWatcher, - Logger: log, + Logger: process.log.WithField(trace.Component, component), DeviceAuthorization: authz.DeviceAuthorizationOpts{ // Ignore the global device_trust.mode toggle, but allow role-based // settings to be applied. 
@@ -5412,7 +5395,7 @@ func (process *TeleportProcess) initApps() { } defer func() { if !shouldSkipCleanup { - warnOnErr(asyncEmitter.Close(), log) + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) } }() @@ -5457,7 +5440,7 @@ func (process *TeleportProcess) initApps() { defer func() { if !shouldSkipCleanup { - warnOnErr(appServer.Close(), log) + warnOnErr(process.ExitContext(), appServer.Close(), logger) } }() @@ -5491,7 +5474,7 @@ func (process *TeleportProcess) initApps() { } process.BroadcastEvent(Event{Name: AppsReady, Payload: nil}) - log.Infof("All applications successfully started.") + logger.InfoContext(process.ExitContext(), "All applications successfully started.") // Cancel deferred cleanup actions, because we're going // to register an OnExit handler to take care of it @@ -5500,19 +5483,19 @@ func (process *TeleportProcess) initApps() { // Execute this when process is asked to exit. process.OnExit("apps.stop", func(payload interface{}) { if payload == nil { - log.Infof("Shutting down immediately.") - warnOnErr(appServer.Close(), log) + logger.InfoContext(process.ExitContext(), "Shutting down immediately.") + warnOnErr(process.ExitContext(), appServer.Close(), logger) } else { - log.Infof("Shutting down gracefully.") - warnOnErr(appServer.Shutdown(payloadContext(payload, log)), log) + logger.InfoContext(process.ExitContext(), "Shutting down gracefully.") + warnOnErr(process.ExitContext(), appServer.Shutdown(payloadContext(payload)), logger) } if asyncEmitter != nil { - warnOnErr(asyncEmitter.Close(), log) + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) } agentPool.Stop() - warnOnErr(asyncEmitter.Close(), log) - warnOnErr(conn.Close(), log) - log.Infof("Exited.") + warnOnErr(process.ExitContext(), asyncEmitter.Close(), logger) + warnOnErr(process.ExitContext(), conn.Close(), logger) + logger.InfoContext(process.ExitContext(), "Exited.") }) // Block and wait while the server and agent pool are running. @@ -5524,21 +5507,21 @@ func (process *TeleportProcess) initApps() { }) } -func warnOnErr(err error, log logrus.FieldLogger) { +func warnOnErr(ctx context.Context, err error, log *slog.Logger) { if err != nil { // don't warn on double close, happens sometimes when // calling accept on a closed listener if utils.IsOKNetworkError(err) { return } - log.WithError(err).Warn("Got error while cleaning up.") + log.WarnContext(ctx, "Got error while cleaning up.", "error", err) } } // initAuthStorage initializes the storage backend for the auth service. 
func (process *TeleportProcess) initAuthStorage() (backend.Backend, error) { ctx := context.TODO() - process.log.Debugf("Using %v backend.", process.Config.Auth.StorageConfig.Type) + process.logger.DebugContext(process.ExitContext(), "Initializing auth backend.", "backend", process.Config.Auth.StorageConfig.Type) bc := process.Config.Auth.StorageConfig bk, err := backend.New(ctx, bc.Type, bc.Params) if err != nil { @@ -5569,7 +5552,7 @@ func (process *TeleportProcess) WaitWithContext(ctx context.Context) { go func() { defer cancel() if err := process.Supervisor.Wait(); err != nil { - process.log.Warnf("Error waiting for all services to complete: %v", err) + process.logger.WarnContext(process.ExitContext(), "Error waiting for all services to complete", "error", err) } }() @@ -5583,8 +5566,8 @@ func (process *TeleportProcess) StartShutdown(ctx context.Context) context.Conte // the only potential imported file descriptor that's not a listening // socket, so closing every imported FD with a prefix of "" will close all // imported listeners that haven't been used so far - warnOnErr(process.closeImportedDescriptors(""), process.log) - warnOnErr(process.stopListeners(), process.log) + warnOnErr(process.ExitContext(), process.closeImportedDescriptors(""), process.logger) + warnOnErr(process.ExitContext(), process.stopListeners(), process.logger) // populate context values if process.forkedTeleportCount.Load() > 0 { @@ -5596,19 +5579,19 @@ func (process *TeleportProcess) StartShutdown(ctx context.Context) context.Conte go func() { defer cancel() if err := process.Supervisor.Wait(); err != nil { - process.log.Warnf("Error waiting for all services to complete: %v", err) + process.logger.WarnContext(process.ExitContext(), "Error waiting for all services to complete", "error", err) } - process.log.Debug("All supervisor functions are completed.") + process.logger.DebugContext(process.ExitContext(), "All supervisor functions are completed.") if localAuth := process.getLocalAuth(); localAuth != nil { if err := localAuth.Close(); err != nil { - process.log.Warningf("Failed closing auth server: %v.", err) + process.logger.WarnContext(process.ExitContext(), "Failed closing auth server.", "error", err) } } if process.storage != nil { if err := process.storage.Close(); err != nil { - process.log.Warningf("Failed closing process storage: %v.", err) + process.logger.WarnContext(process.ExitContext(), "Failed closing process storage.", "error", err) } } @@ -5626,7 +5609,7 @@ func (process *TeleportProcess) Shutdown(ctx context.Context) { localCtx := process.StartShutdown(ctx) // wait until parent context closes <-localCtx.Done() - process.log.Debug("Process completed.") + process.logger.DebugContext(ctx, "Process completed.") } // Close broadcasts close signals and exits immediately @@ -5716,7 +5699,7 @@ func (process *TeleportProcess) initDebugApp() { process.OnExit("debug.app.shutdown", func(payload interface{}) { server.Close() - process.log.Infof("Exited.") + process.logger.InfoContext(process.ExitContext(), "Exited.") }) return nil }) @@ -6019,7 +6002,7 @@ func (process *TeleportProcess) initPublicGRPCServer( joinServiceServer := joinserver.NewJoinServiceGRPCServer(conn.Client) proto.RegisterJoinServiceServer(server, joinServiceServer) process.RegisterCriticalFunc("proxy.grpc.public", func() error { - process.log.Infof("Starting proxy gRPC server on %v.", listener.Addr()) + process.logger.InfoContext(process.ExitContext(), "Starting proxy gRPC server.", "listen_address", listener.Addr()) return 
trace.Wrap(server.Serve(listener)) }) return server @@ -6094,7 +6077,7 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg kubeproto.RegisterKubeServiceServer(server, kubeServer) process.RegisterCriticalFunc("proxy.grpc.secure", func() error { - process.log.Infof("Starting proxy gRPC server on %v.", cfg.listener.Addr()) + process.logger.InfoContext(process.ExitContext(), "Starting proxy gRPC server.", "listen_address", cfg.listener.Addr()) return trace.Wrap(server.Serve(cfg.listener)) }) return server, nil diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 71abbb5004c30..8e8a0d327f5af 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "errors" "fmt" + "log/slog" "net" "net/http" "os" @@ -205,12 +206,12 @@ func TestDynamicClientReuse(t *testing.T) { t.Cleanup(func() { require.NoError(t, process.Close()) }) // wait for instance connector - iconn, err := process.WaitForConnector(InstanceIdentityEvent, process.log) + iconn, err := process.WaitForConnector(InstanceIdentityEvent, process.logger) require.NoError(t, err) require.NotNil(t, iconn) // wait for proxy connector - pconn, err := process.WaitForConnector(ProxyIdentityEvent, process.log) + pconn, err := process.WaitForConnector(ProxyIdentityEvent, process.logger) require.NoError(t, err) require.NotNil(t, pconn) @@ -222,7 +223,7 @@ func TestDynamicClientReuse(t *testing.T) { // configued set. process.RegisterWithAuthServer(types.RoleNode, SSHIdentityEvent) - nconn, err := process.WaitForConnector(SSHIdentityEvent, process.log) + nconn, err := process.WaitForConnector(SSHIdentityEvent, process.logger) require.NoError(t, err) require.NotNil(t, nconn) @@ -406,7 +407,7 @@ func TestServiceCheckPrincipals(t *testing.T) { }, } for i, tt := range tests { - ok := checkServerIdentity(testConnector, tt.inPrincipals, tt.inDNS, logrus.New().WithField("test", "TestServiceCheckPrincipals")) + ok := checkServerIdentity(context.TODO(), testConnector, tt.inPrincipals, tt.inDNS, slog.Default().With("test", "TestServiceCheckPrincipals")) require.Equal(t, tt.outRegenerate, ok, "test %d", i) } } @@ -488,6 +489,7 @@ func TestAthenaAuditLogSetup(t *testing.T) { }, backend: backend, log: utils.NewLoggerForTests(), + logger: utils.NewSlogLoggerForTests(), } integrationSvc, err := local.NewIntegrationsService(backend) diff --git a/lib/service/signals.go b/lib/service/signals.go index fadd884e01728..2c6d8369d76cb 100644 --- a/lib/service/signals.go +++ b/lib/service/signals.go @@ -62,7 +62,7 @@ func (process *TeleportProcess) printShutdownStatus(ctx context.Context) { case <-t.C: statusInterval = min(statusInterval*2, defaults.LowResPollingPeriod) t.Reset(statusInterval) - process.log.Infof("Waiting for services: %v to finish.", process.Supervisor.Services()) + process.logger.InfoContext(process.ExitContext(), "Waiting for services to finish.", "services", process.Supervisor.Services()) } } } @@ -98,10 +98,10 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { timeoutCtx, cancel := context.WithTimeout(ctx, apidefaults.MaxCertDuration) defer cancel() process.Shutdown(timeoutCtx) - process.log.Infof("All services stopped, exiting.") + process.logger.InfoContext(process.ExitContext(), "All services stopped, exiting.") return nil case syscall.SIGTERM, syscall.SIGINT: - process.log.Infof("Got signal %q, exiting within %s.", signal, fastShutdownTimeout) + process.logger.InfoContext(process.ExitContext(), "Got shutdown signal, 
exiting within timeout.", "signal", signal, "timeout", fastShutdownTimeout.Seconds()) // we run the shutdown in a goroutine so we can return and exit // the process even if Shutdown takes longer to return than we // expected (due to bugs, for example) @@ -116,9 +116,9 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { defer graceTimer.Stop() select { case <-graceTimer.C: - process.log.Warn("Shutdown still hasn't completed, exiting anyway.") + process.logger.WarnContext(process.ExitContext(), "Shutdown still hasn't completed, exiting anyway.") case <-shutdownDone: - process.log.Info("All services stopped, exiting.") + process.logger.InfoContext(process.ExitContext(), "All services stopped or timeout passed, exiting immediately.") } return nil case syscall.SIGUSR1: @@ -131,29 +131,29 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { // That was not quite enough. With pipelines diagnostics could come from any of several programs running simultaneously. // Diagnostics needed to identify themselves. // - Doug McIllroy, "A Research UNIX Reader: Annotated Excerpts from the Programmer’s Manual, 1971-1986" - process.log.Infof("Got signal %q, logging diagnostic info to stderr.", signal) + process.logger.InfoContext(process.ExitContext(), "Got signal SIGUSR1, logging diagnostic info to stderr.") writeDebugInfo(os.Stderr) case syscall.SIGUSR2: - process.log.Infof("Got signal %q, forking a new process.", signal) + process.logger.InfoContext(process.ExitContext(), "Got signal SIGUSR2, forking a new process.") if err := process.forkChild(); err != nil { - process.log.Warningf("Failed to fork: %v", err) + process.logger.WarnContext(process.ExitContext(), "Failed to fork process", "error", err) } else { - process.log.Infof("Successfully started new process.") + process.logger.InfoContext(process.ExitContext(), "Successfully started new process.") } case syscall.SIGHUP: - process.log.Infof("Got signal %q, performing graceful restart.", signal) + process.logger.InfoContext(process.ExitContext(), "Got signal SIGHUP, performing graceful restart.") if err := process.forkChild(); err != nil { - process.log.Warningf("Failed to fork: %v", err) + process.logger.WarnContext(process.ExitContext(), "Failed to fork process", "error", err) continue } - process.log.Infof("Successfully started new process, shutting down gracefully.") + process.logger.InfoContext(process.ExitContext(), "Successfully started new process, shutting down gracefully.") timeoutCtx, cancel := context.WithTimeout(ctx, apidefaults.MaxCertDuration) defer cancel() process.Shutdown(timeoutCtx) - process.log.Infof("All services stopped, exiting.") + process.logger.InfoContext(process.ExitContext(), "All services stopped, exiting.") return nil default: - process.log.Infof("Ignoring %q.", signal) + process.logger.InfoContext(process.ExitContext(), "Ignoring unknown signal.", "signal", signal) } case <-process.ReloadContext().Done(): // it's fine to signal.Stop the same channel multiple times, and @@ -166,32 +166,32 @@ func (process *TeleportProcess) WaitForSignals(ctx context.Context) error { // miss signals to exit or to graceful restart instead continue } - process.log.Infof("Exiting signal handler: process has started internal reload.") + process.logger.InfoContext(process.ExitContext(), "Exiting signal handler: process has started internal reload.") return ErrTeleportReloading case <-process.ExitContext().Done(): - process.log.Infof("Someone else has closed context, exiting.") + 
process.logger.InfoContext(process.ExitContext(), "Someone else has closed context, exiting.") return nil case <-ctx.Done(): process.Close() if err := process.Wait(); err != nil { - process.log.Warnf("Error waiting for all services to exit: %v", err) + process.logger.WarnContext(process.ExitContext(), "Error waiting for all services to exit", "error", err) } - process.log.Info("Got request to shutdown, context is closing") + process.logger.InfoContext(process.ExitContext(), "Got request to shutdown, context is closing") return nil case event := <-serviceErrorsC: se, ok := event.Payload.(ExitEventPayload) if !ok { - process.log.Warningf("Failed to decode service exit event, %T", event.Payload) + process.logger.WarnContext(process.ExitContext(), "Failed to decode service exit event", "payload", event.Payload) continue } if se.Service.IsCritical() { - process.log.Errorf("Critical service %v has exited with error %v, aborting.", se.Service, se.Error) + process.logger.ErrorContext(process.ExitContext(), "Critical service has exited with error, aborting.", "service", se.Service, "error", se.Error) if err := process.Close(); err != nil { - process.log.Errorf("Error when shutting down teleport %v.", err) + process.logger.ErrorContext(process.ExitContext(), "Error when shutting down teleport.", "error", err) } return trace.Wrap(se.Error) } - process.log.Warningf("Non-critical service %v has exited with error %v, continuing to operate.", se.Service, se.Error) + process.logger.WarnContext(process.ExitContext(), "Non-critical service has exited with error , continuing to operate.", "service", se.Service, "error", se.Error) } } } @@ -211,7 +211,7 @@ func (process *TeleportProcess) writeToSignalPipe(signalPipe *os.File, message s go func() { _, err := signalPipe.Write([]byte(message)) if err != nil { - process.log.Debugf("Failed to write to pipe: %v.", trace.DebugReport(err)) + process.logger.DebugContext(process.ExitContext(), "Failed to write to pipe.", "error", trace.DebugReport(err)) return } cancel() @@ -221,7 +221,7 @@ func (process *TeleportProcess) writeToSignalPipe(signalPipe *os.File, message s case <-time.After(signalPipeTimeout): return trace.BadParameter("Failed to write to parent process pipe.") case <-messageSignalled.Done(): - process.log.Infof("Signaled success to parent process.") + process.logger.InfoContext(process.ExitContext(), "Signaled success to parent process.") } return nil } @@ -236,7 +236,7 @@ func (process *TeleportProcess) closeImportedDescriptors(prefix string) error { openDescriptors := make([]*servicecfg.FileDescriptor, 0, len(process.importedDescriptors)) for _, d := range process.importedDescriptors { if strings.HasPrefix(d.Type, prefix) { - process.log.Infof("Closing imported but unused descriptor %v %v.", d.Type, d.Address) + process.logger.InfoContext(process.ExitContext(), "Closing imported but unused descriptor.", "type", d.Type, "address", d.Address) errors = append(errors, d.Close()) } else { openDescriptors = append(openDescriptors, d) @@ -251,13 +251,13 @@ func (process *TeleportProcess) closeImportedDescriptors(prefix string) error { func (process *TeleportProcess) importOrCreateListener(typ ListenerType, address string) (net.Listener, error) { l, err := process.importListener(typ, address) if err == nil { - process.log.Infof("Using file descriptor %v %v passed by the parent process.", typ, address) + process.logger.InfoContext(process.ExitContext(), "Using file descriptor passed by the parent process.", "type", typ, "address", address) return l, nil } if 
!trace.IsNotFound(err) { return nil, trace.Wrap(err) } - process.log.Infof("Service %v is creating new listener on %v.", typ, address) + process.logger.InfoContext(process.ExitContext(), "Service is creating new listener.", "type", typ, "address", address) return process.createListener(typ, address) } @@ -308,7 +308,7 @@ func (process *TeleportProcess) createListener(typ ListenerType, address string) } if listenersClosed() { - process.log.Debugf("Listening is blocked, not opening listener for type %v and address %v.", typ, address) + process.logger.DebugContext(process.ExitContext(), "Listening is blocked, not opening listener.", "type", typ, "address", address) return nil, trace.BadParameter("listening is blocked") } @@ -318,7 +318,7 @@ func (process *TeleportProcess) createListener(typ ListenerType, address string) listener, ok := process.getListenerNeedsLock(typ, address) process.Unlock() if ok { - process.log.Debugf("Using existing listener for type %v and address %v.", typ, address) + process.logger.DebugContext(process.ExitContext(), "Using existing listener.", "type", typ, "address", address) return listener, nil } return nil, trace.Wrap(err) @@ -330,12 +330,12 @@ func (process *TeleportProcess) createListener(typ ListenerType, address string) // needs a dns lookup, so we can't do it while holding the lock) if process.listenersClosed { listener.Close() - process.log.Debugf("Listening is blocked, closing newly-created listener for type %v and address %v.", typ, address) + process.logger.DebugContext(process.ExitContext(), "Listening is blocked, closing newly-created listener.", "type", typ, "address", address) return nil, trace.BadParameter("listening is blocked") } if l, ok := process.getListenerNeedsLock(typ, address); ok { listener.Close() - process.log.Debugf("Using existing listener for type %v and address %v.", typ, address) + process.logger.DebugContext(process.ExitContext(), "Using existing listener.", "type", typ, "address", address) return l, nil } r := registeredListener{typ: typ, address: address, listener: listener} @@ -500,9 +500,9 @@ func (process *TeleportProcess) forkChild() error { return err } - log := process.log.WithFields(logrus.Fields{"path": path, "workingDir": workingDir}) + logger := process.logger.With("path", path, "working_dir", workingDir) - log.Info("Forking child.") + logger.InfoContext(process.ExitContext(), "Forking child.") listenerFiles, err := process.ExportFileDescriptors() if err != nil { @@ -527,7 +527,7 @@ func (process *TeleportProcess) forkChild() error { return trace.Wrap(err) } - log.Infof("Passing %s to child", vals) + logger.InfoContext(process.ExitContext(), "Passing files to child", "files", vals) env := append(os.Environ(), fmt.Sprintf("%s=%s", teleportFilesEnvVar, vals)) p, err := os.StartProcess(path, os.Args, &os.ProcAttr{ @@ -539,19 +539,18 @@ func (process *TeleportProcess) forkChild() error { if err != nil { return trace.ConvertSystemError(err) } - log.WithField("pid", p.Pid).Infof("Forked new child process.") - log = process.log.WithField("pid", p.Pid) + logger.InfoContext(process.ExitContext(), "Forked new child process.", "pid", p.Pid) + logger = process.logger.With("pid", p.Pid) process.forkedTeleportCount.Add(1) go func() { defer process.forkedTeleportCount.Add(-1) state, err := p.Wait() if err != nil { - log.WithError(err). 
- Error("Failed waiting for forked Teleport process.") + logger.ErrorContext(process.ExitContext(), "Failed waiting for forked Teleport process.", "error", err) return } - log.WithField("status", state.String()).Warn("Forked Teleport process has exited.") + logger.WarnContext(process.ExitContext(), "Forked Teleport process has exited.", "status", state.String()) }() _ = writePipe.Close() @@ -565,7 +564,7 @@ func (process *TeleportProcess) forkChild() error { if err != nil { return trace.Wrap(err, "waiting for forked Teleport process to signal successful start") } - log.WithField("data", string(buf[:n])).Infof("Forked Teleport process signaled successful start.") + logger.InfoContext(process.ExitContext(), "Forked Teleport process signaled successful start.", "data", string(buf[:n])) return nil } diff --git a/lib/service/state.go b/lib/service/state.go index c99e404050d91..ad76112391960 100644 --- a/lib/service/state.go +++ b/lib/service/state.go @@ -91,7 +91,7 @@ func (f *processState) update(event Event) { component, ok := event.Payload.(string) if !ok { - f.process.log.Errorf("%v broadcasted without component name, this is a bug!", event.Name) + f.process.logger.ErrorContext(f.process.ExitContext(), "Received event broadcast without component name, this is a bug!", "event", event.Name) return } s, ok := f.states[component] @@ -105,7 +105,7 @@ func (f *processState) update(event Event) { // If a degraded event was received, always change the state to degraded. case TeleportDegradedEvent: s.state = stateDegraded - f.process.log.Infof("Detected Teleport component %q is running in a degraded state.", component) + f.process.logger.InfoContext(f.process.ExitContext(), "Detected Teleport component is running in a degraded state.", "component", component) // If the current state is degraded, and a OK event has been // received, change the state to recovering. If the current state is // recovering and a OK events is received, if it's been longer @@ -115,15 +115,15 @@ func (f *processState) update(event Event) { switch s.state { case stateStarting: s.state = stateOK - f.process.log.Debugf("Teleport component %q has started.", component) + f.process.logger.DebugContext(f.process.ExitContext(), "Teleport component has started.", "component", component) case stateDegraded: s.state = stateRecovering s.recoveryTime = f.process.Clock.Now() - f.process.log.Infof("Teleport component %q is recovering from a degraded state.", component) + f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component is recovering from a degraded state.", "component", component) case stateRecovering: if f.process.Clock.Since(s.recoveryTime) > defaults.HeartbeatCheckPeriod*2 { s.state = stateOK - f.process.log.Infof("Teleport component %q has recovered from a degraded state.", component) + f.process.logger.InfoContext(f.process.ExitContext(), "Teleport component has recovered from a degraded state.", "component", component) } } } diff --git a/lib/utils/log/slog_handler.go b/lib/utils/log/slog_handler.go index 3c42ca56269ec..4e47810c48beb 100644 --- a/lib/utils/log/slog_handler.go +++ b/lib/utils/log/slog_handler.go @@ -555,3 +555,26 @@ func getCaller(a slog.Attr) (file string, line int) { return file, line } + +type stringerAttr struct { + fmt.Stringer +} + +// StringerAttr creates a [slog.LogValuer] that will defer to +// the provided [fmt.Stringer]. All slog attributes are always evaluated, +// even if the log event is discarded due to the configured log level. 
+// A text [slog.Handler] will try to defer evaluation if the attribute is a +// [fmt.Stringer]; the JSON [slog.Handler], however, only defers to [json.Marshaler]. +// This means that to defer evaluation and creation of the string representation, +// the object must implement both [fmt.Stringer] and [json.Marshaler]; otherwise, additional +// and unwanted values may be emitted if the logger is configured to use JSON +// instead of text. This wrapping mechanism guarantees that a value implementing +// [fmt.Stringer] is lazily constructed and always produces the same +// content regardless of the output format. +func StringerAttr(s fmt.Stringer) slog.LogValuer { + return stringerAttr{Stringer: s} +} + +func (s stringerAttr) LogValue() slog.Value { + return slog.StringValue(s.Stringer.String()) +}
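
To illustrate the StringerAttr helper added above, here is a minimal usage sketch; it is not part of the patch. The JSON handler setup and the throwaway TCP listener are assumptions made only for the example, and the "listen_address" key simply mirrors the attribute naming used elsewhere in this change.

package main

import (
	"context"
	"log/slog"
	"net"
	"os"

	logutils "github.com/gravitational/teleport/lib/utils/log"
)

func main() {
	// Hypothetical logger for the example; Teleport configures its own handlers.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

	// Hypothetical listener standing in for a real service listener.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		logger.ErrorContext(context.Background(), "Failed to create listener.", "error", err)
		return
	}
	defer listener.Close()

	// listener.Addr() implements fmt.Stringer but not json.Marshaler, so wrapping
	// it in StringerAttr keeps the text and JSON outputs identical and defers the
	// String() call until the attribute is actually resolved by the handler.
	logger.InfoContext(context.Background(), "Starting proxy gRPC server.",
		"listen_address", logutils.StringerAttr(listener.Addr()))
}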