From d292bd27ebee2fa608cfab4e324ad1cd38e3070c Mon Sep 17 00:00:00 2001 From: Marco Dinis Date: Fri, 29 Nov 2024 17:15:26 +0000 Subject: [PATCH] DiscoveryConfig Status: update even when no resources are found The DiscoveryService was not updating the DiscoveryConfigStatus when its matchers didn't discovered any resource. This would lead the user to think that there the DiscoveryConfig was not being processed, when in fact it was. --- lib/srv/discovery/database_watcher.go | 9 +++ lib/srv/discovery/discovery.go | 66 ++++++++++++----- lib/srv/discovery/discovery_test.go | 22 ++++++ lib/srv/discovery/kube_integration_watcher.go | 9 +++ lib/srv/discovery/status.go | 56 ++++++++------- lib/srv/server/azure_watcher.go | 10 ++- lib/srv/server/azure_watcher_test.go | 2 +- lib/srv/server/ec2_watcher.go | 5 ++ lib/srv/server/gcp_watcher.go | 14 ++-- lib/srv/server/watcher.go | 3 + lib/utils/slices/slices.go | 40 +++++++++++ lib/utils/slices/slices_test.go | 72 +++++++++++++++++++ 12 files changed, 257 insertions(+), 51 deletions(-) create mode 100644 lib/utils/slices/slices.go create mode 100644 lib/utils/slices/slices_test.go diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 19971903c9014..b64fa502a6864 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -30,6 +30,7 @@ import ( "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/slices" ) const databaseEventPrefix = "db/" @@ -74,6 +75,14 @@ func (s *Server) startDatabaseWatchers() error { Origin: types.OriginCloud, Clock: s.clock, PreFetchHookFn: func() { + discoveryConfigs := slices.CollectValues( + s.getAllDatabaseFetchers(), + func(f common.Fetcher) (s string, skip bool) { + return f.DiscoveryConfigName(), f.DiscoveryConfigName() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsRDSResourcesStatus.reset() }, }, diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 2f8c4d097b845..3ff4faf774713 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -66,11 +66,14 @@ import ( "github.com/gravitational/teleport/lib/srv/discovery/fetchers/db" "github.com/gravitational/teleport/lib/srv/server" logutils "github.com/gravitational/teleport/lib/utils/log" + libslices "github.com/gravitational/teleport/lib/utils/slices" "github.com/gravitational/teleport/lib/utils/spreadwork" ) var errNoInstances = errors.New("all fetched nodes already enrolled") +const noDiscoveryConfig = "" + // Matchers contains all matchers used by discovery service type Matchers struct { // AWS is a list of AWS EC2 matchers. @@ -423,7 +426,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { return nil, trace.Wrap(err) } - databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, "" /* discovery config */) + databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, noDiscoveryConfig) if err != nil { return nil, trace.Wrap(err) } @@ -433,11 +436,11 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { return nil, trace.Wrap(err) } - if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure); err != nil { + if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure, noDiscoveryConfig); err != nil { return nil, trace.Wrap(err) } - if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP); err != nil { + if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP, noDiscoveryConfig); err != nil { return nil, trace.Wrap(err) } @@ -502,7 +505,6 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { return matcherType == types.AWSMatcherEC2 }) - const noDiscoveryConfig = "" s.staticServerAWSFetchers, err = server.MatchersToEC2InstanceFetchers(s.ctx, ec2Matchers, s.GetEC2Client, noDiscoveryConfig) if err != nil { return trace.Wrap(err) @@ -513,6 +515,14 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllAWSServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsEC2ResourcesStatus.reset() s.awsEC2Tasks.reset() }), @@ -547,7 +557,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) // Add non-integration kube fetchers. - kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, "" /* discovery config */) + kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, noDiscoveryConfig) if err != nil { return trace.Wrap(err) } @@ -602,16 +612,16 @@ func (s *Server) awsServerFetchersFromMatchers(ctx context.Context, matchers []t } // azureServerFetchersFromMatchers converts Matchers into a set of Azure Servers Fetchers. -func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher) []server.Fetcher { +func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher, discoveryConfig string) []server.Fetcher { serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.AzureMatcherVM }) - return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients) + return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients, discoveryConfig) } // gcpServerFetchersFromMatchers converts Matchers into a set of GCP Servers Fetchers. -func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher) ([]server.Fetcher, error) { +func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) ([]server.Fetcher, error) { serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.GCPMatcherCompute }) @@ -632,7 +642,7 @@ func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []t return nil, trace.Wrap(err) } - return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient), nil + return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient, discoveryConfig), nil } // databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers. @@ -686,12 +696,12 @@ func (s *Server) kubeFetchersFromMatchers(matchers Matchers, discoveryConfig str } // initAzureWatchers starts Azure resource watchers based on types provided. -func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher) error { +func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher, discoveryConfig string) error { vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.AzureMatcherVM }) - s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients) + s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients, discoveryConfig) // VM watcher. var err error @@ -699,6 +709,15 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa s.ctx, s.getAllAzureServerFetchers, server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), + server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllAzureServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + }), ) if err != nil { return trace.Wrap(err) @@ -744,10 +763,10 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa return nil } -func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher) error { +func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher, discoveryConfig string) error { var err error - s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers) + s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers, discoveryConfig) if err != nil { return trace.Wrap(err) } @@ -756,6 +775,15 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC s.ctx, s.getAllGCPServerFetchers, server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), + server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllGCPServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + }), ) if err != nil { return trace.Wrap(err) @@ -771,7 +799,7 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC } // initGCPWatchers starts GCP resource watchers based on types provided. -func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher) error { +func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) error { // return early if there are no matchers as GetGCPGKEClient causes // an error if there are no credentials present @@ -779,7 +807,7 @@ func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatche return matcherType == types.GCPMatcherCompute }) - if err := s.initGCPServerWatcher(ctx, vmMatchers); err != nil { + if err := s.initGCPServerWatcher(ctx, vmMatchers, discoveryConfig); err != nil { return trace.Wrap(err) } @@ -1606,7 +1634,9 @@ func (s *Server) startDynamicWatcherUpdater() { s.Log.WarnContext(s.ctx, "Skipping unknown event type %s", "got", event.Type) } case <-s.dynamicMatcherWatcher.Done(): - s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", s.dynamicMatcherWatcher.Error()) + if err := s.dynamicMatcherWatcher.Error(); err != nil { + s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", err) + } return } } @@ -1681,12 +1711,12 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig. s.dynamicServerAWSFetchers[dc.GetName()] = awsServerFetchers s.muDynamicServerAWSFetchers.Unlock() - azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure) + azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure, dc.GetName()) s.muDynamicServerAzureFetchers.Lock() s.dynamicServerAzureFetchers[dc.GetName()] = azureServerFetchers s.muDynamicServerAzureFetchers.Unlock() - gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP) + gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP, dc.GetName()) if err != nil { return trace.Wrap(err) } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 8c9c08b0aff95..ceca9098339db 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -1898,6 +1898,7 @@ func TestDiscoveryDatabase(t *testing.T) { {Engine: aws.String(services.RDSEnginePostgres)}, }, }, + MemoryDB: &mocks.MemoryDBMock{}, Redshift: &mocks.RedshiftMock{ Clusters: []*redshift.Cluster{awsRedshiftResource}, }, @@ -2160,6 +2161,27 @@ func TestDiscoveryDatabase(t *testing.T) { require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled) }, }, + { + name: "discovery config status must be updated even when there are no resources", + discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig { + dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{ + AWS: []types.AWSMatcher{{ + // MemoryDB mock client returns no resources. + Types: []string{types.AWSMatcherMemoryDB}, + Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}}, + Regions: []string{"us-east-1"}, + Integration: integrationName, + }}, + }) + return []*discoveryconfig.DiscoveryConfig{dc1} + }, + expectDatabases: []types.Database{}, + wantEvents: 0, + discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) { + require.Equal(t, uint64(0), s.DiscoveredResources) + require.Equal(t, "DISCOVERY_CONFIG_STATE_SYNCING", s.State) + }, + }, } for _, tc := range tcs { diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 444565bb6a299..db786d8c22c8a 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -33,6 +33,7 @@ import ( "github.com/gravitational/teleport/lib/automaticupgrades" kubeutils "github.com/gravitational/teleport/lib/kube/utils" "github.com/gravitational/teleport/lib/srv/discovery/common" + libslices "github.com/gravitational/teleport/lib/utils/slices" ) // startKubeIntegrationWatchers starts kube watchers that use integration for the credentials. Currently only @@ -74,6 +75,14 @@ func (s *Server) startKubeIntegrationWatchers() error { Origin: types.OriginCloud, TriggerFetchC: s.newDiscoveryConfigChangedSub(), PreFetchHookFn: func() { + discoveryConfigs := libslices.CollectValues( + s.getKubeIntegrationFetchers(), + func(f common.Fetcher) (s string, skip bool) { + return f.DiscoveryConfigName(), f.DiscoveryConfigName() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsEKSResourcesStatus.reset() }, }) diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index ebf1c0b32ee92..83c4d5877e07d 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -45,40 +45,42 @@ import ( // - AWS EC2 Auto Discover status // - AWS RDS Auto Discover status // - AWS EKS Auto Discover status -func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { - // Static configurations (ie those in `teleport.yaml/discovery_config..matchers`) do not have a DiscoveryConfig resource. - // Those are discarded because there's no Status to update. - if discoveryConfigName == "" { - return - } +func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) { + for _, discoveryConfigName := range discoveryConfigNames { + // Static configurations (ie those in `teleport.yaml/discovery_config..matchers`) do not have a DiscoveryConfig resource. + // Those are discarded because there's no Status to update. + if discoveryConfigName == "" { + return + } - discoveryConfigStatus := discoveryconfig.Status{ - State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), - LastSyncTime: s.clock.Now(), - IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary), - } + discoveryConfigStatus := discoveryconfig.Status{ + State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), + LastSyncTime: s.clock.Now(), + IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary), + } - // Merge AWS Sync (TAG) status - discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS Sync (TAG) status + discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS EC2 Instances (auto discovery) status - discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS EC2 Instances (auto discovery) status + discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS RDS databases (auto discovery) status - discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS RDS databases (auto discovery) status + discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS EKS clusters (auto discovery) status - discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS EKS clusters (auto discovery) status + discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) + defer cancel() - _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) - switch { - case trace.IsNotImplemented(err): - s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") - case err != nil: - s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) + _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) + switch { + case trace.IsNotImplemented(err): + s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") + case err != nil: + s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) + } } } diff --git a/lib/srv/server/azure_watcher.go b/lib/srv/server/azure_watcher.go index 4339bfc713713..33ac1dd532f49 100644 --- a/lib/srv/server/azure_watcher.go +++ b/lib/srv/server/azure_watcher.go @@ -97,7 +97,7 @@ func NewAzureWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...O } // MatchersToAzureInstanceFetchers converts a list of Azure VM Matchers into a list of Azure VM Fetchers. -func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azureClientGetter) []Fetcher { +func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azureClientGetter, discoveryConfig string) []Fetcher { ret := make([]Fetcher, 0) for _, matcher := range matchers { for _, subscription := range matcher.Subscriptions { @@ -107,6 +107,7 @@ func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azur Subscription: subscription, ResourceGroup: resourceGroup, AzureClientGetter: clients, + DiscoveryConfig: discoveryConfig, }) ret = append(ret, fetcher) } @@ -120,6 +121,7 @@ type azureFetcherConfig struct { Subscription string ResourceGroup string AzureClientGetter azureClientGetter + DiscoveryConfig string } type azureInstanceFetcher struct { @@ -130,6 +132,7 @@ type azureInstanceFetcher struct { Labels types.Labels Parameters map[string]string ClientID string + DiscoveryConfig string } func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { @@ -139,6 +142,7 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { Subscription: cfg.Subscription, ResourceGroup: cfg.ResourceGroup, Labels: cfg.Matcher.ResourceTags, + DiscoveryConfig: cfg.DiscoveryConfig, } if cfg.Matcher.Params != nil { @@ -157,6 +161,10 @@ func (*azureInstanceFetcher) GetMatchingInstances(_ []types.Server, _ bool) ([]I return nil, trace.NotImplemented("not implemented for azure fetchers") } +func (f *azureInstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} + // GetInstances fetches all Azure virtual machines matching configured filters. func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { client, err := f.AzureClientGetter.GetAzureVirtualMachinesClient(f.Subscription) diff --git a/lib/srv/server/azure_watcher_test.go b/lib/srv/server/azure_watcher_test.go index ad507911c6882..1b656205f97ad 100644 --- a/lib/srv/server/azure_watcher_test.go +++ b/lib/srv/server/azure_watcher_test.go @@ -146,7 +146,7 @@ func TestAzureWatcher(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) watcher, err := NewAzureWatcher(ctx, func() []Fetcher { - return MatchersToAzureInstanceFetchers([]types.AzureMatcher{tc.matcher}, &clients) + return MatchersToAzureInstanceFetchers([]types.AzureMatcher{tc.matcher}, &clients, "" /* discovery config */) }) require.NoError(t, err) diff --git a/lib/srv/server/ec2_watcher.go b/lib/srv/server/ec2_watcher.go index 4c6300e7bf661..c7ceb13b92cc0 100644 --- a/lib/srv/server/ec2_watcher.go +++ b/lib/srv/server/ec2_watcher.go @@ -456,3 +456,8 @@ func (f *ec2InstanceFetcher) GetInstances(ctx context.Context, rotation bool) ([ return instances, nil } + +// GetDiscoveryConfig returns the discovery config name that created this fetcher. +func (f *ec2InstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} diff --git a/lib/srv/server/gcp_watcher.go b/lib/srv/server/gcp_watcher.go index 466a85e29fe3b..18f11ee9fe9c6 100644 --- a/lib/srv/server/gcp_watcher.go +++ b/lib/srv/server/gcp_watcher.go @@ -91,7 +91,7 @@ func NewGCPWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...Opt } // MatchersToGCPInstanceFetchers converts a list of GCP GCE Matchers into a list of GCP GCE Fetchers. -func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.InstancesClient, projectsClient gcp.ProjectsClient) []Fetcher { +func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.InstancesClient, projectsClient gcp.ProjectsClient, discoveryConfig string) []Fetcher { fetchers := make([]Fetcher, 0, len(matchers)) for _, matcher := range matchers { @@ -106,9 +106,10 @@ func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.In } type gcpFetcherConfig struct { - Matcher types.GCPMatcher - GCPClient gcp.InstancesClient - projectsClient gcp.ProjectsClient + Matcher types.GCPMatcher + GCPClient gcp.InstancesClient + projectsClient gcp.ProjectsClient + DiscoveryConfig string } type gcpInstanceFetcher struct { @@ -120,6 +121,7 @@ type gcpInstanceFetcher struct { Labels types.Labels Parameters map[string]string projectsClient gcp.ProjectsClient + DiscoveryConfig string } func newGCPInstanceFetcher(cfg gcpFetcherConfig) *gcpInstanceFetcher { @@ -145,6 +147,10 @@ func (*gcpInstanceFetcher) GetMatchingInstances(_ []types.Server, _ bool) ([]Ins return nil, trace.NotImplemented("not implemented for gcp fetchers") } +func (f *gcpInstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} + // GetInstances fetches all GCP virtual machines matching configured filters. func (f *gcpInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { // Key by project ID, then by zone. diff --git a/lib/srv/server/watcher.go b/lib/srv/server/watcher.go index 437fb90fed660..bb34389a3db4e 100644 --- a/lib/srv/server/watcher.go +++ b/lib/srv/server/watcher.go @@ -42,6 +42,9 @@ type Fetcher interface { // GetMatchingInstances finds Instances from the list of nodes // that the fetcher matches. GetMatchingInstances(nodes []types.Server, rotation bool) ([]Instances, error) + // GetDiscoveryConfig returns the DiscoveryConfig name that created this fetcher. + // Empty for Fetchers created from `teleport.yaml/discovery_service.aws.` matchers. + GetDiscoveryConfig() string } // WithTriggerFetchC sets a poll trigger to manual start a resource polling. diff --git a/lib/utils/slices/slices.go b/lib/utils/slices/slices.go new file mode 100644 index 0000000000000..16b00922b2856 --- /dev/null +++ b/lib/utils/slices/slices.go @@ -0,0 +1,40 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package slices + +import ( + "cmp" + "slices" +) + +// CollectValues applies a function to all elements of a slice and collects them. +// The function returns the value to collect and whether the current element should be skipped. +// Returned values are sorted and deduplicated. +func CollectValues[T any, S cmp.Ordered](ts []T, fn func(T) (s S, skip bool)) []S { + ss := make([]S, 0, len(ts)) + for _, t := range ts { + s, skip := fn(t) + if skip { + continue + } + ss = append(ss, s) + } + slices.Sort(ss) + return slices.Compact(ss) +} diff --git a/lib/utils/slices/slices_test.go b/lib/utils/slices/slices_test.go new file mode 100644 index 0000000000000..d84f4e13435ca --- /dev/null +++ b/lib/utils/slices/slices_test.go @@ -0,0 +1,72 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package slices + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCollectValues(t *testing.T) { + for _, tt := range []struct { + name string + input []string + collector func(string) (s string, skip bool) + expected []string + }{ + { + name: "no elements", + input: []string{}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{}, + }, + { + name: "multiple strings, all match", + input: []string{"x", "y"}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{"x", "y"}, + }, + { + name: "deduplicates items", + input: []string{"x", "y", "z", "x"}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{"x", "y", "z"}, + }, + { + name: "skipped values are not returned", + input: []string{"x", "y", "z", ""}, + collector: func(in string) (s string, skip bool) { + return in, in == "" + }, + expected: []string{"x", "y", "z"}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got := CollectValues(tt.input, tt.collector) + require.Equal(t, tt.expected, got) + }) + } +}