diff --git a/lib/srv/discovery/common/watcher.go b/lib/srv/discovery/common/watcher.go index 8c6310c28c67d..fd182b12418c1 100644 --- a/lib/srv/discovery/common/watcher.go +++ b/lib/srv/discovery/common/watcher.go @@ -63,6 +63,8 @@ type WatcherConfig struct { DiscoveryGroup string // Origin is used to specify what type of origin watcher's resources are Origin string + // PreFetchHookFn is called before starting a new fetch cycle. + PreFetchHookFn func() } // CheckAndSetDefaults validates the config. @@ -103,12 +105,12 @@ func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) { if err := config.CheckAndSetDefaults(); err != nil { return nil, trace.Wrap(err) } - watcher := &Watcher{ + + return &Watcher{ cfg: config, ctx: ctx, resourcesC: make(chan types.ResourcesWithLabels), - } - return watcher, nil + }, nil } // Start starts fetching cloud resources and sending them to the channel. @@ -141,6 +143,10 @@ func (w *Watcher) Start() { // fetchAndSend fetches resources from all fetchers and sends them to the channel. func (w *Watcher) fetchAndSend() { + if w.cfg.PreFetchHookFn != nil { + w.cfg.PreFetchHookFn() + } + var ( newFetcherResources = make(types.ResourcesWithLabels, 0, 50) fetchersLock sync.Mutex diff --git a/lib/srv/discovery/common/watcher_test.go b/lib/srv/discovery/common/watcher_test.go index c831fec1e9b94..3f5828a204b9f 100644 --- a/lib/srv/discovery/common/watcher_test.go +++ b/lib/srv/discovery/common/watcher_test.go @@ -20,6 +20,7 @@ package common import ( "context" + "sync/atomic" "testing" "time" @@ -61,11 +62,15 @@ func TestWatcher(t *testing.T) { } clock := clockwork.NewFakeClock() + fetchIterations := atomic.Uint32{} watcher, err := NewWatcher(ctx, WatcherConfig{ FetchersFn: StaticFetchers([]Fetcher{appFetcher, noAuthFetcher, dbFetcher}), Interval: time.Hour, Clock: clock, Origin: types.OriginCloud, + PreFetchHookFn: func() { + fetchIterations.Add(1) + }, }) require.NoError(t, err) go watcher.Start() @@ -77,6 +82,8 @@ func TestWatcher(t *testing.T) { // Watcher should fetch again after interval. clock.Advance(time.Hour + time.Minute) assertFetchResources(t, watcher, wantResources) + + require.Equal(t, uint32(2), fetchIterations.Load()) } func TestWatcherWithDynamicFetchers(t *testing.T) { diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 302859420ad92..19971903c9014 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -64,15 +64,20 @@ func (s *Server) startDatabaseWatchers() error { return trace.Wrap(err) } - watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{ - FetchersFn: s.getAllDatabaseFetchers, - Log: s.LegacyLogger.WithField("kind", types.KindDatabase), - DiscoveryGroup: s.DiscoveryGroup, - Interval: s.PollInterval, - TriggerFetchC: s.newDiscoveryConfigChangedSub(), - Origin: types.OriginCloud, - Clock: s.clock, - }) + watcher, err := common.NewWatcher(s.ctx, + common.WatcherConfig{ + FetchersFn: s.getAllDatabaseFetchers, + Log: s.LegacyLogger.WithField("kind", types.KindDatabase), + DiscoveryGroup: s.DiscoveryGroup, + Interval: s.PollInterval, + TriggerFetchC: s.newDiscoveryConfigChangedSub(), + Origin: types.OriginCloud, + Clock: s.clock, + PreFetchHookFn: func() { + s.awsRDSResourcesStatus.reset() + }, + }, + ) if err != nil { return trace.Wrap(err) } @@ -80,6 +85,9 @@ func (s *Server) startDatabaseWatchers() error { go func() { for { + discoveryConfigsChanged := map[string]struct{}{} + resourcesFoundByGroup := make(map[awsResourceGroup]int) + select { case newResources := <-watcher.ResourcesC(): dbs := make([]types.Database, 0, len(newResources)) @@ -89,21 +97,46 @@ func (s *Server) startDatabaseWatchers() error { continue } + resourceGroup := awsResourceGroupFromLabels(db.GetStaticLabels()) + resourcesFoundByGroup[resourceGroup] += 1 + discoveryConfigsChanged[resourceGroup.discoveryConfig] = struct{}{} + dbs = append(dbs, db) } mu.Lock() newDatabases = dbs mu.Unlock() + for group, count := range resourcesFoundByGroup { + s.awsRDSResourcesStatus.incrementFound(group, count) + } + if err := reconciler.Reconcile(s.ctx); err != nil { s.Log.WarnContext(s.ctx, "Unable to reconcile database resources", "error", err) - } else if s.onDatabaseReconcile != nil { + + // When reconcile fails, it is assumed that everything failed. + for group, count := range resourcesFoundByGroup { + s.awsRDSResourcesStatus.incrementFailed(group, count) + } + + break + } + + for group, count := range resourcesFoundByGroup { + s.awsRDSResourcesStatus.incrementEnrolled(group, count) + } + + if s.onDatabaseReconcile != nil { s.onDatabaseReconcile() } case <-s.ctx.Done(): return } + + for dc := range discoveryConfigsChanged { + s.updateDiscoveryConfigStatus(dc) + } } }() return nil diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index efaa28ff71d47..9c8104157d5c8 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -344,6 +344,8 @@ type Server struct { awsSyncStatus awsSyncStatus awsEC2ResourcesStatus awsResourcesStatus + awsRDSResourcesStatus awsResourcesStatus + awsEKSResourcesStatus awsResourcesStatus awsEC2Tasks awsEC2Tasks // caRotationCh receives nodes that need to have their CAs rotated. @@ -378,6 +380,9 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync), dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig), awsSyncStatus: awsSyncStatus{}, + awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2), + awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS), + awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS), } s.discardUnsupportedMatchers(&s.Matchers) diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index a4e379f63933e..7e67fdcc80539 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -1858,6 +1858,8 @@ func TestDiscoveryDatabase(t *testing.T) { awsRDSDBWithRole.SetAWSAssumeRole("arn:aws:iam::123456789012:role/test-role") awsRDSDBWithRole.SetAWSExternalID("test123") + eksAWSResource, _ := makeEKSCluster(t, "aws-eks", "us-east-1", rewriteDiscoveryLabelsParams{discoveryGroup: mainDiscoveryGroup, integration: integrationName, discoveryConfig: discoveryConfigName}) + matcherForDiscoveryConfigFn := func(t *testing.T, discoveryGroup string, m Matchers) *discoveryconfig.DiscoveryConfig { dc, err := discoveryconfig.NewDiscoveryConfig( header.Metadata{Name: discoveryConfigName}, @@ -1892,6 +1894,9 @@ func TestDiscoveryDatabase(t *testing.T) { &azure.ARMRedisEnterpriseClusterMock{}, &azure.ARMRedisEnterpriseDatabaseMock{}, ), + EKS: &mocks.EKSMock{ + Clusters: []*eks.Cluster{eksAWSResource}, + }, } tcs := []struct { @@ -1902,6 +1907,7 @@ func TestDiscoveryDatabase(t *testing.T) { azureMatchers []types.AzureMatcher expectDatabases []types.Database discoveryConfigs func(*testing.T) []*discoveryconfig.DiscoveryConfig + discoveryConfigStatusCheck func(*testing.T, discoveryconfig.Status) wantEvents int }{ { @@ -2100,6 +2106,12 @@ func TestDiscoveryDatabase(t *testing.T) { return []*discoveryconfig.DiscoveryConfig{dc1} }, wantEvents: 1, + discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) { + require.Equal(t, uint64(1), s.DiscoveredResources) + require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Enrolled) + require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Found) + require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsRds.Failed) + }, }, { name: "running in integrations-only-mode with a matcher without an integration, must find 1 database", @@ -2113,6 +2125,27 @@ func TestDiscoveryDatabase(t *testing.T) { expectDatabases: []types.Database{awsRedshiftDBWithIntegration}, wantEvents: 1, }, + { + name: "running in integrations-only-mode with a dynamic matcher with an integration, must find 1 eks cluster", + discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig { + dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{ + AWS: []types.AWSMatcher{{ + Types: []string{types.AWSMatcherEKS}, + Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}}, + Regions: []string{"us-east-1"}, + Integration: integrationName, + }}, + }) + return []*discoveryconfig.DiscoveryConfig{dc1} + }, + expectDatabases: []types.Database{}, + wantEvents: 0, + discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) { + require.Equal(t, uint64(1), s.DiscoveredResources) + require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsEks.Found) + require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled) + }, + }, } for _, tc := range tcs { @@ -2213,6 +2246,13 @@ func TestDiscoveryDatabase(t *testing.T) { return reporter.ResourceCreateEventCount() != 0 }, time.Second, 100*time.Millisecond) } + + if tc.discoveryConfigStatusCheck != nil { + dc, err := tlsServer.Auth().GetDiscoveryConfig(ctx, discoveryConfigName) + require.NoError(t, err) + + tc.discoveryConfigStatusCheck(t, dc.Status) + } }) } } @@ -2375,6 +2415,23 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) { }) } +func makeEKSCluster(t *testing.T, name, region string, discoveryParams rewriteDiscoveryLabelsParams) (*eks.Cluster, types.KubeCluster) { + t.Helper() + eksAWSCluster := &eks.Cluster{ + Name: aws.String(name), + Arn: aws.String(fmt.Sprintf("arn:aws:eks:%s:123456789012:cluster/%s", region, name)), + Status: aws.String(eks.ClusterStatusActive), + Tags: map[string]*string{ + "env": aws.String("prod"), + }, + } + actual, err := common.NewKubeClusterFromAWSEKS(aws.StringValue(eksAWSCluster.Name), aws.StringValue(eksAWSCluster.Arn), eksAWSCluster.Tags) + require.NoError(t, err) + discoveryParams.matcherType = types.AWSMatcherEKS + rewriteCloudResource(t, actual, discoveryParams) + return eksAWSCluster, actual +} + func makeRDSInstance(t *testing.T, name, region string, discoveryParams rewriteDiscoveryLabelsParams) (*rds.DBInstance, types.Database) { instance := &rds.DBInstance{ DBInstanceArn: aws.String(fmt.Sprintf("arn:aws:rds:%v:123456789012:db:%v", region, name)), diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index d8efaceda4bf8..444565bb6a299 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -72,6 +72,10 @@ func (s *Server) startKubeIntegrationWatchers() error { DiscoveryGroup: s.DiscoveryGroup, Interval: s.PollInterval, Origin: types.OriginCloud, + TriggerFetchC: s.newDiscoveryConfigChangedSub(), + PreFetchHookFn: func() { + s.awsEKSResourcesStatus.reset() + }, }) if err != nil { return trace.Wrap(err) @@ -80,6 +84,10 @@ func (s *Server) startKubeIntegrationWatchers() error { go func() { for { + discoveryConfigsChanged := map[string]struct{}{} + resourcesFoundByGroup := make(map[awsResourceGroup]int) + resourcesEnrolledByGroup := make(map[awsResourceGroup]int) + select { case resources := <-watcher.ResourcesC(): if len(resources) == 0 { @@ -98,15 +106,29 @@ func (s *Server) startKubeIntegrationWatchers() error { continue } + agentVersion, err := s.getKubeAgentVersion(releaseChannels) + if err != nil { + s.Log.WarnContext(s.ctx, "Could not get agent version to enroll EKS clusters", "error", err) + continue + } + var newClusters []types.DiscoveredEKSCluster mu.Lock() for _, r := range resources { newCluster, ok := r.(types.DiscoveredEKSCluster) - if !ok || - enrollingClusters[newCluster.GetAWSConfig().Name] || + if !ok { + continue + } + + resourceGroup := awsResourceGroupFromLabels(newCluster.GetStaticLabels()) + resourcesFoundByGroup[resourceGroup] += 1 + discoveryConfigsChanged[resourceGroup.discoveryConfig] = struct{}{} + + if enrollingClusters[newCluster.GetAWSConfig().Name] || slices.ContainsFunc(existingServers, func(c types.KubeServer) bool { return c.GetName() == newCluster.GetName() }) || slices.ContainsFunc(existingClusters, func(c types.KubeCluster) bool { return c.GetName() == newCluster.GetName() }) { + resourcesEnrolledByGroup[resourceGroup] += 1 continue } @@ -114,26 +136,26 @@ func (s *Server) startKubeIntegrationWatchers() error { } mu.Unlock() - if len(newClusters) == 0 { - continue + for group, count := range resourcesFoundByGroup { + s.awsEKSResourcesStatus.incrementFound(group, count) } - agentVersion, err := s.getKubeAgentVersion(releaseChannels) - if err != nil { - s.Log.WarnContext(s.ctx, "Could not get agent version to enroll EKS clusters", "error", err) - continue + if len(newClusters) == 0 { + break } // When enrolling EKS clusters, client for enrollment depends on the region and integration used. type regionIntegrationMapKey struct { - region string - integration string + region string + integration string + discoveryConfig string } clustersByRegionAndIntegration := map[regionIntegrationMapKey][]types.DiscoveredEKSCluster{} for _, c := range newClusters { mapKey := regionIntegrationMapKey{ - region: c.GetAWSConfig().Region, - integration: c.GetIntegration(), + region: c.GetAWSConfig().Region, + integration: c.GetIntegration(), + discoveryConfig: c.GetStaticLabels()[types.TeleportInternalDiscoveryConfigName], } clustersByRegionAndIntegration[mapKey] = append(clustersByRegionAndIntegration[mapKey], c) @@ -141,18 +163,26 @@ func (s *Server) startKubeIntegrationWatchers() error { for key, val := range clustersByRegionAndIntegration { key, val := key, val - go s.enrollEKSClusters(key.region, key.integration, val, agentVersion, &mu, enrollingClusters) + go s.enrollEKSClusters(key.region, key.integration, key.discoveryConfig, val, agentVersion, &mu, enrollingClusters) } case <-s.ctx.Done(): return } + + for group, count := range resourcesEnrolledByGroup { + s.awsEKSResourcesStatus.incrementEnrolled(group, count) + } + + for dc := range discoveryConfigsChanged { + s.updateDiscoveryConfigStatus(dc) + } } }() return nil } -func (s *Server) enrollEKSClusters(region, integration string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) { +func (s *Server) enrollEKSClusters(region, integration, discoveryConfig string, clusters []types.DiscoveredEKSCluster, agentVersion string, mu *sync.Mutex, enrollingClusters map[string]bool) { mu.Lock() for _, c := range clusters { if _, ok := enrollingClusters[c.GetAWSConfig().Name]; !ok { @@ -167,6 +197,8 @@ func (s *Server) enrollEKSClusters(region, integration string, clusters []types. delete(enrollingClusters, c.GetAWSConfig().Name) } mu.Unlock() + + s.updateDiscoveryConfigStatus(discoveryConfig) }() // We sort input clusters into two batches - one that has Kubernetes App Discovery @@ -195,12 +227,20 @@ func (s *Server) enrollEKSClusters(region, integration string, clusters []types. AgentVersion: agentVersion, }) if err != nil { + s.awsEKSResourcesStatus.incrementFailed(awsResourceGroup{ + discoveryConfig: discoveryConfig, + integration: integration, + }, len(clusterNames)) s.Log.ErrorContext(ctx, "Failed to enroll EKS clusters", "cluster_names", clusterNames, "error", err) continue } for _, r := range rsp.Results { if r.Error != "" { + s.awsEKSResourcesStatus.incrementFailed(awsResourceGroup{ + discoveryConfig: discoveryConfig, + integration: integration, + }, 1) if !strings.Contains(r.Error, "teleport-kube-agent is already installed on the cluster") { s.Log.ErrorContext(ctx, "Failed to enroll EKS cluster", "cluster_name", r.EksClusterName, "error", err) } else { diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index 460053ff47daf..ebf1c0b32ee92 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -43,6 +43,8 @@ import ( // The status will be updated with the following matchers: // - AWS Sync (TAG) status // - AWS EC2 Auto Discover status +// - AWS RDS Auto Discover status +// - AWS EKS Auto Discover status func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { // Static configurations (ie those in `teleport.yaml/discovery_config..matchers`) do not have a DiscoveryConfig resource. // Those are discarded because there's no Status to update. @@ -60,7 +62,13 @@ func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) // Merge AWS EC2 Instances (auto discovery) status - discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeEC2IntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + + // Merge AWS RDS databases (auto discovery) status + discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + + // Merge AWS EKS clusters (auto discovery) status + discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) defer cancel() @@ -196,11 +204,18 @@ func (d *awsSyncStatus) mergeIntoGlobalStatus(discoveryConfigName string, existi return existingStatus } +func newAWSResourceStatusCollector(resourceType string) awsResourcesStatus { + return awsResourcesStatus{ + resourceType: resourceType, + } +} + // awsResourcesStatus contains all the status for AWS Matchers grouped by DiscoveryConfig for a specific matcher type. type awsResourcesStatus struct { mu sync.RWMutex // awsResourcesResults maps the DiscoveryConfig name and integration to a summary of discovered/enrolled resources. awsResourcesResults map[awsResourceGroup]awsResourceGroupResult + resourceType string } // awsResourceGroup is the key for the summary @@ -209,6 +224,13 @@ type awsResourceGroup struct { integration string } +func awsResourceGroupFromLabels(labels map[string]string) awsResourceGroup { + return awsResourceGroup{ + discoveryConfig: labels[types.TeleportInternalDiscoveryConfigName], + integration: labels[types.TeleportInternalDiscoveryIntegrationName], + } +} + // awsResourceGroupResult stores the result of the aws_sync Matchers for a given DiscoveryConfig. type awsResourceGroupResult struct { found int @@ -223,7 +245,7 @@ func (d *awsResourcesStatus) reset() { d.awsResourcesResults = make(map[awsResourceGroup]awsResourceGroupResult) } -func (ars *awsResourcesStatus) mergeEC2IntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status { +func (ars *awsResourcesStatus) mergeIntoGlobalStatus(discoveryConfigName string, existingStatus discoveryconfig.Status) discoveryconfig.Status { ars.mu.RLock() defer ars.mu.RUnlock() @@ -235,16 +257,26 @@ func (ars *awsResourcesStatus) mergeEC2IntoGlobalStatus(discoveryConfigName stri // Update global discovered resources count. existingStatus.DiscoveredResources = existingStatus.DiscoveredResources + uint64(groupResult.found) - // Update counters specific to AWS EC2 resources discovered. + // Update counters specific to AWS resources discovered. existingIntegrationResources, ok := existingStatus.IntegrationDiscoveredResources[group.integration] if !ok { existingIntegrationResources = &discoveryconfigv1.IntegrationDiscoveredSummary{} } - existingIntegrationResources.AwsEc2 = &discoveryconfigv1.ResourcesDiscoveredSummary{ + + resourcesSummary := &discoveryconfigv1.ResourcesDiscoveredSummary{ Found: uint64(groupResult.found), Enrolled: uint64(groupResult.enrolled), Failed: uint64(groupResult.failed), } + + switch ars.resourceType { + case types.AWSMatcherEC2: + existingIntegrationResources.AwsEc2 = resourcesSummary + case types.AWSMatcherRDS: + existingIntegrationResources.AwsRds = resourcesSummary + case types.AWSMatcherEKS: + existingIntegrationResources.AwsEks = resourcesSummary + } existingStatus.IntegrationDiscoveredResources[group.integration] = existingIntegrationResources }