Skip to content

Commit

Permalink
DiscoveryConfig Status: update even when no resources are found
Browse files Browse the repository at this point in the history
The DiscoveryService was not updating the DiscoveryConfigStatus when its
matchers didn't discovered any resource.

This would lead the user to think that there the DiscoveryConfig was not
being processed, when in fact it was.
  • Loading branch information
marcoandredinis committed Nov 29, 2024
1 parent 76bfe76 commit d292bd2
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 51 deletions.
9 changes: 9 additions & 0 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/gravitational/teleport/lib/services"
"github.com/gravitational/teleport/lib/srv/discovery/common"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/slices"
)

const databaseEventPrefix = "db/"
Expand Down Expand Up @@ -74,6 +75,14 @@ func (s *Server) startDatabaseWatchers() error {
Origin: types.OriginCloud,
Clock: s.clock,
PreFetchHookFn: func() {
discoveryConfigs := slices.CollectValues(
s.getAllDatabaseFetchers(),
func(f common.Fetcher) (s string, skip bool) {
return f.DiscoveryConfigName(), f.DiscoveryConfigName() == ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsRDSResourcesStatus.reset()
},
},
Expand Down
66 changes: 48 additions & 18 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ import (
"github.com/gravitational/teleport/lib/srv/discovery/fetchers/db"
"github.com/gravitational/teleport/lib/srv/server"
logutils "github.com/gravitational/teleport/lib/utils/log"
libslices "github.com/gravitational/teleport/lib/utils/slices"
"github.com/gravitational/teleport/lib/utils/spreadwork"
)

var errNoInstances = errors.New("all fetched nodes already enrolled")

const noDiscoveryConfig = ""

// Matchers contains all matchers used by discovery service
type Matchers struct {
// AWS is a list of AWS EC2 matchers.
Expand Down Expand Up @@ -423,7 +426,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
return nil, trace.Wrap(err)
}

databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, "" /* discovery config */)
databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, noDiscoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -433,11 +436,11 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
return nil, trace.Wrap(err)
}

if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure); err != nil {
if err := s.initAzureWatchers(s.ctx, cfg.Matchers.Azure, noDiscoveryConfig); err != nil {
return nil, trace.Wrap(err)
}

if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP); err != nil {
if err := s.initGCPWatchers(s.ctx, cfg.Matchers.GCP, noDiscoveryConfig); err != nil {
return nil, trace.Wrap(err)
}

Expand Down Expand Up @@ -502,7 +505,6 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
return matcherType == types.AWSMatcherEC2
})

const noDiscoveryConfig = ""
s.staticServerAWSFetchers, err = server.MatchersToEC2InstanceFetchers(s.ctx, ec2Matchers, s.GetEC2Client, noDiscoveryConfig)
if err != nil {
return trace.Wrap(err)
Expand All @@ -513,6 +515,14 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.CollectValues(
s.getAllAWSServerFetchers(),
func(f server.Fetcher) (s string, skip bool) {
return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEC2ResourcesStatus.reset()
s.awsEC2Tasks.reset()
}),
Expand Down Expand Up @@ -547,7 +557,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

// Add non-integration kube fetchers.
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, "" /* discovery config */)
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, noDiscoveryConfig)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -602,16 +612,16 @@ func (s *Server) awsServerFetchersFromMatchers(ctx context.Context, matchers []t
}

// azureServerFetchersFromMatchers converts Matchers into a set of Azure Servers Fetchers.
func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher) []server.Fetcher {
func (s *Server) azureServerFetchersFromMatchers(matchers []types.AzureMatcher, discoveryConfig string) []server.Fetcher {
serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.AzureMatcherVM
})

return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients)
return server.MatchersToAzureInstanceFetchers(serverMatchers, s.CloudClients, discoveryConfig)
}

// gcpServerFetchersFromMatchers converts Matchers into a set of GCP Servers Fetchers.
func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher) ([]server.Fetcher, error) {
func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) ([]server.Fetcher, error) {
serverMatchers, _ := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.GCPMatcherCompute
})
Expand All @@ -632,7 +642,7 @@ func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []t
return nil, trace.Wrap(err)
}

return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient), nil
return server.MatchersToGCPInstanceFetchers(serverMatchers, client, projectsClient, discoveryConfig), nil
}

// databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers.
Expand Down Expand Up @@ -686,19 +696,28 @@ func (s *Server) kubeFetchersFromMatchers(matchers Matchers, discoveryConfig str
}

