From 92861f18c1488950f29ce6517bd81480b697a5d4 Mon Sep 17 00:00:00 2001 From: Marco Dinis Date: Tue, 5 Nov 2024 14:12:28 +0000 Subject: [PATCH] Move most of `lib/srv/discovery` from logrus to slog (#48292) (#48428) * Move most of `lib/srv/discovery` from logrus to slog * use local context --- lib/service/discovery.go | 3 +- lib/srv/discovery/access_graph.go | 24 ++-- lib/srv/discovery/database_watcher.go | 16 +-- lib/srv/discovery/discovery.go | 124 +++++++++--------- lib/srv/discovery/discovery_test.go | 31 +++-- lib/srv/discovery/kube_integration_watcher.go | 16 +-- .../kube_integration_watcher_test.go | 4 +- lib/srv/discovery/kube_services_watcher.go | 16 +-- lib/srv/discovery/kube_watcher.go | 16 +-- lib/srv/discovery/reconciler.go | 8 +- lib/srv/discovery/status.go | 21 ++- 11 files changed, 151 insertions(+), 128 deletions(-) diff --git a/lib/service/discovery.go b/lib/service/discovery.go index 845c9edbc4de2..b69f5f558994c 100644 --- a/lib/service/discovery.go +++ b/lib/service/discovery.go @@ -98,7 +98,8 @@ func (process *TeleportProcess) initDiscoveryService() error { Emitter: asyncEmitter, AccessPoint: accessPoint, ServerID: process.Config.HostUUID, - Log: process.log, + Log: process.logger, + LegacyLogger: process.log, ClusterName: conn.ClientIdentity.ClusterName, ClusterFeatures: process.GetClusterFeatures, PollInterval: process.Config.Discovery.PollInterval, diff --git a/lib/srv/discovery/access_graph.go b/lib/srv/discovery/access_graph.go index f3ba28a2d479f..dc7fee7a29dd9 100644 --- a/lib/srv/discovery/access_graph.go +++ b/lib/srv/discovery/access_graph.go @@ -66,7 +66,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * upsert, toDel := aws_sync.ReconcileResults(currentTAGResources, &aws_sync.Resources{}) if err := push(stream, upsert, toDel); err != nil { - s.Log.WithError(err).Error("Error pushing empty resources to TAGs") + s.Log.ErrorContext(ctx, "Error pushing empty resources to TAGs", "error", err) } return trace.Wrap(errNoAccessGraphFetchers) } @@ -109,7 +109,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * // Aggregate all errors into a single error. err := trace.NewAggregate(errs...) if err != nil { - s.Log.WithError(err).Error("Error polling TAGs") + s.Log.ErrorContext(ctx, "Error polling TAGs", "error", err) } result := aws_sync.MergeResources(results...) // Merge all results into a single result @@ -122,7 +122,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * } if pushErr != nil { - s.Log.WithError(pushErr).Error("Error pushing TAGs") + s.Log.ErrorContext(ctx, "Error pushing TAGs", "error", pushErr) return nil } // Update the currentTAGResources with the result of the reconciliation. 
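The access_graph.go hunks above show the core of the conversion: a chained logrus call such as WithError(err).Error("msg") becomes a single context-aware slog call with explicit key-value attributes. A minimal, self-contained Go sketch of that mapping (an illustration only, with made-up values; it is not part of the patch):

package main

import (
	"context"
	"errors"
	"log/slog"
)

func main() {
	ctx := context.Background()
	err := errors.New("connection reset")

	// logrus: log := logrus.New().WithField("component", "discovery")
	// slog:   attributes are bound up front with With.
	log := slog.Default().With("component", "discovery")

	// logrus: log.WithError(err).Error("Error pushing TAGs")
	log.ErrorContext(ctx, "Error pushing TAGs", "error", err)

	// logrus: log.Debugf("Found %d nodes requiring rotation", 3)
	// Format verbs disappear; the value becomes a named attribute instead.
	log.DebugContext(ctx, "Found nodes requiring rotation", "nodes_count", 3)
}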
@@ -135,7 +135,7 @@ func (s *Server) reconcileAccessGraph(ctx context.Context, currentTAGResources * }, }, }); err != nil { - s.Log.WithError(err).Error("Error submitting usage event") + s.Log.ErrorContext(ctx, "Error submitting usage event", "error", err) } return nil @@ -315,7 +315,7 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c defer func() { lease.Stop() if err := lease.Wait(); err != nil { - s.Log.WithError(err).Warn("error cleaning up semaphore") + s.Log.WarnContext(ctx, "Error cleaning up semaphore", "error", err) } }() @@ -336,12 +336,12 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c stream, err := client.AWSEventsStream(ctx) if err != nil { - s.Log.WithError(err).Error("Failed to get access graph service stream") + s.Log.ErrorContext(ctx, "Failed to get access graph service stream", "error", err) return trace.Wrap(err) } header, err := stream.Header() if err != nil { - s.Log.WithError(err).Error("Failed to get access graph service stream header") + s.Log.ErrorContext(ctx, "Failed to get access graph service stream header", "error", err) return trace.Wrap(err) } const ( @@ -361,7 +361,7 @@ func (s *Server) initializeAndWatchAccessGraph(ctx context.Context, reloadCh <-c defer wg.Done() defer cancel() if !accessGraphConn.WaitForStateChange(ctx, connectivity.Ready) { - s.Log.Info("access graph service connection was closed") + s.Log.InfoContext(ctx, "Access graph service connection was closed") } }() @@ -411,7 +411,7 @@ func grpcCredentials(config AccessGraphConfig, certs []tls.Certificate) (grpc.Di func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error { fetchers, err := s.accessGraphFetchersFromMatchers(ctx, cfg.Matchers, "" /* discoveryConfigName */) if err != nil { - s.Log.WithError(err).Error("Error initializing access graph fetchers") + s.Log.ErrorContext(ctx, "Error initializing access graph fetchers", "error", err) } s.staticTAGSyncFetchers = fetchers @@ -424,7 +424,7 @@ func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error // We will wait for the config to change and re-evaluate the fetchers // before starting the sync. if len(allFetchers) == 0 { - s.Log.Debug("No AWS sync fetchers configured. Access graph sync will not be enabled.") + s.Log.DebugContext(ctx, "No AWS sync fetchers configured. Access graph sync will not be enabled.") select { case <-ctx.Done(): return @@ -435,10 +435,10 @@ func (s *Server) initAccessGraphWatchers(ctx context.Context, cfg *Config) error } // reset the currentTAGResources to force a full sync if err := s.initializeAndWatchAccessGraph(ctx, reloadCh); errors.Is(err, errTAGFeatureNotEnabled) { - s.Log.Warn("Access Graph specified in config, but the license does not include Teleport Policy. Access graph sync will not be enabled.") + s.Log.WarnContext(ctx, "Access Graph specified in config, but the license does not include Teleport Policy. 
Access graph sync will not be enabled.") break } else if err != nil { - s.Log.Warnf("Error initializing and watching access graph: %v", err) + s.Log.WarnContext(ctx, "Error initializing and watching access graph", "error", err) } select { diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index c3ab1abb437bf..77b03d68113bb 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -52,7 +52,7 @@ func (s *Server) startDatabaseWatchers() error { defer mu.Unlock() return utils.FromSlice(newDatabases, types.Database.GetName) }, - Log: s.Log.WithField("kind", types.KindDatabase), + Log: s.LegacyLogger.WithField("kind", types.KindDatabase), OnCreate: s.onDatabaseCreate, OnUpdate: s.onDatabaseUpdate, OnDelete: s.onDatabaseDelete, @@ -64,7 +64,7 @@ func (s *Server) startDatabaseWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: s.getAllDatabaseFetchers, - Log: s.Log.WithField("kind", types.KindDatabase), + Log: s.LegacyLogger.WithField("kind", types.KindDatabase), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, TriggerFetchC: s.newDiscoveryConfigChangedSub(), @@ -94,7 +94,7 @@ func (s *Server) startDatabaseWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile database resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile database resources", "error", err) } else if s.onDatabaseReconcile != nil { s.onDatabaseReconcile() } @@ -126,7 +126,7 @@ func (s *Server) getAllDatabaseFetchers() []common.Fetcher { func (s *Server) getCurrentDatabases() map[string]types.Database { databases, err := s.AccessPoint.GetDatabases(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Failed to get databases from cache.") + s.Log.WarnContext(s.ctx, "Failed to get databases from cache", "error", err) return nil } @@ -136,7 +136,7 @@ func (s *Server) getCurrentDatabases() map[string]types.Database { } func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error { - s.Log.Debugf("Creating database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Creating database", "database", database.GetName()) err := s.AccessPoint.CreateDatabase(ctx, database) // If the database already exists but has cloud origin and an empty // discovery group, then update it. 
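The database watcher above still hands logrus to the reconciler and watcher configs (via s.LegacyLogger.WithField), while its own messages move to slog. A small sketch of that transitional dual-logger arrangement follows; the struct is a trimmed illustration modeled on the Config changes in discovery.go further down, and the defaulting behavior shown is an assumption based on those hunks:

package main

import (
	"log/slog"

	"github.com/sirupsen/logrus"
)

// Config carries both loggers while downstream components are migrated one at a time.
type Config struct {
	// Log is the structured logger used by call sites that have moved to slog.
	Log *slog.Logger
	// LegacyLogger is passed to components that still expect logrus.
	// Deprecated: use Log instead.
	LegacyLogger logrus.FieldLogger
}

// checkAndSetDefaults fills in whichever logger the caller did not supply.
func (c *Config) checkAndSetDefaults() {
	if c.Log == nil {
		c.Log = slog.Default()
	}
	if c.LegacyLogger == nil {
		c.LegacyLogger = logrus.New()
	}
}

func main() {
	c := &Config{}
	c.checkAndSetDefaults()
	c.Log.Info("structured logger ready")      // new call sites
	c.LegacyLogger.Info("legacy logger ready") // not-yet-migrated dependencies
}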
@@ -161,18 +161,18 @@ func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onDatabaseUpdate(ctx context.Context, database, _ types.Database) error { - s.Log.Debugf("Updating database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Updating database", "database", database.GetName()) return trace.Wrap(s.AccessPoint.UpdateDatabase(ctx, database)) } func (s *Server) onDatabaseDelete(ctx context.Context, database types.Database) error { - s.Log.Debugf("Deleting database %s.", database.GetName()) + s.Log.DebugContext(ctx, "Deleting database", "database", database.GetName()) return trace.Wrap(s.AccessPoint.DeleteDatabase(ctx, database.GetName())) } diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 829b16c3c01f7..a27b9c18b45fe 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -23,6 +23,7 @@ import ( "crypto/tls" "errors" "fmt" + "log/slog" "slices" "strings" "sync" @@ -59,6 +60,7 @@ import ( aws_sync "github.com/gravitational/teleport/lib/srv/discovery/fetchers/aws-sync" "github.com/gravitational/teleport/lib/srv/discovery/fetchers/db" "github.com/gravitational/teleport/lib/srv/server" + logutils "github.com/gravitational/teleport/lib/utils/log" "github.com/gravitational/teleport/lib/utils/spreadwork" ) @@ -119,7 +121,10 @@ type Config struct { // AccessPoint is a discovery access point AccessPoint authclient.DiscoveryAccessPoint // Log is the logger. - Log logrus.FieldLogger + Log *slog.Logger + // LegacyLogger is the old logger + // Deprecated: use Log instead. + LegacyLogger logrus.FieldLogger // ServerID identifies the Teleport instance where this service runs. ServerID string // onDatabaseReconcile is called after each database resource reconciliation. @@ -222,7 +227,10 @@ kubernetes matchers are present.`) } if c.Log == nil { - c.Log = logrus.New() + c.Log = slog.Default() + } + if c.LegacyLogger == nil { + c.LegacyLogger = logrus.New() } if c.protocolChecker == nil { c.protocolChecker = fetchers.NewProtoChecker(false) @@ -243,11 +251,13 @@ kubernetes matchers are present.`) return trace.BadParameter("cluster features are required") } - c.Log = c.Log.WithField(teleport.ComponentKey, teleport.ComponentDiscovery) + c.Log = c.Log.With(teleport.ComponentKey, teleport.ComponentDiscovery) + c.LegacyLogger = c.LegacyLogger.WithField(teleport.ComponentKey, teleport.ComponentDiscovery) if c.DiscoveryGroup == "" { - c.Log.Warn("discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" + - "Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service") + const warningMessage = "discovery_service.discovery_group is not set. This field is required for the discovery service to work properly.\n" + + "Please set discovery_service.discovery_group according to the documentation: https://goteleport.com/docs/reference/config/#discovery-service" + c.Log.WarnContext(context.Background(), warningMessage) } c.Matchers.Azure = services.SimplifyAzureMatchers(c.Matchers.Azure) @@ -497,7 +507,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) // Add non-integration kube fetchers. 
- kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, otherMatchers) + kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers) if err != nil { return trace.Wrap(err) } @@ -525,7 +535,7 @@ func (s *Server) initKubeAppWatchers(matchers []types.KubernetesMatcher) error { KubernetesClient: kubeClient, FilterLabels: matcher.Labels, Namespaces: matcher.Namespaces, - Log: s.Log, + Log: s.LegacyLogger, ClusterName: s.DiscoveryGroup, ProtocolChecker: s.Config.protocolChecker, }) @@ -623,7 +633,7 @@ func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, return matcherType == types.AWSMatcherEKS }) if len(awsKubeMatchers) > 0 { - eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.Log, s.CloudClients, awsKubeMatchers) + eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers) if err != nil { return nil, trace.Wrap(err) } @@ -681,7 +691,7 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa Regions: matcher.Regions, FilterLabels: matcher.ResourceTags, ResourceGroups: matcher.ResourceGroups, - Log: s.Log, + Log: s.LegacyLogger, }) if err != nil { return trace.Wrap(err) @@ -762,7 +772,7 @@ func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatche Location: location, FilterLabels: matcher.GetLabels(), ProjectID: projectID, - Log: s.Log, + Log: s.LegacyLogger, }) if err != nil { return trace.Wrap(err) @@ -875,7 +885,7 @@ func (s *Server) handleEC2Instances(instances *server.EC2Instances) error { } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil @@ -894,7 +904,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { for _, ec2Instance := range instances.Instances { eiceNode, err := common.NewAWSNodeFromEC2v1Instance(ec2Instance.OriginalInstance, awsInfo) if err != nil { - s.Log.WithField("instance_id", ec2Instance.InstanceID).Warnf("Error converting to Teleport EICE Node: %v", err) + s.Log.WarnContext(s.ctx, "Error converting to Teleport EICE Node", "error", err, "instance_id", ec2Instance.InstanceID) s.awsEC2ResourcesStatus.incrementFailed(awsResourceGroup{ discoveryConfig: instances.DiscoveryConfig, @@ -905,7 +915,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { existingNode, err := s.nodeWatcher.GetNode(s.ctx, eiceNode.GetName()) if err != nil && !trace.IsNotFound(err) { - s.Log.Warnf("Error finding the existing node with name %q: %v", eiceNode.GetName(), err) + s.Log.WarnContext(s.ctx, "Error finding the existing node", "node_name", eiceNode.GetName(), "error", err) continue } @@ -937,7 +947,7 @@ func (s *Server) heartbeatEICEInstance(instances *server.EC2Instances) { err := spreadwork.ApplyOverTime(s.ctx, applyOverTimeConfig, nodesToUpsert, func(eiceNode types.Server) { if _, err := s.AccessPoint.UpsertNode(s.ctx, eiceNode); err != nil { instanceID := eiceNode.GetAWSInstanceID() - s.Log.WithField("instance_id", instanceID).Warnf("Error upserting EC2 instance: %v", err) + s.Log.WarnContext(s.ctx, "Error upserting EC2 instance", "instance_id", instanceID, "error", err) s.awsEC2ResourcesStatus.incrementFailed(awsResourceGroup{ discoveryConfig: instances.DiscoveryConfig, integration: instances.Integration, @@ -945,7 +955,7 @@ func (s *Server) heartbeatEICEInstance(instances 
*server.EC2Instances) { } }) if err != nil { - s.Log.Warnf("Failed to upsert EC2 nodes: %v", err) + s.Log.WarnContext(s.ctx, "Failed to upsert EC2 nodes", "error", err) } } @@ -959,8 +969,7 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err return trace.Wrap(err) } - s.Log.Debugf("Running Teleport installation on these instances: AccountID: %s, Instances: %s", - instances.AccountID, genEC2InstancesLogStr(instances.Instances)) + s.Log.DebugContext(s.ctx, "Running Teleport installation on instances", "account_id", instances.AccountID, "instances", genEC2InstancesLogStr(instances.Instances)) req := server.SSMRunRequest{ DocumentName: instances.DocumentName, @@ -1005,11 +1014,17 @@ func (s *Server) handleEC2RemoteInstallation(instances *server.EC2Instances) err func (s *Server) logHandleInstancesErr(err error) { var aErr awserr.Error if errors.As(err, &aErr) && aErr.Code() == ssm.ErrCodeInvalidInstanceId { - s.Log.WithError(err).Error("SSM SendCommand failed with ErrCodeInvalidInstanceId. Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details.") + const errorMessage = "SSM SendCommand failed with ErrCodeInvalidInstanceId. " + + "Make sure that the instances have AmazonSSMManagedInstanceCore policy assigned. " + + "Also check that SSM agent is running and registered with the SSM endpoint on that instance and try restarting or reinstalling it in case of issues. " + + "See https://docs.aws.amazon.com/systems-manager/latest/APIReference/API_SendCommand.html#API_SendCommand_Errors for more details." 
+ s.Log.ErrorContext(s.ctx, + errorMessage, + "error", err) } else if trace.IsNotFound(err) { - s.Log.Debug("All discovered EC2 instances are already part of the cluster.") + s.Log.DebugContext(s.ctx, "All discovered EC2 instances are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered EC2 instances.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered EC2 instances", "error", err) } } @@ -1022,13 +1037,13 @@ func (s *Server) watchCARotation(ctx context.Context) { nodes, err := s.findUnrotatedEC2Nodes(ctx) if err != nil { if trace.IsNotFound(err) { - s.Log.Debug("No OpenSSH nodes require CA rotation") + s.Log.DebugContext(ctx, "No OpenSSH nodes require CA rotation") continue } - s.Log.Errorf("Error finding OpenSSH nodes requiring CA rotation: %s", err) + s.Log.ErrorContext(ctx, "Error finding OpenSSH nodes requiring CA rotation", "error", err) continue } - s.Log.Debugf("Found %d nodes requiring rotation", len(nodes)) + s.Log.DebugContext(ctx, "Found nodes requiring rotation", "nodes_count", len(nodes)) s.caRotationCh <- nodes case <-s.ctx.Done(): return @@ -1085,7 +1100,7 @@ func (s *Server) findUnrotatedEC2Nodes(ctx context.Context) ([]types.Server, err func (s *Server) handleEC2Discovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } @@ -1096,8 +1111,7 @@ func (s *Server) handleEC2Discovery() { select { case instances := <-s.ec2Watcher.InstancesC: ec2Instances := instances.EC2 - s.Log.Debugf("EC2 instances discovered (AccountID: %s, Instances: %v), starting installation", - ec2Instances.AccountID, genEC2InstancesLogStr(ec2Instances.Instances)) + s.Log.DebugContext(s.ctx, "EC2 instances discovered, starting installation", "account_id", ec2Instances.AccountID, "instances", genEC2InstancesLogStr(ec2Instances.Instances)) s.awsEC2ResourcesStatus.incrementFound(awsResourceGroup{ discoveryConfig: instances.EC2.DiscoveryConfig, @@ -1155,9 +1169,7 @@ func (s *Server) handleAzureInstances(instances *server.AzureInstances) error { return trace.Wrap(errNoInstances) } - s.Log.Debugf("Running Teleport installation on these virtual machines: SubscriptionID: %s, VMs: %s", - instances.SubscriptionID, genAzureInstancesLogStr(instances.Instances), - ) + s.Log.DebugContext(s.ctx, "Running Teleport installation on virtual machines", "subscription_id", instances.SubscriptionID, "vms", genAzureInstancesLogStr(instances.Instances)) req := server.AzureRunRequest{ Client: client, Instances: instances.Instances, @@ -1172,14 +1184,14 @@ func (s *Server) handleAzureInstances(instances *server.AzureInstances) error { return trace.Wrap(err) } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) handleAzureDiscovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } @@ -1188,14 +1200,12 @@ func (s *Server) handleAzureDiscovery() { select { case instances := <-s.azureWatcher.InstancesC: azureInstances := instances.Azure - s.Log.Debugf("Azure instances discovered (SubscriptionID: %s, Instances: %v), starting installation", - azureInstances.SubscriptionID, 
genAzureInstancesLogStr(azureInstances.Instances), - ) + s.Log.DebugContext(s.ctx, "Azure instances discovered, starting installation", "subscription_id", azureInstances.SubscriptionID, "instances", genAzureInstancesLogStr(azureInstances.Instances)) if err := s.handleAzureInstances(azureInstances); err != nil { if errors.Is(err, errNoInstances) { - s.Log.Debug("All discovered Azure VMs are already part of the cluster.") + s.Log.DebugContext(s.ctx, "All discovered Azure VMs are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered Azure VMs.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered Azure VMs", "error", err) } } case <-s.ctx.Done(): @@ -1241,9 +1251,7 @@ func (s *Server) handleGCPInstances(instances *server.GCPInstances) error { return trace.Wrap(errNoInstances) } - s.Log.Debugf("Running Teleport installation on these virtual machines: ProjectID: %s, VMs: %s", - instances.ProjectID, genGCPInstancesLogStr(instances.Instances), - ) + s.Log.DebugContext(s.ctx, "Running Teleport installation on virtual machines", "project_id", instances.ProjectID, "vms", genGCPInstancesLogStr(instances.Instances)) req := server.GCPRunRequest{ Client: client, Instances: instances.Instances, @@ -1257,14 +1265,14 @@ func (s *Server) handleGCPInstances(instances *server.GCPInstances) error { return trace.Wrap(err) } if err := s.emitUsageEvents(instances.MakeEvents()); err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(s.ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) handleGCPDiscovery() { if err := s.nodeWatcher.WaitInitialization(); err != nil { - s.Log.WithError(err).Error("Failed to initialize nodeWatcher.") + s.Log.ErrorContext(s.ctx, "Failed to initialize nodeWatcher", "error", err) return } go s.gcpWatcher.Run() @@ -1272,14 +1280,12 @@ func (s *Server) handleGCPDiscovery() { select { case instances := <-s.gcpWatcher.InstancesC: gcpInstances := instances.GCP - s.Log.Debugf("GCP instances discovered (ProjectID: %s, Instances %v), starting installation", - gcpInstances.ProjectID, genGCPInstancesLogStr(gcpInstances.Instances), - ) + s.Log.DebugContext(s.ctx, "GCP instances discovered, starting installation", "project_id", gcpInstances.ProjectID, "instances", genGCPInstancesLogStr(gcpInstances.Instances)) if err := s.handleGCPInstances(gcpInstances); err != nil { if errors.Is(err, errNoInstances) { - s.Log.Debug("All discovered GCP VMs are already part of the cluster.") + s.Log.DebugContext(s.ctx, "All discovered GCP VMs are already part of the cluster") } else { - s.Log.WithError(err).Error("Failed to enroll discovered GCP VMs.") + s.Log.ErrorContext(s.ctx, "Failed to enroll discovered GCP VMs", "error", err) } } case <-s.ctx.Done(): @@ -1342,7 +1348,7 @@ func (s *Server) submitFetchEvent(cloudProvider, resourceType string) { }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting discovery fetch event.") + s.Log.DebugContext(s.ctx, "Error emitting discovery fetch event", "error", err) } } @@ -1435,7 +1441,7 @@ func (s *Server) loadExistingDynamicDiscoveryConfigs() error { for { dcs, respNextKey, err := s.AccessPoint.ListDiscoveryConfigs(s.ctx, 0, nextKey) if err != nil { - s.Log.WithError(err).Warnf("failed to list discovery configs") + s.Log.WarnContext(s.ctx, "Failed to list discovery configs", "error", err) return trace.Wrap(err) } @@ -1444,7 +1450,7 @@ func (s *Server) loadExistingDynamicDiscoveryConfigs() error { continue } if err := 
s.upsertDynamicMatchers(s.ctx, dc); err != nil {
-			s.Log.WithError(err).Warnf("failed to update dynamic matchers for discovery config %q", dc.GetName())
+			s.Log.WarnContext(s.ctx, "Failed to update dynamic matchers for discovery config", "discovery_config", dc.GetName(), "error", err)
 			continue
 		}
 		s.dynamicDiscoveryConfig[dc.GetName()] = dc
@@ -1470,7 +1476,7 @@ func (s *Server) startDynamicWatcherUpdater() {
 		case types.OpPut:
 			dc, ok := event.Resource.(*discoveryconfig.DiscoveryConfig)
 			if !ok {
-				s.Log.Warnf("dynamic matcher watcher: unexpected resource type %T", event.Resource)
+				s.Log.WarnContext(s.ctx, "Dynamic matcher watcher: unexpected resource type", "expected", logutils.TypeAttr(dc), "got", logutils.TypeAttr(event.Resource))
 				return
 			}
 
@@ -1498,7 +1504,7 @@ func (s *Server) startDynamicWatcherUpdater() {
 			}
 
 			if err := s.upsertDynamicMatchers(s.ctx, dc); err != nil {
-				s.Log.WithError(err).Warnf("failed to update dynamic matchers for discovery config %q", dc.GetName())
+				s.Log.WarnContext(s.ctx, "Failed to update dynamic matchers for discovery config", "discovery_config", dc.GetName(), "error", err)
 				continue
 			}
 			s.dynamicDiscoveryConfig[dc.GetName()] = dc
@@ -1515,10 +1521,10 @@ func (s *Server) startDynamicWatcherUpdater() {
 			delete(s.dynamicDiscoveryConfig, name)
 			s.notifyDiscoveryConfigChanged()
 		default:
-			s.Log.Warnf("Skipping unknown event type %s", event.Type)
+			s.Log.WarnContext(s.ctx, "Skipping unknown event type", "got", event.Type)
 		}
 	case <-s.dynamicMatcherWatcher.Done():
-		s.Log.Warnf("dynamic matcher watcher error: %v", s.dynamicMatcherWatcher.Error())
+		s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", s.dynamicMatcherWatcher.Error())
 		return
 	}
 }
@@ -1650,7 +1656,7 @@ func (s *Server) discardUnsupportedMatchers(m *Matchers) {
 	validAWSMatchers := make([]types.AWSMatcher, 0, len(m.AWS))
 	for i, m := range m.AWS {
 		if m.Integration == "" {
-			s.Log.Warnf("discarding AWS matcher [%d] - missing integration", i)
+			s.Log.WarnContext(s.ctx, "Discarding AWS matcher - missing integration", "matcher_pos", i)
 			continue
 		}
 		validAWSMatchers = append(validAWSMatchers, m)
@@ -1658,17 +1664,17 @@ func (s *Server) discardUnsupportedMatchers(m *Matchers) {
 	m.AWS = validAWSMatchers
 
 	if len(m.GCP) > 0 {
-		s.Log.Warnf("discarding GCP matchers - missing integration")
+		s.Log.WarnContext(s.ctx, "Discarding GCP matchers - missing integration")
 		m.GCP = []types.GCPMatcher{}
 	}
 
 	if len(m.Azure) > 0 {
-		s.Log.Warnf("discarding Azure matchers - missing integration")
+		s.Log.WarnContext(s.ctx, "Discarding Azure matchers - missing integration")
 		m.Azure = []types.AzureMatcher{}
 	}
 
 	if len(m.Kubernetes) > 0 {
-		s.Log.Warnf("discarding Kubernetes matchers - missing integration")
+		s.Log.WarnContext(s.ctx, "Discarding Kubernetes matchers - missing integration")
 		m.Kubernetes = []types.KubernetesMatcher{}
 	}
 }
@@ -1687,7 +1693,7 @@ func (s *Server) Stop() {
 	}
 	if s.dynamicMatcherWatcher != nil {
 		if err := s.dynamicMatcherWatcher.Close(); err != nil {
-			s.Log.Warnf("dynamic matcher watcher closing error: ", trace.Wrap(err))
+			s.Log.WarnContext(s.ctx, "Dynamic matcher watcher closing error", "error", err)
 		}
 	}
 }
@@ -1719,7 +1725,7 @@ func (s *Server) initTeleportNodeWatcher() (err error) {
 	s.nodeWatcher, err = services.NewNodeWatcher(s.ctx, services.NodeWatcherConfig{
 		ResourceWatcherConfig: services.ResourceWatcherConfig{
 			Component:    teleport.ComponentDiscovery,
-			Log:          s.Log,
+			Log:          s.LegacyLogger,
 			Client:       s.AccessPoint,
 			MaxStaleness: time.Minute,
 		},
diff --git a/lib/srv/discovery/discovery_test.go
b/lib/srv/discovery/discovery_test.go index 100973a56e242..84e2d9e7bcf01 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -87,6 +87,7 @@ import ( "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/srv/server" usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport" + libutils "github.com/gravitational/teleport/lib/utils" ) func TestMain(m *testing.M) { @@ -679,7 +680,9 @@ func TestDiscoveryServer(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + reporter := &mockUsageReporter{} installer := &mockSSMInstaller{ installedInstances: make(map[string]struct{}), @@ -700,6 +703,7 @@ func TestDiscoveryServer(t *testing.T) { Matchers: tc.staticMatchers, Emitter: tc.emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, clock: fakeClock, }) @@ -759,7 +763,8 @@ func TestDiscoveryServer(t *testing.T) { func TestDiscoveryServerConcurrency(t *testing.T) { t.Parallel() ctx := context.Background() - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() defaultDiscoveryGroup := "dg01" awsMatcher := types.AWSMatcher{ @@ -839,6 +844,7 @@ func TestDiscoveryServerConcurrency(t *testing.T) { Matchers: staticMatcher, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) require.NoError(t, err) @@ -1336,9 +1342,11 @@ func TestDiscoveryInCloudKube(t *testing.T) { require.NoError(t, w.Close()) }) - logger := logrus.New() - logger.SetOutput(w) - logger.SetLevel(logrus.DebugLevel) + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + + legacyLogger.SetOutput(w) + legacyLogger.SetLevel(logrus.DebugLevel) clustersNotUpdated := make(chan string, 10) go func() { // reconcileRegexp is the regex extractor of a log message emitted by reconciler when @@ -1377,6 +1385,7 @@ func TestDiscoveryInCloudKube(t *testing.T) { }, Emitter: authClient, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: mainDiscoveryGroup, }) @@ -2668,7 +2677,9 @@ func TestAzureVMDiscovery(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() + emitter := &mockEmitter{} reporter := &mockUsageReporter{} installer := &mockAzureInstaller{ @@ -2683,6 +2694,7 @@ func TestAzureVMDiscovery(t *testing.T) { Matchers: tc.staticMatchers, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) @@ -2974,7 +2986,8 @@ func TestGCPVMDiscovery(t *testing.T) { require.NoError(t, err) } - logger := logrus.New() + legacyLogger := logrus.New() + logger := libutils.NewSlogLoggerForTests() emitter := &mockEmitter{} reporter := &mockUsageReporter{} installer := &mockGCPInstaller{ @@ -2989,6 +3002,7 @@ func TestGCPVMDiscovery(t *testing.T) { Matchers: tc.staticMatchers, Emitter: emitter, Log: logger, + LegacyLogger: legacyLogger, DiscoveryGroup: defaultDiscoveryGroup, }) @@ -3035,7 +3049,8 @@ func TestServer_onCreate(t *testing.T) { Config: &Config{ DiscoveryGroup: "test-cluster", AccessPoint: accessPoint, - Log: logrus.New(), + Log: libutils.NewSlogLoggerForTests(), + LegacyLogger: logrus.New(), }, } diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 58c7228b4f031..d8efaceda4bf8 100644 --- 
a/lib/srv/discovery/kube_integration_watcher.go
+++ b/lib/srv/discovery/kube_integration_watcher.go
@@ -68,7 +68,7 @@ func (s *Server) startKubeIntegrationWatchers() error {
 			s.submitFetchersEvent(kubeIntegrationFetchers)
 			return kubeIntegrationFetchers
 		},
-		Log:            s.Log.WithField("kind", types.KindKubernetesCluster),
+		Log:            s.LegacyLogger.WithField("kind", types.KindKubernetesCluster),
 		DiscoveryGroup: s.DiscoveryGroup,
 		Interval:       s.PollInterval,
 		Origin:         types.OriginCloud,
@@ -88,13 +88,13 @@ func (s *Server) startKubeIntegrationWatchers() error {
 
 			existingServers, err := clt.GetKubernetesServers(s.ctx)
 			if err != nil {
-				s.Log.WithError(err).Warn("Failed to get Kubernetes servers from cache.")
+				s.Log.WarnContext(s.ctx, "Failed to get Kubernetes servers from cache", "error", err)
 				continue
 			}
 
 			existingClusters, err := clt.GetKubernetesClusters(s.ctx)
 			if err != nil {
-				s.Log.WithError(err).Warn("Failed to get Kubernetes clusters from cache.")
+				s.Log.WarnContext(s.ctx, "Failed to get Kubernetes clusters from cache", "error", err)
 				continue
 			}
 
@@ -120,7 +120,7 @@ func (s *Server) startKubeIntegrationWatchers() error {
 
 			agentVersion, err := s.getKubeAgentVersion(releaseChannels)
 			if err != nil {
-				s.Log.WithError(err).Warn("Could not get agent version to enroll EKS clusters")
+				s.Log.WarnContext(s.ctx, "Could not get agent version to enroll EKS clusters", "error", err)
 				continue
 			}
 
@@ -195,19 +195,19 @@ func (s *Server) enrollEKSClusters(region, integration string, clusters []types.
 			AgentVersion:       agentVersion,
 		})
 		if err != nil {
-			s.Log.WithError(err).Errorf("failed to enroll EKS clusters %v", clusterNames)
+			s.Log.ErrorContext(ctx, "Failed to enroll EKS clusters", "cluster_names", clusterNames, "error", err)
 			continue
 		}
 
 		for _, r := range rsp.Results {
 			if r.Error != "" {
 				if !strings.Contains(r.Error, "teleport-kube-agent is already installed on the cluster") {
-					s.Log.Errorf("failed to enroll EKS cluster %q: %s", r.EksClusterName, r.Error)
+					s.Log.ErrorContext(ctx, "Failed to enroll EKS cluster", "cluster_name", r.EksClusterName, "error", r.Error)
 				} else {
-					s.Log.Debugf("EKS cluster %q already has installed kube agent", r.EksClusterName)
+					s.Log.DebugContext(ctx, "EKS cluster already has installed kube agent", "cluster_name", r.EksClusterName)
 				}
 			} else {
-				s.Log.Infof("successfully enrolled EKS cluster %q", r.EksClusterName)
+				s.Log.InfoContext(ctx, "Successfully enrolled EKS cluster", "cluster_name", r.EksClusterName)
 			}
 		}
 	}
diff --git a/lib/srv/discovery/kube_integration_watcher_test.go b/lib/srv/discovery/kube_integration_watcher_test.go
index f6cab69c9ec46..556796981c996 100644
--- a/lib/srv/discovery/kube_integration_watcher_test.go
+++ b/lib/srv/discovery/kube_integration_watcher_test.go
@@ -52,6 +52,7 @@ import (
 	"github.com/gravitational/teleport/lib/services"
 	"github.com/gravitational/teleport/lib/srv/discovery/common"
 	"github.com/gravitational/teleport/lib/srv/discovery/fetchers"
+	libutils "github.com/gravitational/teleport/lib/utils"
 )
 
 func TestServer_getKubeFetchers(t *testing.T) {
@@ -380,7 +381,8 @@ func TestDiscoveryKubeIntegrationEKS(t *testing.T) {
 					AWS: tc.awsMatchers,
 				},
 				Emitter:        authClient,
-				Log:            logrus.New(),
+				Log:            libutils.NewSlogLoggerForTests(),
+				LegacyLogger:   logrus.New(),
 				DiscoveryGroup: mainDiscoveryGroup,
 			})
 
diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go
index eb6d68cc964f7..8a80aea590b89 100644
--- a/lib/srv/discovery/kube_services_watcher.go
+++ b/lib/srv/discovery/kube_services_watcher.go
@@ -50,7 +50,7 @@ func (s *Server)
startKubeAppsWatchers() error { GetCurrentResources: func() map[string]types.Application { apps, err := s.AccessPoint.GetApps(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Unable to get applications from cache.") + s.Log.WarnContext(s.ctx, "Unable to get applications from cache", "error", err) return nil } @@ -61,7 +61,7 @@ func (s *Server) startKubeAppsWatchers() error { defer mu.Unlock() return utils.FromSlice(appResources, types.Application.GetName) }, - Log: s.Log.WithField("kind", types.KindApp), + Log: s.LegacyLogger.WithField("kind", types.KindApp), OnCreate: s.onAppCreate, OnUpdate: s.onAppUpdate, OnDelete: s.onAppDelete, @@ -74,7 +74,7 @@ func (s *Server) startKubeAppsWatchers() error { watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ FetchersFn: common.StaticFetchers(s.kubeAppsFetchers), Interval: 5 * time.Minute, - Log: s.Log.WithField("kind", types.KindApp), + Log: s.LegacyLogger.WithField("kind", types.KindApp), DiscoveryGroup: s.DiscoveryGroup, Origin: types.OriginDiscoveryKubernetes, }) @@ -102,7 +102,7 @@ func (s *Server) startKubeAppsWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile resources", "error", err) } case <-s.ctx.Done(): @@ -114,7 +114,7 @@ func (s *Server) startKubeAppsWatchers() error { } func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { - s.Log.Debugf("Creating app %s", app.GetName()) + s.Log.DebugContext(ctx, "Creating app", "app_name", app.GetName()) err := s.AccessPoint.CreateApp(ctx, app) // If the resource already exists, it means that the resource was created // by a previous discovery_service instance that didn't support the discovery @@ -139,17 +139,17 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onAppUpdate(ctx context.Context, app, _ types.Application) error { - s.Log.Debugf("Updating app %s.", app.GetName()) + s.Log.DebugContext(ctx, "Updating app", "app_name", app.GetName()) return trace.Wrap(s.AccessPoint.UpdateApp(ctx, app)) } func (s *Server) onAppDelete(ctx context.Context, app types.Application) error { - s.Log.Debugf("Deleting app %s.", app.GetName()) + s.Log.DebugContext(ctx, "Deleting app", "app_name", app.GetName()) return trace.Wrap(s.AccessPoint.DeleteApp(ctx, app.GetName())) } diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index e18cc23e68c99..5247ff213b2e7 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -49,7 +49,7 @@ func (s *Server) startKubeWatchers() error { GetCurrentResources: func() map[string]types.KubeCluster { kcs, err := s.AccessPoint.GetKubernetesClusters(s.ctx) if err != nil { - s.Log.WithError(err).Warn("Unable to get Kubernetes clusters from cache.") + s.Log.WarnContext(s.ctx, "Unable to get Kubernetes clusters from cache", "error", err) return nil } @@ -60,7 +60,7 @@ func (s *Server) startKubeWatchers() error { defer mu.Unlock() return utils.FromSlice(kubeResources, types.KubeCluster.GetName) }, - Log: s.Log.WithField("kind", types.KindKubernetesCluster), + Log: s.LegacyLogger.WithField("kind", types.KindKubernetesCluster), OnCreate: s.onKubeCreate, OnUpdate: s.onKubeUpdate, OnDelete: s.onKubeDelete, @@ -76,7 +76,7 @@ func (s *Server) 
startKubeWatchers() error { s.submitFetchersEvent(kubeNonIntegrationFetchers) return kubeNonIntegrationFetchers }, - Log: s.Log.WithField("kind", types.KindKubernetesCluster), + Log: s.LegacyLogger.WithField("kind", types.KindKubernetesCluster), DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, @@ -106,7 +106,7 @@ func (s *Server) startKubeWatchers() error { mu.Unlock() if err := reconciler.Reconcile(s.ctx); err != nil { - s.Log.WithError(err).Warn("Unable to reconcile resources.") + s.Log.WarnContext(s.ctx, "Unable to reconcile resources", "error", err) } case <-s.ctx.Done(): @@ -118,7 +118,7 @@ func (s *Server) startKubeWatchers() error { } func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster) error { - s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Creating kube_cluster", "kube_cluster_name", kubeCluster.GetName()) err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) // If the kube already exists but has an empty discovery group, update it. if err != nil { @@ -138,17 +138,17 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster }, }) if err != nil { - s.Log.WithError(err).Debug("Error emitting usage event.") + s.Log.DebugContext(ctx, "Error emitting usage event", "error", err) } return nil } func (s *Server) onKubeUpdate(ctx context.Context, kubeCluster, _ types.KubeCluster) error { - s.Log.Debugf("Updating kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Updating kube_cluster", "kube_cluster_name", kubeCluster.GetName()) return trace.Wrap(s.AccessPoint.UpdateKubernetesCluster(ctx, kubeCluster)) } func (s *Server) onKubeDelete(ctx context.Context, kubeCluster types.KubeCluster) error { - s.Log.Debugf("Deleting kube_cluster %s.", kubeCluster.GetName()) + s.Log.DebugContext(ctx, "Deleting kube_cluster", "kube_cluster_name", kubeCluster.GetName()) return trace.Wrap(s.AccessPoint.DeleteKubernetesCluster(ctx, kubeCluster.GetName())) } diff --git a/lib/srv/discovery/reconciler.go b/lib/srv/discovery/reconciler.go index 26b17410e1bd6..dd9dc1d605f9c 100644 --- a/lib/srv/discovery/reconciler.go +++ b/lib/srv/discovery/reconciler.go @@ -20,12 +20,12 @@ package discovery import ( "context" + "log/slog" "sync" "time" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/retryutils" @@ -46,7 +46,7 @@ type serverInfoUpserter interface { type labelReconcilerConfig struct { clock clockwork.Clock - log logrus.FieldLogger + log *slog.Logger accessPoint serverInfoUpserter } @@ -58,7 +58,7 @@ func (c *labelReconcilerConfig) checkAndSetDefaults() error { c.clock = clockwork.NewRealClock() } if c.log == nil { - c.log = logrus.New() + c.log = slog.Default() } return nil } @@ -124,7 +124,7 @@ func (r *labelReconciler) run(ctx context.Context) { for _, si := range batch { if err := r.cfg.accessPoint.UpsertServerInfo(ctx, si); err != nil { - r.cfg.log.WithError(err).Error("Failed to upsert server info.") + r.cfg.log.ErrorContext(ctx, "Failed to upsert server info", "error", err) // Allow the server info to be queued again. 
delete(r.discoveredServers, si.GetName()) } diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 66769b0a88933..321619bb02636 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -25,7 +25,6 @@ import ( "time" "github.com/gravitational/trace" - "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" discoveryconfigv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/discoveryconfig/v1" @@ -69,9 +68,9 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) switch { case trace.IsNotImplemented(err): - s.Log.Warn("UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") + s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") case err != nil: - s.Log.WithError(err).WithField("discovery_config_name", discoveryConfigName).Info("Error updating discovery config status") + s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) } } @@ -428,7 +427,7 @@ func (s *Server) acquireSemaphoreForUserTask(userTaskName string) (releaseFn fun cancel() lease.Stop() if err := lease.Wait(); err != nil { - s.Log.WithError(err).WithField("semaphore", userTaskName).Warn("error cleaning up UserTask semaphore") + s.Log.WarnContext(ctx, "Error cleaning up UserTask semaphore", "semaphore", semaphoreName, "error", err) } } @@ -528,13 +527,13 @@ func (s *Server) upsertTasksForAWSEC2FailedEnrollments() { } if err := s.mergeUpsertDiscoverEC2Task(g, instancesIssueByID); err != nil { - s.Log.WithError(err).WithFields(logrus.Fields{ - "integration": g.integration, - "issue_type": g.issueType, - "aws_account_id": g.accountID, - "aws_region": g.region, - }, - ).Warning("Failed to create discover ec2 user task.", g.integration, g.issueType, g.accountID, g.region) + s.Log.WarnContext(s.ctx, "Failed to create discover ec2 user task", + "integration", g.integration, + "issue_type", g.issueType, + "aws_account_id", g.accountID, + "aws_region", g.region, + "error", err, + ) continue }
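The status.go hunk above flattens a logrus WithFields block into inline attribute pairs on one WarnContext call. When several messages share the same attributes, slog's With can bind them once up front; a short, self-contained sketch with invented values (the attribute names mirror the hunk, everything else is illustrative):

package main

import (
	"context"
	"errors"
	"log/slog"
)

func main() {
	ctx := context.Background()

	// Bind the attributes shared by every message about this task group once,
	// rather than repeating the key-value pairs on each call.
	taskLog := slog.Default().With(
		"integration", "my-aws-integration",
		"aws_account_id", "123456789012",
		"aws_region", "us-east-1",
	)

	taskLog.WarnContext(ctx, "Failed to create discover ec2 user task",
		"issue_type", "ec2-ssm-agent-not-registered",
		"error", errors.New("upsert failed"),
	)
}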