From c5333f4b3b76efc7a4bed5e21fb8344b6660edd4 Mon Sep 17 00:00:00 2001 From: Marco Dinis Date: Mon, 4 Nov 2024 16:04:37 +0000 Subject: [PATCH] Move most of `lib/srv/discovery` from logrus to slog (#48292) * Move most of `lib/srv/discovery` from logrus to slog * use local context --- lib/service/discovery.go | 3 +- lib/srv/discovery/access_graph.go | 24 ++-- lib/srv/discovery/database_watcher.go | 14 +- lib/srv/discovery/discovery.go | 124 +++++++++--------- lib/srv/discovery/discovery_test.go | 31 +++-- lib/srv/discovery/kube_integration_watcher.go | 16 +-- .../kube_integration_watcher_test.go | 4 +- lib/srv/discovery/kube_services_watcher.go | 14 +- lib/srv/discovery/kube_watcher.go | 14 +- lib/srv/discovery/reconciler.go | 8 +- lib/srv/discovery/status.go | 21 ++- 11 files changed, 148 insertions(+), 125 deletions(-) diff --git a/lib/service/discovery.go b/lib/service/discovery.go index 4c6105f246234..5ab190b93b764 100644 --- a/lib/service/discovery.go +++ b/lib/service/discovery.go @@ -87,7 +87,8 @@ func (process *TeleportProcess) initDiscoveryService() error { Emitter: asyncEmitter, AccessPoint: accessPoint, ServerID: process.Config.HostUUID, - Log: process.log, + Log: process.logger, + LegacyLogger: process.log, ClusterName: conn.ClusterName(), ClusterFeatures: process.GetClusterFeatures, PollInterval: process.Config.Discovery.PollInterval, diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go index b2bfd1a4e51a9..740bf21d2039a 100644 --- a/lib/srv/discovery/access_graph.go +++ b/lib/srv/discovery/access_graph.go @@ -66,7 +66,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, &aws_sync.Resources{}) if err := push(stream, upsert, toDel); err != nil { - s.Log.WithError(err).Error("Error pushing empty resources to TAGs") + s.Log.ErrorContext(ctx, "Error pushing empty resources to TAGs", "error", err) } return 
trace.Wrap(errNoAccessGraphFetchers) } @@ -109,7 +109,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * // Aggregate all errors into a single error. err := trace.NewAggregate(errs...) if err != nil { - s.Log.WithError(err).Error("Error polling TAGs") + s.Log.ErrorContext(ctx, "Error polling TAGs", "error", err) } result := aws_sync.MergeResources(results...) // Merge all results into a single result @@ -122,7 +122,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * } if pushErr != nil { - s.Log.WithError(pushErr).Error("Error pushing TAGs") + s.Log.ErrorContext(ctx, "Error pushing TAGs", "error", pushErr) return nil } // Update the currentTAGResources with the result of the reconciliation. @@ -135,7 +135,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * }, }, }); err != nil { - s.Log.WithError(err).Error("Error submitting usage event") + s.Log.ErrorContext(ctx, "Error submitting usage event", "error", err) } return nil @@ -315,7 +315,7 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c defer func() { lease.Stop() if err := lease.Wait(); err != nil { - s.Log.WithError(err).Warn("error cleaning up semaphore") + s.Log.WarnContext(ctx, "Error cleaning up semaphore", "error", err) } }() @@ -336,12 +336,12 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c stream, err := client.AWSEventsStream(ctx) if err != nil { - s.Log.WithError(err).Error("Failed to get access graph service stream") + s.Log.ErrorContext(ctx, "Failed to get access graph service stream", "error", err) return trace.Wrap(err) } header, err := stream.Header() if err != nil { - s.Log.WithError(err).Error("Failed to get access graph service stream header") + s.Log.ErrorContext(ctx, "Failed to get access graph service stream header", "error", err) return trace.Wrap(err) } const ( @@ -361,7 +361,7 @@ func (s *Server) 
initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c defer wg.Done() defer cancel() if !accessGraphConn.WaitForStateChange(ctx, connectivity.Ready) { - s.Log.Info("access graph service connection was closed") + s.Log.InfoContext(ctx, "Access graph service connection was closed") } }() @@ -418,7 +418,7 @@ func grpcCredentials(config AccessGraphConfig, getCert func() (*tls.Certificate, func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error { fetchers, err := s.accessGraphFetchersFromMatchers(ctx, cfg.Matchers, "" /* discoveryConfigName */) if err != nil { - s.Log.WithError(err).Error("Error initializing access graph fetchers") + s.Log.ErrorContext(ctx, "Error initializing access graph fetchers", "error", err) } s.staticTAGSyncFetchers = fetchers @@ -431,7 +431,7 @@ func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error // We will wait for the config to change and re-evaluate the fetchers // before starting the sync. if len(allFetchers) == 0 { - s.Log.Debug("No AWS sync fetchers configured. Access graph sync will not be enabled.") + s.Log.DebugContext(ctx, "No AWS sync fetchers configured. Access graph sync will not be enabled.") select { case <-ctx.Done(): return @@ -442,10 +442,10 @@ func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error } // reset the currentTAGResources to force a full sync if err := s.initializeAndWatchAccessGraph(ctx, reloadCh); errors.Is(err, errTAGFeatureNotEnabled) { - s.Log.Warn("Access Graph specified in config, but the license does not include Teleport Policy. Access graph sync will not be enabled.") + s.Log.WarnContext(ctx, "Access Graph specified in config, but the license does not include Teleport Policy. 
Access graph sync will not be enabled.") break } else if err != nil { - s.Log.Warnf("Error initializing and watching access graph: %v", err) + s.Log.WarnContext(ctx, "Error initializing and watching access graph", "error", err) } select { diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 8b78fd181e31a..302859420ad92 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -66,7 +66,7 @@ func (s *Server) startDatabaseWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: s.getAllDatabaseFetchers, - Log: s.Log.WithField("kind", types.KindDatabase), + Log: s.LegacyLogger.WithField("kind", types.KindDatabase), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, TriggerFetchC: s.newDiscoveryConfigChangedSub(), @@ -96,7 +96,7 @@ func (s *Server) startDatabaseWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile database resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile database resources", "error", err) } else if s.onDatabaseReconcile != nil { s.onDatabaseReconcile() } @@ -128,7 +128,7 @@ func (s *Server) getAllDatabaseFetchers() []common.Fetcher { func (s *Server) getCurrentDatabases() map[string]types.Database { databases, err := s.AccessPoint.GetDatabases(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Failed to get databases from cache.") + s.Log.WarnContext(s.ctx, "Failed to get databases from cache", "error", err) return nil } @@ -138,7 +138,7 @@ func (s *Server) getCurrentDatabases() map[string]types.Database { } func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error { - s.Log.Debugf("Creating database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Creating database", "database", database.GetName()) err := s.AccessPoint.CreateDatabase(ctx, database) // If the database already exists but has cloud origin 
and an empty // discovery group, then update it. @@ -163,18 +163,18 @@ func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onDatabaseUpdate(ctx context.Context, database, _ types.Database) error { - s.Log.Debugf("Updating database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Updating database", "database", database.GetName()) return trace.Wrap(s.AccessPoint.UpdateDatabase(ctx, database)) } func (s *Server) onDatabaseDelete(ctx context.Context, database types.Database) error { - s.Log.Debugf("Deleting database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Deleting database", "database", database.GetName()) return trace.Wrap(s.AccessPoint.DeleteDatabase(ctx, database.GetName())) } diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 095da62b6475f..4b78786d5f60a 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "errors" "fmt" + "log/slog" "slices" "strings" "sync" @@ -61,6 +62,7 @@ import ( aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" "github.com/gravitational/teleport/lib/srv/discovery/fetchers/db" "github.com/gravitational/teleport/lib/srv/server" + logutils "github.com/gravitational/teleport/lib/utils/log" "github.com/gravitational/teleport/lib/utils/spreadwork" ) @@ -121,7 +123,10 @@ type Config struct { // AccessPoint is a discovery access point AccessPoint authclient.DiscoveryAccessPoint // Log is the logger. - Log logrus.FieldLogger + Log *slog.Logger + // LegacyLogger is the old logger + // Deprecated: use Log instead. + LegacyLogger logrus.FieldLogger // ServerID identifies the Teleport instance where this service runs. 
ServerID string // onDatabaseReconcile is called after each database resource reconciliation. @@ -224,7 +229,10 @@ kubernetes matchers are present.`) } if c.Log == nil { - c.Log = logrus.New() + c.Log = slog.Default() + } + if c.LegacyLogger == nil { + c.LegacyLogger = logrus.New() } if c.protocolChecker == nil { c.protocolChecker = fetchers.NewProtoChecker(false) @@ -245,11 +253,13 @@ kubernetes matchers are present.`) return trace.BadParameter("cluster features are required") } - c.Log = c.Log.WithField(teleport.ComponentKey, teleport.ComponentDiscovery) + c.Log = c.Log.With(teleport.ComponentKey, teleport.ComponentDiscovery) + c.LegacyLogger = c.LegacyLogger.WithField(teleport.ComponentKey, teleport.ComponentDiscovery) if c.DiscoveryGroup == "" { - c.Log.Warn("discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" + - "Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service") + const warningMessage = "discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" + + "Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service" + c.Log.WarnContext(context.Background(), warningMessage) } c.Matchers.Azure = services.SimplifyAzureMatchers(c.Matchers.Azure) @@ -499,7 +509,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) // Add non-integration kube fetchers. 
- kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers) + kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers) if err != nil { return trace.Wrap(err) } @@ -527,7 +537,7 @@ func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error { KubernetesClient: kubeClient, FilterLabels: matcher.Labels, Namespaces: matcher.Namespaces, - Log: s.Log, + Log: s.LegacyLogger, ClusterName: s.DiscoveryGroup, ProtocolChecker: s.Config.protocolChecker, }) @@ -625,7 +635,7 @@ func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, return matcherType == types.AWSMatcherEKS }) if len(awsKubeMatchers) > 0 { - eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers) + eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers) if err != nil { return nil, trace.Wrap(err) } @@ -683,7 +693,7 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa Regions: matcher.Regions, FilterLabels: matcher.ResourceTags, ResourceGroups: matcher.ResourceGroups, - Log: s.Log, + Log: s.LegacyLogger, }) if err != nil { return trace.Wrap(err) @@ -764,7 +774,7 @@ func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatche Location: location, FilterLabels: matcher.GetLabels(), ProjectID: projectID, - Log: s.Log, + Log: s.LegacyLogger, }) if err != nil { return trace.Wrap(err) @@ -883,7 +893,7 @@ func (s *Server) handleEC2Instances(instances *server.EC2Instances) error { } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil @@ -902,7 +912,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { for _, ec2Instance := range instances.Instances { eiceNode, err 
:= common.NewAWSNodeFromEC2v1Instance(ec2Instance.OriginalInstance, awsInfo) if err != nil { - s.Log.WithField("instance_id", ec2Instance.InstanceID).Warnf("Error converting to Teleport EICE Node: %v", err) + s.Log.WarnContext(s.ctx, "Error converting to Teleport EICE Node", "error", err, "instance_id", ec2Instance.InstanceID) s.awsEC2ResourcesStatus.incrementFailed(awsResourceGroup{ discoveryConfig: instances.DiscoveryConfig, @@ -915,7 +925,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { return s.GetName() == eiceNode.GetName() }) if err != nil && !trace.IsNotFound(err) { - s.Log.Warnf("Error finding the existing node with name %q: %v", eiceNode.GetName(), err) + s.Log.WarnContext(s.ctx, "Error finding the existing node", "node_name", eiceNode.GetName(), "error", err) continue } @@ -925,7 +935,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { case 1: existingNode = existingNodes[0] default: - s.Log.Warnf("Found multiple matching nodes with name %q", eiceNode.GetName()) + s.Log.WarnContext(s.ctx, "Found multiple matching nodes by name", "name", eiceNode.GetName()) continue } @@ -957,7 +967,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { err := spreadwork.ApplyOverTime(s.ctx, applyOverTimeConfig, nodesToUpsert, func(eiceNode types.Server) { if _, err := s.AccessPoint.UpsertNode(s.ctx, eiceNode); err != nil { instanceID := eiceNode.GetAWSInstanceID() - s.Log.WithField("instance_id", instanceID).Warnf("Error upserting EC2 instance: %v", err) + s.Log.WarnContext(s.ctx, "Error upserting EC2 instance", "instance_id", instanceID, "error", err) s.awsEC2ResourcesStatus.incrementFailed(awsResourceGroup{ discoveryConfig: instances.DiscoveryConfig, integration: instances.Integration, @@ -965,7 +975,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { } }) if err != nil { - s.Log.Warnf("Failed to upsert EC2 nodes: %v", err) + s.Log.WarnContext(s.ctx, "Failed to 
upsert EC2 nodes", "error", err) } } @@ -979,8 +989,7 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err return trace.Wrap(err) } - s.Log.Debugf("Running Teleport installation on these instances: AccountID: %s, Instances: %s", - instances.AccountID, genEC2InstancesLogStr(instances.Instances)) + s.Log.DebugContext(s.ctx, "Running Teleport installation on instances", "account_id", instances.AccountID, "instances", genEC2InstancesLogStr(instances.Instances)) req := server.SSMRunRequest{ DocumentName: instances.DocumentName, @@ -1025,11 +1034,17 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err func (s *Server) logHandleInstancesErr(err error) { var aErr awserr.Error if errors.As(err, &aErr) && aErr.Code() == ssm.ErrCodeInvalidInstanceId { - s.Log.WithError(err).Error("SSM SendCommand failed with ErrCodeInvalidInstanceId. Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details.") + const errorMessage = "SSM SendCommand failed with ErrCodeInvalidInstanceId. " + + "Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. " + + "Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. " + + "See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details." 
+ s.Log.ErrorContext(s.ctx, + errorMessage, + "error", err) } else if trace.IsNotFound(err) { - s.Log.Debug("All discovered EC2 instances are already part of the cluster.") + s.Log.DebugContext(s.ctx, "All discovered EC2 instances are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered EC2 instances", "error", err) } } @@ -1042,13 +1057,13 @@ func (s *Server) watchCARotation(ctx context.Context) { nodes, err := s.findUnrotatedEC2Nodes(ctx) if err != nil { if trace.IsNotFound(err) { - s.Log.Debug("No OpenSSH nodes require CA rotation") + s.Log.DebugContext(ctx, "No OpenSSH nodes require CA rotation") continue } - s.Log.Errorf("Error finding OpenSSH nodes requiring CA rotation: %s", err) + s.Log.ErrorContext(ctx, "Error finding OpenSSH nodes requiring CA rotation", "error", err) continue } - s.Log.Debugf("Found %d nodes requiring rotation", len(nodes)) + s.Log.DebugContext(ctx, "Found nodes requiring rotation", "nodes_count", len(nodes)) s.caRotationCh <- nodes case <-s.ctx.Done(): return @@ -1108,7 +1123,7 @@ func (s *Server) findUnrotatedEC2Nodes(ctx context.Context) ([]types.Server, err func (s *Server) handleEC2Discovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } @@ -1119,8 +1134,7 @@ func (s *Server) handleEC2Discovery() { select { case instances := <-s.ec2Watcher.InstancesC: ec2Instances := instances.EC2 - s.Log.Debugf("EC2 instances discovered (AccountID: %s, Instances: %v), starting installation", - ec2Instances.AccountID, genEC2InstancesLogStr(ec2Instances.Instances)) + s.Log.DebugContext(s.ctx, "EC2 instances discovered, starting installation", "account_id", ec2Instances.AccountID, "instances", genEC2InstancesLogStr(ec2Instances.Instances)) 
s.awsEC2ResourcesStatus.incrementFound(awsResourceGroup{ discoveryConfig: instances.EC2.DiscoveryConfig, @@ -1186,9 +1200,7 @@ func (s *Server) handleAzureInstances(instances *server.AzureInstances) error { return trace.Wrap(errNoInstances) } - s.Log.Debugf("Running Teleport installation on these virtual machines: SubscriptionID: %s, VMs: %s", - instances.SubscriptionID, genAzureInstancesLogStr(instances.Instances), - ) + s.Log.DebugContext(s.ctx, "Running Teleport installation on virtual machines", "subscription_id", instances.SubscriptionID, "vms", genAzureInstancesLogStr(instances.Instances)) req := server.AzureRunRequest{ Client: client, Instances: instances.Instances, @@ -1203,14 +1215,14 @@ func (s *Server) handleAzureInstances(instances *server.AzureInstances) error { return trace.Wrap(err) } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) handleAzureDiscovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } @@ -1219,14 +1231,12 @@ func (s *Server) handleAzureDiscovery() { select { case instances := <-s.azureWatcher.InstancesC: azureInstances := instances.Azure - s.Log.Debugf("Azure instances discovered (SubscriptionID: %s, Instances: %v), starting installation", - azureInstances.SubscriptionID, genAzureInstancesLogStr(azureInstances.Instances), - ) + s.Log.DebugContext(s.ctx, "Azure instances discovered, starting installation", "subscription_id", azureInstances.SubscriptionID, "instances", genAzureInstancesLogStr(azureInstances.Instances)) if err := s.handleAzureInstances(azureInstances); err != nil { if errors.Is(err, errNoInstances) { - s.Log.Debug("All discovered Azure VMs are already part of the cluster.") + 
s.Log.DebugContext(s.ctx, "All discovered Azure VMs are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered Azure VMs.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered Azure VMs", "error", err) } } case <-s.ctx.Done(): @@ -1280,9 +1290,7 @@ func (s *Server) handleGCPInstances(instances *server.GCPInstances) error { return trace.Wrap(errNoInstances) } - s.Log.Debugf("Running Teleport installation on these virtual machines: ProjectID: %s, VMs: %s", - instances.ProjectID, genGCPInstancesLogStr(instances.Instances), - ) + s.Log.DebugContext(s.ctx, "Running Teleport installation on virtual machines", "project_id", instances.ProjectID, "vms", genGCPInstancesLogStr(instances.Instances)) sshKeyAlgo, err := cryptosuites.AlgorithmForKey(s.ctx, cryptosuites.GetCurrentSuiteFromPing(s.AccessPoint), cryptosuites.UserSSH) if err != nil { return trace.Wrap(err, "finding algorithm for SSH key from ping response") @@ -1301,14 +1309,14 @@ func (s *Server) handleGCPInstances(instances *server.GCPInstances) error { return trace.Wrap(err) } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) handleGCPDiscovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } go s.gcpWatcher.Run() @@ -1316,14 +1324,12 @@ func (s *Server) handleGCPDiscovery() { select { case instances := <-s.gcpWatcher.InstancesC: gcpInstances := instances.GCP - s.Log.Debugf("GCP instances discovered (ProjectID: %s, Instances %v), starting installation", - gcpInstances.ProjectID, genGCPInstancesLogStr(gcpInstances.Instances), - ) + s.Log.DebugContext(s.ctx, "GCP instances discovered, starting installation", "project_id", 
gcpInstances.ProjectID, "instances", genGCPInstancesLogStr(gcpInstances.Instances)) if err := s.handleGCPInstances(gcpInstances); err != nil { if errors.Is(err, errNoInstances) { - s.Log.Debug("All discovered GCP VMs are already part of the cluster.") + s.Log.DebugContext(s.ctx, "All discovered GCP VMs are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered GCP VMs.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered GCP VMs", "error", err) } } case <-s.ctx.Done(): @@ -1386,7 +1392,7 @@ func (s *Server) submitFetchEvent(cloudProvider, resourceType string) { }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting discovery fetch event.") + s.Log.DebugContext(s.ctx, "Error emitting discovery fetch event", "error", err) } } @@ -1479,7 +1485,7 @@ func (s *Server) loadExistingDynamicDiscoveryConfigs() error { for { dcs, respNextKey, err := s.AccessPoint.ListDiscoveryConfigs(s.ctx, 0, nextKey) if err != nil { - s.Log.WithError(err).Warnf("failed to list discovery configs") + s.Log.WarnContext(s.ctx, "Failed to list discovery configs", "error", err) return trace.Wrap(err) } @@ -1488,7 +1494,7 @@ func (s *Server) loadExistingDynamicDiscoveryConfigs() error { continue } if err := s.upsertDynamicMatchers(s.ctx, dc); err != nil { - s.Log.WithError(err).Warnf("failed to update dynamic matchers for discovery config %q", dc.GetName()) + s.Log.WarnContext(s.ctx, "Failed to update dynamic matchers for discovery config", "discovery_config", dc.GetName(), "error", err) continue } s.dynamicDiscoveryConfig[dc.GetName()] = dc @@ -1514,7 +1520,7 @@ func (s *Server) startDynamicWatcherUpdater() { case types.OpPut: dc, ok := event.Resource.(*discoveryconfig.DiscoveryConfig) if !ok { - s.Log.Warnf("dynamic matcher watcher: unexpected resource type %T", event.Resource) + s.Log.WarnContext(s.ctx, "Dynamic matcher watcher: unexpected resource type", "expected", logutils.TypeAttr(dc), "got", logutils.TypeAttr(event.Resource)) 
return } @@ -1542,7 +1548,7 @@ func (s *Server) startDynamicWatcherUpdater() { } if err := s.upsertDynamicMatchers(s.ctx, dc); err != nil { - s.Log.WithError(err).Warnf("failed to update dynamic matchers for discovery config %q", dc.GetName()) + s.Log.WarnContext(s.ctx, "Failed to update dynamic matchers for discovery config", "discovery_config", dc.GetName(), "error", err) continue } s.dynamicDiscoveryConfig[dc.GetName()] = dc @@ -1559,10 +1565,10 @@ func (s *Server) startDynamicWatcherUpdater() { delete(s.dynamicDiscoveryConfig, name) s.notifyDiscoveryConfigChanged() default: - s.Log.Warnf("Skipping unknown event type %s", event.Type) + s.Log.WarnContext(s.ctx, "Skipping unknown event type", "got", event.Type) } case <-s.dynamicMatcherWatcher.Done(): - s.Log.Warnf("dynamic matcher watcher error: %v", s.dynamicMatcherWatcher.Error()) + s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", s.dynamicMatcherWatcher.Error()) return } } @@ -1694,7 +1700,7 @@ func (s *Server) discardUnsupportedMatchers(m *Matchers) { validAWSMatchers := make([]types.AWSMatcher, 0, len(m.AWS)) for i, m := range m.AWS { if m.Integration == "" { - s.Log.Warnf("discarding AWS matcher [%d] - missing integration", i) + s.Log.WarnContext(s.ctx, "Discarding AWS matcher - missing integration", "matcher_pos", i) continue } validAWSMatchers = append(validAWSMatchers, m) @@ -1702,17 +1708,17 @@ func (s *Server) discardUnsupportedMatchers(m *Matchers) { m.AWS = validAWSMatchers if len(m.GCP) > 0 { - s.Log.Warnf("discarding GCP matchers - missing integration") + s.Log.WarnContext(s.ctx, "Discarding GCP matchers - missing integration") m.GCP = []types.GCPMatcher{} } if len(m.Azure) > 0 { - s.Log.Warnf("discarding Azure matchers - missing integration") + s.Log.WarnContext(s.ctx, "Discarding Azure matchers - missing integration") m.Azure = []types.AzureMatcher{} } if len(m.Kubernetes) > 0 { - s.Log.Warnf("discarding Kubernetes matchers - missing integration") + s.Log.WarnContext(s.ctx,
"Discarding Kubernetes matchers - missing integration") m.Kubernetes = []types.KubernetesMatcher{} } } @@ -1731,7 +1737,7 @@ func (s *Server) Stop() { } if s.dynamicMatcherWatcher != nil { if err := s.dynamicMatcherWatcher.Close(); err != nil { - s.Log.Warnf("dynamic matcher watcher closing error: ", trace.Wrap(err)) + s.Log.WarnContext(s.ctx, "Dynamic matcher watcher closing error", "error", err) } } } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index ab384e65d74f5..7483dfb1195a2 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -87,6 +87,7 @@ import ( "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/srv/server" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" + libutils "github.com/gravitational/teleport/lib/utils" ) func TestMain(m *testing.M) { @@ -679,7 +680,9 @@ func TestDiscoveryServer(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + reporter := &mockUsageReporter{} installer := &mockSSMInstaller{ installedInstances: make(map[string]struct{}), @@ -700,6 +703,7 @@ func TestDiscoveryServer(t *testing.T) { Matchers: tc.staticMatchers, Emitter: tc.emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, clock: fakeClock, }) @@ -759,7 +763,8 @@ func TestDiscoveryServer(t *testing.T) { func TestDiscoveryServerConcurrency(t *testing.T) { t.Parallel() ctx := context.Background() - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() defaultDiscoveryGroup := "dg01" awsMatcher := types.AWSMatcher{ @@ -839,6 +844,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { Matchers: staticMatcher, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) require.NoError(t, err) @@ -1337,9 
+1343,11 @@ func TestDiscoveryInCloudKube(t *testing.T) { require.NoError(t, w.Close()) }) - logger := logrus.New() - logger.SetOutput(w) - logger.SetLevel(logrus.DebugLevel) + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + + legacyLogger.SetOutput(w) + legacyLogger.SetLevel(logrus.DebugLevel) clustersNotUpdated := make(chan string, 10) go func() { // reconcileRegexp is the regex extractor of a log message emitted by reconciler when @@ -1378,6 +1386,7 @@ func TestDiscoveryInCloudKube(t *testing.T) { }, Emitter: authClient, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: mainDiscoveryGroup, }) @@ -2652,7 +2661,9 @@ func TestAzureVMDiscovery(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + emitter := &mockEmitter{} reporter := &mockUsageReporter{} installer := &mockAzureInstaller{ @@ -2667,6 +2678,7 @@ func TestAzureVMDiscovery(t *testing.T) { Matchers: tc.staticMatchers, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) @@ -2958,7 +2970,8 @@ func TestGCPVMDiscovery(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() emitter := &mockEmitter{} reporter := &mockUsageReporter{} installer := &mockGCPInstaller{ @@ -2973,6 +2986,7 @@ func TestGCPVMDiscovery(t *testing.T) { Matchers: tc.staticMatchers, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) @@ -3019,7 +3033,8 @@ func TestServer_onCreate(t *testing.T) { Config: &Config{ DiscoveryGroup: "test-cluster", AccessPoint: accessPoint, - Log: logrus.New(), + Log: libutils.NewSlogLoggerForTests(), + LegacyLogger: logrus.New(), }, } diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 58c7228b4f031..d8efaceda4bf8 100644 --- 
a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -68,7 +68,7 @@ func (s *Server) startKubeIntegrationWatchers() error { s.submitFetchersEvent(kubeIntegrationFetchers) return kubeIntegrationFetchers }, - Log: s.Log.WithField("kind", types.KindKubernetesCluster), + Log: s.LegacyLogger.WithField("kind", types.KindKubernetesCluster), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, @@ -88,13 +88,13 @@ func (s *Server) startKubeIntegrationWatchers() error { existingServers, err := clt.GetKubernetesServers(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Failed to get Kubernetes servers from cache.") + s.Log.WarnContext(s.ctx, "Failed to get Kubernetes servers from cache", "error", err) continue } existingClusters, err := clt.GetKubernetesClusters(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Failed to get Kubernetes clusters from cache.") + s.Log.WarnContext(s.ctx, "Failed to get Kubernetes clusters from cache", "error", err) continue } @@ -120,7 +120,7 @@ func (s *Server) startKubeIntegrationWatchers() error { agentVersion, err := s.getKubeAgentVersion(releaseChannels) if err != nil { - s.Log.WithError(err).Warn("Could not get agent version to enroll EKS clusters") + s.Log.WarnContext(s.ctx, "Could not get agent version to enroll EKS clusters", "error", err) continue } @@ -195,19 +195,19 @@ func (s *Server) enrollEKSClusters(region, integration string, clusters []types. 
AgentVersion: agentVersion, }) if err != nil { - s.Log.WithError(err).Errorf("failed to enroll EKS clusters %v", clusterNames) + s.Log.ErrorContext(ctx, "Failed to enroll EKS clusters", "cluster_names", clusterNames, "error", err) continue } for _, r := range rsp.Results { if r.Error != "" { if !strings.Contains(r.Error, "teleport-kube-agent is already installed on the cluster") { - s.Log.Errorf("failed to enroll EKS cluster %q: %s", r.EksClusterName, r.Error) + s.Log.ErrorContext(ctx, "Failed to enroll EKS cluster", "cluster_name", r.EksClusterName, "error", r.Error) } else { - s.Log.Debugf("EKS cluster %q already has installed kube agent", r.EksClusterName) + s.Log.DebugContext(ctx, "EKS cluster already has installed kube agent", "cluster_name", r.EksClusterName) } } else { - s.Log.Infof("successfully enrolled EKS cluster %q", r.EksClusterName) + s.Log.InfoContext(ctx, "Successfully enrolled EKS cluster", "cluster_name", r.EksClusterName) } } } diff --git a/lib/srv/discovery/kube_integration_watcher_test.go b/lib/srv/discovery/kube_integration_watcher_test.go index 65d0684dce397..197032884e13c 100644 --- a/lib/srv/discovery/kube_integration_watcher_test.go +++ b/lib/srv/discovery/kube_integration_watcher_test.go @@ -52,6 +52,7 @@ import ( "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/srv/discovery/fetchers" + libutils "github.com/gravitational/teleport/lib/utils" ) func TestServer_getKubeFetchers(t *testing.T) { @@ -380,7 +381,8 @@ func TestDiscoveryKubeIntegrationEKS(t *testing.T) { AWS: tc.awsMatchers, }, Emitter: authClient, - Log: logrus.New(), + Log: libutils.NewSlogLoggerForTests(), + LegacyLogger: logrus.New(), DiscoveryGroup: mainDiscoveryGroup, }) diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go index 120c4d14aab4b..f9fc7ad47a16e 100644 --- a/lib/srv/discovery/kube_services_watcher.go +++
b/lib/srv/discovery/kube_services_watcher.go @@ -51,7 +51,7 @@ func (s *Server) startKubeAppsWatchers() error { GetCurrentResources: func() map[string]types.Application { apps, err := s.AccessPoint.GetApps(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Unable to get applications from cache.") + s.Log.WarnContext(s.ctx, "Unable to get applications from cache", "error", err) return nil } @@ -76,7 +76,7 @@ func (s *Server) startKubeAppsWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: common.StaticFetchers(s.kubeAppsFetchers), Interval: 5 * time.Minute, - Log: s.Log.WithField("kind", types.KindApp), + Log: s.LegacyLogger.WithField("kind", types.KindApp), DiscoveryGroup: s.DiscoveryGroup, Origin: types.OriginDiscoveryKubernetes, }) @@ -104,7 +104,7 @@ func (s *Server) startKubeAppsWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile resources", "error", err) } case <-s.ctx.Done(): @@ -116,7 +116,7 @@ func (s *Server) startKubeAppsWatchers() error { } func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { - s.Log.Debugf("Creating app %s", app.GetName()) + s.Log.DebugContext(ctx, "Creating app", "app_name", app.GetName()) err := s.AccessPoint.CreateApp(ctx, app) // If the resource already exists, it means that the resource was created // by a previous discovery_service instance that didn't support the discovery @@ -141,17 +141,17 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onAppUpdate(ctx context.Context, app, _ types.Application) error { - s.Log.Debugf("Updating app %s.", app.GetName()) + s.Log.DebugContext(ctx, "Updating app", "app_name", 
app.GetName()) return trace.Wrap(s.AccessPoint.UpdateApp(ctx, app)) } func (s *Server) onAppDelete(ctx context.Context, app types.Application) error { - s.Log.Debugf("Deleting app %s.", app.GetName()) + s.Log.DebugContext(ctx, "Deleting app", "app_name", app.GetName()) return trace.Wrap(s.AccessPoint.DeleteApp(ctx, app.GetName())) } diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 88f06a163827a..d6cc07aa75ae2 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -50,7 +50,7 @@ func (s *Server) startKubeWatchers() error { GetCurrentResources: func() map[string]types.KubeCluster { kcs, err := s.AccessPoint.GetKubernetesClusters(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Unable to get Kubernetes clusters from cache.") + s.Log.WarnContext(s.ctx, "Unable to get Kubernetes clusters from cache", "error", err) return nil } @@ -78,7 +78,7 @@ func (s *Server) startKubeWatchers() error { s.submitFetchersEvent(kubeNonIntegrationFetchers) return kubeNonIntegrationFetchers }, - Log: s.Log.WithField("kind", types.KindKubernetesCluster), + Log: s.LegacyLogger.WithField("kind", types.KindKubernetesCluster), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, @@ -108,7 +108,7 @@ func (s *Server) startKubeWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile resources", "error", err) } case <-s.ctx.Done(): @@ -120,7 +120,7 @@ func (s *Server) startKubeWatchers() error { } func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster) error { - s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Creating kube_cluster", "kube_cluster_name", kubeCluster.GetName()) err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) // If the kube already exists but has an empty 
discovery group, update it. if err != nil { @@ -140,17 +140,17 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onKubeUpdate(ctx context.Context, kubeCluster, _ types.KubeCluster) error { - s.Log.Debugf("Updating kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Updating kube_cluster", "kube_cluster_name", kubeCluster.GetName()) return trace.Wrap(s.AccessPoint.UpdateKubernetesCluster(ctx, kubeCluster)) } func (s *Server) onKubeDelete(ctx context.Context, kubeCluster types.KubeCluster) error { - s.Log.Debugf("Deleting kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Deleting kube_cluster", "kube_cluster_name", kubeCluster.GetName()) return trace.Wrap(s.AccessPoint.DeleteKubernetesCluster(ctx, kubeCluster.GetName())) } diff --git a/lib/srv/discovery/reconciler.go b/lib/srv/discovery/reconciler.go index 26b17410e1bd6..dd9dc1d605f9c 100644 --- a/lib/srv/discovery/reconciler.go +++ b/lib/srv/discovery/reconciler.go @@ -20,12 +20,12 @@ package discovery import ( "context" + "log/slog" "sync" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" @@ -46,7 +46,7 @@ type serverInfoUpserter interface { type labelReconcilerConfig struct { clock clockwork.Clock - log logrus.FieldLogger + log *slog.Logger accessPoint serverInfoUpserter } @@ -58,7 +58,7 @@ func (c *labelReconcilerConfig) checkAndSetDefaults() error { c.clock = clockwork.NewRealClock() } if c.log == nil { - c.log = logrus.New() + c.log = slog.Default() } return nil } @@ -124,7 +124,7 @@ func (r *labelReconciler) run(ctx context.Context) { for _, si := range batch { if err := 
r.cfg.accessPoint.UpsertServerInfo(ctx, si); err != nil { - r.cfg.log.WithError(err).Error("Failed to upsert server info.") + r.cfg.log.ErrorContext(ctx, "Failed to upsert server info", "error", err) // Allow the server info to be queued again. delete(r.discoveredServers, si.GetName()) } diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 7fe0b0f39398d..a3ff7bf8559ff 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -25,7 +25,6 @@ import ( "time" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1" @@ -63,9 +62,9 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) switch { case trace.IsNotImplemented(err): - s.Log.Warn("UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") + s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. 
Please upgrade it to a recent version.") case err != nil: - s.Log.WithError(err).WithField("discovery_config_name", discoveryConfigName).Info("Error updating discovery config status") + s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) } } @@ -422,7 +421,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun cancel() lease.Stop() if err := lease.Wait(); err != nil { - s.Log.WithError(err).WithField("semaphore", userTaskName).Warn("error cleaning up UserTask semaphore") + s.Log.WarnContext(ctx, "Error cleaning up UserTask semaphore", "semaphore", semaphoreName, "error", err) } } @@ -522,13 +521,13 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { } if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil { - s.Log.WithError(err).WithFields(logrus.Fields{ - "integration": g.integration, - "issue_type": g.issueType, - "aws_account_id": g.accountID, - "aws_region": g.region, - }, - ).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region) + s.Log.WarnContext(s.ctx, "Failed to create discover ec2 user task", + "integration", g.integration, + "issue_type", g.issueType, + "aws_account_id", g.accountID, + "aws_region", g.region, + "error", err, + ) continue }