// initAzureWatchers starts Azure resource watchers based on types provided.
func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher) error {
func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMatcher, discoveryConfig string) error {
vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.AzureMatcherVM
})

s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients)
s.staticServerAzureFetchers = server.MatchersToAzureInstanceFetchers(vmMatchers, s.CloudClients, discoveryConfig)

// VM watcher.
var err error
s.azureWatcher, err = server.NewAzureWatcher(
s.ctx, s.getAllAzureServerFetchers,
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.CollectValues(
s.getAllAzureServerFetchers(),
func(f server.Fetcher) (s string, skip bool) {
return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)
}),
)
if err != nil {
return trace.Wrap(err)
Expand Down Expand Up @@ -744,10 +763,10 @@ func (s *Server) initAzureWatchers(ctx context.Context, matchers []types.AzureMa
return nil
}

func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher) error {
func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GCPMatcher, discoveryConfig string) error {
var err error

s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers)
s.staticServerGCPFetchers, err = s.gcpServerFetchersFromMatchers(ctx, vmMatchers, discoveryConfig)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -756,6 +775,15 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC
s.ctx, s.getAllGCPServerFetchers,
server.WithPollInterval(s.PollInterval),
server.WithTriggerFetchC(s.newDiscoveryConfigChangedSub()),
server.WithPreFetchHookFn(func() {
discoveryConfigs := libslices.CollectValues(
s.getAllGCPServerFetchers(),
func(f server.Fetcher) (s string, skip bool) {
return f.GetDiscoveryConfig(), f.GetDiscoveryConfig() == ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)
}),
)
if err != nil {
return trace.Wrap(err)
Expand All @@ -771,15 +799,15 @@ func (s *Server) initGCPServerWatcher(ctx context.Context, vmMatchers []types.GC
}

// initGCPWatchers starts GCP resource watchers based on types provided.
func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher) error {
func (s *Server) initGCPWatchers(ctx context.Context, matchers []types.GCPMatcher, discoveryConfig string) error {
// return early if there are no matchers as GetGCPGKEClient causes
// an error if there are no credentials present

vmMatchers, otherMatchers := splitMatchers(matchers, func(matcherType string) bool {
return matcherType == types.GCPMatcherCompute
})

if err := s.initGCPServerWatcher(ctx, vmMatchers); err != nil {
if err := s.initGCPServerWatcher(ctx, vmMatchers, discoveryConfig); err != nil {
return trace.Wrap(err)
}

Expand Down Expand Up @@ -1606,7 +1634,9 @@ func (s *Server) startDynamicWatcherUpdater() {
s.Log.WarnContext(s.ctx, "Skipping unknown event type %s", "got", event.Type)
}
case <-s.dynamicMatcherWatcher.Done():
s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", s.dynamicMatcherWatcher.Error())
if err := s.dynamicMatcherWatcher.Error(); err != nil {
s.Log.WarnContext(s.ctx, "Dynamic matcher watcher error", "error", err)
}
return
}
}
Expand Down Expand Up @@ -1681,12 +1711,12 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicServerAWSFetchers[dc.GetName()] = awsServerFetchers
s.muDynamicServerAWSFetchers.Unlock()

azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure)
azureServerFetchers := s.azureServerFetchersFromMatchers(matchers.Azure, dc.GetName())
s.muDynamicServerAzureFetchers.Lock()
s.dynamicServerAzureFetchers[dc.GetName()] = azureServerFetchers
s.muDynamicServerAzureFetchers.Unlock()

gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP)
gcpServerFetchers, err := s.gcpServerFetchersFromMatchers(s.ctx, matchers.GCP, dc.GetName())
if err != nil {
return trace.Wrap(err)
}
Expand Down
22 changes: 22 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,7 @@ func TestDiscoveryDatabase(t *testing.T) {
{Engine: aws.String(services.RDSEnginePostgres)},
},
},
MemoryDB: &mocks.MemoryDBMock{},
Redshift: &mocks.RedshiftMock{
Clusters: []*redshift.Cluster{awsRedshiftResource},
},
Expand Down Expand Up @@ -2160,6 +2161,27 @@ func TestDiscoveryDatabase(t *testing.T) {
require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled)
},
},
{
name: "discovery config status must be updated even when there are no resources",
discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig {
dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{
AWS: []types.AWSMatcher{{
// MemoryDB mock client returns no resources.
Types: []string{types.AWSMatcherMemoryDB},
Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}},
Regions: []string{"us-east-1"},
Integration: integrationName,
}},
})
return []*discoveryconfig.DiscoveryConfig{dc1}
},
expectDatabases: []types.Database{},
wantEvents: 0,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(0), s.DiscoveredResources)
require.Equal(t, "DISCOVERY_CONFIG_STATE_SYNCING", s.State)
},
},
}

