diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index 19971903c9014..b64fa502a6864 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -30,6 +30,7 @@ import ( "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/srv/discovery/common" "github.com/gravitational/teleport/lib/utils" + "github.com/gravitational/teleport/lib/utils/slices" ) const databaseEventPrefix = "db/" @@ -74,6 +75,14 @@ func (s *Server) startDatabaseWatchers() error { Origin: types.OriginCloud, Clock: s.clock, PreFetchHookFn: func() { + discoveryConfigs := slices.CollectValues( + s.getAllDatabaseFetchers(), + func(f common.Fetcher) (s string, skip bool) { + return f.DiscoveryConfigName(), f.DiscoveryConfigName() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsRDSResourcesStatus.reset() }, }, diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index 2f8c4d097b845..3ff4faf774713 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -66,11 +66,14 @@ import ( "github.com/gravitational/teleport/lib/srv/discovery/fetchers/db" "github.com/gravitational/teleport/lib/srv/server" logutils "github.com/gravitational/teleport/lib/utils/log" + libslices "github.com/gravitational/teleport/lib/utils/slices" "github.com/gravitational/teleport/lib/utils/spreadwork" ) var errNoInstances = errors.New("all fetched nodes already enrolled") +const noDiscoveryConfig = "" + // Matchers contains all matchers used by discovery service type Matchers struct { // AWS is a list of AWS EC2 matchers. @@ -423,7 +426,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { return nil, trace.Wrap(err) } - databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, "" /* discovery config */) + databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, noDiscoveryConfig) if err != nil { return nil, trace.Wrap(err) } @@ -433,11 +436,11 @@ func New(ctx context.Context, cfg *Config) (*Server, error) { return nil, trace.Wrap(err) } - if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure); err != nil { + if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure, noDiscoveryConfig); err != nil { return nil, trace.Wrap(err) } - if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP); err != nil { + if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP, noDiscoveryConfig); err != nil { return nil, trace.Wrap(err) } @@ -502,7 +505,6 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { return matcherType == types.AWSMatcherEC2 }) - const noDiscoveryConfig = "" s.staticServerAWSFetchers, err = server.MatchersToEC2InstanceFetchers(s.ctx, ec2Matchers, s.GetEC2Client, noDiscoveryConfig) if err != nil { return trace.Wrap(err) @@ -513,6 +515,14 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllAWSServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsEC2ResourcesStatus.reset() s.awsEC2Tasks.reset() }), @@ -547,7 +557,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error { _, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType) // Add non-integration kube fetchers. - kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, "" /* discovery config */) + kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, noDiscoveryConfig) if err != nil { return trace.Wrap(err) } @@ -602,16 +612,16 @@ func (s *Server) awsServerFetchersFromMatchers(ctx context.Context, matchers []t } // azureServerFetchersFromMatchers converts Matchers into a set of Azure Servers Fetchers. -func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher) []server.Fetcher { +func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher, discoveryConfig string) []server.Fetcher { serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.AzureMatcherVM }) - return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients) + return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients, discoveryConfig) } // gcpServerFetchersFromMatchers converts Matchers into a set of GCP Servers Fetchers. -func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher) ([]server.Fetcher, error) { +func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) ([]server.Fetcher, error) { serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.GCPMatcherCompute }) @@ -632,7 +642,7 @@ func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []t return nil, trace.Wrap(err) } - return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient), nil + return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient, discoveryConfig), nil } // databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers. @@ -686,12 +696,12 @@ func (s *Server) kubeFetchersFromMatchers(matchers Matchers, discoveryConfig str } // initAzureWatchers starts Azure resource watchers based on types provided. -func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher) error { +func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher, discoveryConfig string) error { vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool { return matcherType == types.AzureMatcherVM }) - s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients) + s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients, discoveryConfig) // VM watcher. var err error @@ -699,6 +709,15 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa s.ctx, s.getAllAzureServerFetchers, server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), + server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllAzureServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + }), ) if err != nil { return trace.Wrap(err) @@ -744,10 +763,10 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa return nil } -func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher) error { +func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher, discoveryConfig string) error { var err error - s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers) + s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers, discoveryConfig) if err != nil { return trace.Wrap(err) } @@ -756,6 +775,15 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC s.ctx, s.getAllGCPServerFetchers, server.WithPollInterval(s.PollInterval), server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()), + server.WithPreFetchHookFn(func() { + discoveryConfigs := libslices.CollectValues( + s.getAllGCPServerFetchers(), + func(f server.Fetcher) (s string, skip bool) { + return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + }), ) if err != nil { return trace.Wrap(err) @@ -771,7 +799,7 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC } // initGCPWatchers starts GCP resource watchers based on types provided. -func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher) error { +func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) error { // return early if there are no matchers as GetGCPGKEClient causes // an error if there are no credentials present @@ -779,7 +807,7 @@ func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatche return matcherType == types.GCPMatcherCompute }) - if err := s.initGCPServerWatcher(ctx, vmMatchers); err != nil { + if err := s.initGCPServerWatcher(ctx, vmMatchers, discoveryConfig); err != nil { return trace.Wrap(err) } @@ -1606,7 +1634,9 @@ func (s *Server) startDynamicWatcherUpdater() { s.Log.WarnContext(s.ctx, "Skipping unknown event type %s", "got", event.Type) } case <-s.dynamicMatcherWatcher.Done(): - s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", s.dynamicMatcherWatcher.Error()) + if err := s.dynamicMatcherWatcher.Error(); err != nil { + s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", err) + } return } } @@ -1681,12 +1711,12 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig. s.dynamicServerAWSFetchers[dc.GetName()] = awsServerFetchers s.muDynamicServerAWSFetchers.Unlock() - azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure) + azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure, dc.GetName()) s.muDynamicServerAzureFetchers.Lock() s.dynamicServerAzureFetchers[dc.GetName()] = azureServerFetchers s.muDynamicServerAzureFetchers.Unlock() - gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP) + gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP, dc.GetName()) if err != nil { return trace.Wrap(err) } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 8c9c08b0aff95..ceca9098339db 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -1898,6 +1898,7 @@ func TestDiscoveryDatabase(t *testing.T) { {Engine: aws.String(services.RDSEnginePostgres)}, }, }, + MemoryDB: &mocks.MemoryDBMock{}, Redshift: &mocks.RedshiftMock{ Clusters: []*redshift.Cluster{awsRedshiftResource}, }, @@ -2160,6 +2161,27 @@ func TestDiscoveryDatabase(t *testing.T) { require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled) }, }, + { + name: "discovery config status must be updated even when there are no resources", + discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig { + dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{ + AWS: []types.AWSMatcher{{ + // MemoryDB mock client returns no resources. + Types: []string{types.AWSMatcherMemoryDB}, + Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}}, + Regions: []string{"us-east-1"}, + Integration: integrationName, + }}, + }) + return []*discoveryconfig.DiscoveryConfig{dc1} + }, + expectDatabases: []types.Database{}, + wantEvents: 0, + discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) { + require.Equal(t, uint64(0), s.DiscoveredResources) + require.Equal(t, "DISCOVERY_CONFIG_STATE_SYNCING", s.State) + }, + }, } for _, tc := range tcs { diff --git a/lib/srv/discovery/kube_integration_watcher.go b/lib/srv/discovery/kube_integration_watcher.go index 444565bb6a299..db786d8c22c8a 100644 --- a/lib/srv/discovery/kube_integration_watcher.go +++ b/lib/srv/discovery/kube_integration_watcher.go @@ -33,6 +33,7 @@ import ( "github.com/gravitational/teleport/lib/automaticupgrades" kubeutils "github.com/gravitational/teleport/lib/kube/utils" "github.com/gravitational/teleport/lib/srv/discovery/common" + libslices "github.com/gravitational/teleport/lib/utils/slices" ) // startKubeIntegrationWatchers starts kube watchers that use integration for the credentials. Currently only @@ -74,6 +75,14 @@ func (s *Server) startKubeIntegrationWatchers() error { Origin: types.OriginCloud, TriggerFetchC: s.newDiscoveryConfigChangedSub(), PreFetchHookFn: func() { + discoveryConfigs := libslices.CollectValues( + s.getKubeIntegrationFetchers(), + func(f common.Fetcher) (s string, skip bool) { + return f.DiscoveryConfigName(), f.DiscoveryConfigName() == "" + }, + ) + s.updateDiscoveryConfigStatus(discoveryConfigs...) + s.awsEKSResourcesStatus.reset() }, }) diff --git a/lib/srv/discovery/status.go b/lib/srv/discovery/status.go index ebf1c0b32ee92..83c4d5877e07d 100644 --- a/lib/srv/discovery/status.go +++ b/lib/srv/discovery/status.go @@ -45,40 +45,42 @@ import ( // - AWS EC2 Auto Discover status // - AWS RDS Auto Discover status // - AWS EKS Auto Discover status -func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) { - // Static configurations (ie those in `teleport.yaml/discovery_config..matchers`) do not have a DiscoveryConfig resource. - // Those are discarded because there's no Status to update. - if discoveryConfigName == "" { - return - } +func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) { + for _, discoveryConfigName := range discoveryConfigNames { + // Static configurations (ie those in `teleport.yaml/discovery_config..matchers`) do not have a DiscoveryConfig resource. + // Those are discarded because there's no Status to update. + if discoveryConfigName == "" { + return + } - discoveryConfigStatus := discoveryconfig.Status{ - State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), - LastSyncTime: s.clock.Now(), - IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary), - } + discoveryConfigStatus := discoveryconfig.Status{ + State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(), + LastSyncTime: s.clock.Now(), + IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary), + } - // Merge AWS Sync (TAG) status - discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS Sync (TAG) status + discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS EC2 Instances (auto discovery) status - discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS EC2 Instances (auto discovery) status + discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS RDS databases (auto discovery) status - discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS RDS databases (auto discovery) status + discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - // Merge AWS EKS clusters (auto discovery) status - discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) + // Merge AWS EKS clusters (auto discovery) status + discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus) - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) + defer cancel() - _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) - switch { - case trace.IsNotImplemented(err): - s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") - case err != nil: - s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) + _, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus) + switch { + case trace.IsNotImplemented(err): + s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.") + case err != nil: + s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err) + } } } diff --git a/lib/srv/server/azure_watcher.go b/lib/srv/server/azure_watcher.go index 4339bfc713713..33ac1dd532f49 100644 --- a/lib/srv/server/azure_watcher.go +++ b/lib/srv/server/azure_watcher.go @@ -97,7 +97,7 @@ func NewAzureWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...O } // MatchersToAzureInstanceFetchers converts a list of Azure VM Matchers into a list of Azure VM Fetchers. -func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azureClientGetter) []Fetcher { +func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azureClientGetter, discoveryConfig string) []Fetcher { ret := make([]Fetcher, 0) for _, matcher := range matchers { for _, subscription := range matcher.Subscriptions { @@ -107,6 +107,7 @@ func MatchersToAzureInstanceFetchers(matchers []types.AzureMatcher, clients azur Subscription: subscription, ResourceGroup: resourceGroup, AzureClientGetter: clients, + DiscoveryConfig: discoveryConfig, }) ret = append(ret, fetcher) } @@ -120,6 +121,7 @@ type azureFetcherConfig struct { Subscription string ResourceGroup string AzureClientGetter azureClientGetter + DiscoveryConfig string } type azureInstanceFetcher struct { @@ -130,6 +132,7 @@ type azureInstanceFetcher struct { Labels types.Labels Parameters map[string]string ClientID string + DiscoveryConfig string } func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { @@ -139,6 +142,7 @@ func newAzureInstanceFetcher(cfg azureFetcherConfig) *azureInstanceFetcher { Subscription: cfg.Subscription, ResourceGroup: cfg.ResourceGroup, Labels: cfg.Matcher.ResourceTags, + DiscoveryConfig: cfg.DiscoveryConfig, } if cfg.Matcher.Params != nil { @@ -157,6 +161,10 @@ func (*azureInstanceFetcher) GetMatchingInstances(_ []types.Server, _ bool) ([]I return nil, trace.NotImplemented("not implemented for azure fetchers") } +func (f *azureInstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} + // GetInstances fetches all Azure virtual machines matching configured filters. func (f *azureInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { client, err := f.AzureClientGetter.GetAzureVirtualMachinesClient(f.Subscription) diff --git a/lib/srv/server/azure_watcher_test.go b/lib/srv/server/azure_watcher_test.go index ad507911c6882..1b656205f97ad 100644 --- a/lib/srv/server/azure_watcher_test.go +++ b/lib/srv/server/azure_watcher_test.go @@ -146,7 +146,7 @@ func TestAzureWatcher(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) t.Cleanup(cancel) watcher, err := NewAzureWatcher(ctx, func() []Fetcher { - return MatchersToAzureInstanceFetchers([]types.AzureMatcher{tc.matcher}, &clients) + return MatchersToAzureInstanceFetchers([]types.AzureMatcher{tc.matcher}, &clients, "" /* discovery config */) }) require.NoError(t, err) diff --git a/lib/srv/server/ec2_watcher.go b/lib/srv/server/ec2_watcher.go index 4c6300e7bf661..c7ceb13b92cc0 100644 --- a/lib/srv/server/ec2_watcher.go +++ b/lib/srv/server/ec2_watcher.go @@ -456,3 +456,8 @@ func (f *ec2InstanceFetcher) GetInstances(ctx context.Context, rotation bool) ([ return instances, nil } + +// GetDiscoveryConfig returns the discovery config name that created this fetcher. +func (f *ec2InstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} diff --git a/lib/srv/server/gcp_watcher.go b/lib/srv/server/gcp_watcher.go index 466a85e29fe3b..18f11ee9fe9c6 100644 --- a/lib/srv/server/gcp_watcher.go +++ b/lib/srv/server/gcp_watcher.go @@ -91,7 +91,7 @@ func NewGCPWatcher(ctx context.Context, fetchersFn func() []Fetcher, opts ...Opt } // MatchersToGCPInstanceFetchers converts a list of GCP GCE Matchers into a list of GCP GCE Fetchers. -func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.InstancesClient, projectsClient gcp.ProjectsClient) []Fetcher { +func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.InstancesClient, projectsClient gcp.ProjectsClient, discoveryConfig string) []Fetcher { fetchers := make([]Fetcher, 0, len(matchers)) for _, matcher := range matchers { @@ -106,9 +106,10 @@ func MatchersToGCPInstanceFetchers(matchers []types.GCPMatcher, gcpClient gcp.In } type gcpFetcherConfig struct { - Matcher types.GCPMatcher - GCPClient gcp.InstancesClient - projectsClient gcp.ProjectsClient + Matcher types.GCPMatcher + GCPClient gcp.InstancesClient + projectsClient gcp.ProjectsClient + DiscoveryConfig string } type gcpInstanceFetcher struct { @@ -120,6 +121,7 @@ type gcpInstanceFetcher struct { Labels types.Labels Parameters map[string]string projectsClient gcp.ProjectsClient + DiscoveryConfig string } func newGCPInstanceFetcher(cfg gcpFetcherConfig) *gcpInstanceFetcher { @@ -145,6 +147,10 @@ func (*gcpInstanceFetcher) GetMatchingInstances(_ []types.Server, _ bool) ([]Ins return nil, trace.NotImplemented("not implemented for gcp fetchers") } +func (f *gcpInstanceFetcher) GetDiscoveryConfig() string { + return f.DiscoveryConfig +} + // GetInstances fetches all GCP virtual machines matching configured filters. func (f *gcpInstanceFetcher) GetInstances(ctx context.Context, _ bool) ([]Instances, error) { // Key by project ID, then by zone. diff --git a/lib/srv/server/watcher.go b/lib/srv/server/watcher.go index 437fb90fed660..bb34389a3db4e 100644 --- a/lib/srv/server/watcher.go +++ b/lib/srv/server/watcher.go @@ -42,6 +42,9 @@ type Fetcher interface { // GetMatchingInstances finds Instances from the list of nodes // that the fetcher matches. GetMatchingInstances(nodes []types.Server, rotation bool) ([]Instances, error) + // GetDiscoveryConfig returns the DiscoveryConfig name that created this fetcher. + // Empty for Fetchers created from `teleport.yaml/discovery_service.aws.` matchers. + GetDiscoveryConfig() string } // WithTriggerFetchC sets a poll trigger to manual start a resource polling. diff --git a/lib/utils/slices/slices.go b/lib/utils/slices/slices.go new file mode 100644 index 0000000000000..16b00922b2856 --- /dev/null +++ b/lib/utils/slices/slices.go @@ -0,0 +1,40 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package slices + +import ( + "cmp" + "slices" +) + +// CollectValues applies a function to all elements of a slice and collects them. +// The function returns the value to collect and whether the current element should be skipped. +// Returned values are sorted and deduplicated. +func CollectValues[T any, S cmp.Ordered](ts []T, fn func(T) (s S, skip bool)) []S { + ss := make([]S, 0, len(ts)) + for _, t := range ts { + s, skip := fn(t) + if skip { + continue + } + ss = append(ss, s) + } + slices.Sort(ss) + return slices.Compact(ss) +} diff --git a/lib/utils/slices/slices_test.go b/lib/utils/slices/slices_test.go new file mode 100644 index 0000000000000..d84f4e13435ca --- /dev/null +++ b/lib/utils/slices/slices_test.go @@ -0,0 +1,72 @@ +/* + * Teleport + * Copyright (C) 2024 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package slices + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCollectValues(t *testing.T) { + for _, tt := range []struct { + name string + input []string + collector func(string) (s string, skip bool) + expected []string + }{ + { + name: "no elements", + input: []string{}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{}, + }, + { + name: "multiple strings, all match", + input: []string{"x", "y"}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{"x", "y"}, + }, + { + name: "deduplicates items", + input: []string{"x", "y", "z", "x"}, + collector: func(in string) (s string, skip bool) { + return in, false + }, + expected: []string{"x", "y", "z"}, + }, + { + name: "skipped values are not returned", + input: []string{"x", "y", "z", ""}, + collector: func(in string) (s string, skip bool) { + return in, in == "" + }, + expected: []string{"x", "y", "z"}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + got := CollectValues(tt.input, tt.collector) + require.Equal(t, tt.expected, got) + }) + } +}