for _, tc := range tcs {
Expand Down
9 changes: 9 additions & 0 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/gravitational/teleport/lib/automaticupgrades"
kubeutils "github.com/gravitational/teleport/lib/kube/utils"
"github.com/gravitational/teleport/lib/srv/discovery/common"
libslices "github.com/gravitational/teleport/lib/utils/slices"
)

// startKubeIntegrationWatchers starts kube watchers that use integration for the credentials. Currently only
Expand Down Expand Up @@ -74,6 +75,14 @@ func (s *Server) startKubeIntegrationWatchers() error {
Origin: types.OriginCloud,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
PreFetchHookFn: func() {
discoveryConfigs := libslices.CollectValues(
s.getKubeIntegrationFetchers(),
func(f common.Fetcher) (s string, skip bool) {
return f.DiscoveryConfigName(), f.DiscoveryConfigName() == ""
},
)
s.updateDiscoveryConfigStatus(discoveryConfigs...)

s.awsEKSResourcesStatus.reset()
},
})
Expand Down
56 changes: 29 additions & 27 deletions lib/srv/discovery/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,40 +45,42 @@ import (
// - AWS EC2 Auto Discover status
// - AWS RDS Auto Discover status
// - AWS EKS Auto Discover status
func (s *Server) updateDiscoveryConfigStatus(discoveryConfigName string) {
// Static configurations (ie those in `teleport.yaml/discovery_config.<cloud>.matchers`) do not have a DiscoveryConfig resource.
// Those are discarded because there's no Status to update.
if discoveryConfigName == "" {
return
}
func (s *Server) updateDiscoveryConfigStatus(discoveryConfigNames ...string) {
for _, discoveryConfigName := range discoveryConfigNames {
// Static configurations (ie those in `teleport.yaml/discovery_config.<cloud>.matchers`) do not have a DiscoveryConfig resource.
// Those are discarded because there's no Status to update.
if discoveryConfigName == "" {
return
}

discoveryConfigStatus := discoveryconfig.Status{
State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(),
LastSyncTime: s.clock.Now(),
IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary),
}
discoveryConfigStatus := discoveryconfig.Status{
State: discoveryconfigv1.DiscoveryConfigState_DISCOVERY_CONFIG_STATE_SYNCING.String(),
LastSyncTime: s.clock.Now(),
IntegrationDiscoveredResources: make(map[string]*discoveryconfigv1.IntegrationDiscoveredSummary),
}

// Merge AWS Sync (TAG) status
discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)
// Merge AWS Sync (TAG) status
discoveryConfigStatus = s.awsSyncStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

// Merge AWS EC2 Instances (auto discovery) status
discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)
// Merge AWS EC2 Instances (auto discovery) status
discoveryConfigStatus = s.awsEC2ResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

// Merge AWS RDS databases (auto discovery) status
discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)
// Merge AWS RDS databases (auto discovery) status
discoveryConfigStatus = s.awsRDSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

// Merge AWS EKS clusters (auto discovery) status
discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)
// Merge AWS EKS clusters (auto discovery) status
discoveryConfigStatus = s.awsEKSResourcesStatus.mergeIntoGlobalStatus(discoveryConfigName, discoveryConfigStatus)

ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()

_, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus)
switch {
case trace.IsNotImplemented(err):
s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.")
case err != nil:
s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err)
_, err := s.AccessPoint.UpdateDiscoveryConfigStatus(ctx, discoveryConfigName, discoveryConfigStatus)
switch {
case trace.IsNotImplemented(err):
s.Log.WarnContext(ctx, "UpdateDiscoveryConfigStatus method is not implemented in Auth Server. Please upgrade it to a recent version.")
case err != nil:
s.Log.InfoContext(ctx, "Error updating discovery config status", "discovery_config_name", discoveryConfigName, "error", err)
}
}
}

Expand Down
Loading

0 comments on commit d292bd2

Please sign in to comment.