Skip to content

Commit

Permalink
Add Integration and DiscoveryConfig as labels to auto discovered RDS/…
Browse files Browse the repository at this point in the history
…EKS (#48986) (#49177)

* Add Integration and DiscoveryConfig as labels to auto discovered RDS/EKS

This PR adds two new labels to RDS and EKS resources:
- Integration
- DiscoveryConfig

Those labels help identify which Integration and DiscoveryConfig were
used to auto enroll the resources.

* remove unused code

* add missing discovery config labels for GCP and Azure
  • Loading branch information
marcoandredinis authored Nov 20, 2024
1 parent 2754bbb commit 50ae8f5
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 132 deletions.
10 changes: 10 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,16 @@ const (
// that belong to different discovery services that operate on different sets of resources.
TeleportInternalDiscoveryGroupName = TeleportInternalLabelPrefix + "discovery-group-name"

// TeleportInternalDiscoveryIntegrationName is the label used to store the name of the integration
// whose credentials were used to discover the resource.
// It is used to report stats for a given Integration / DiscoveryConfig.
TeleportInternalDiscoveryIntegrationName = TeleportInternalLabelPrefix + "discovery-integration-name"

// TeleportInternalDiscoveryConfigName is the label used to store the name of the discovery config
// whose matchers originated the resource.
// It is used to report stats for a given Integration / DiscoveryConfig.
TeleportInternalDiscoveryConfigName = TeleportInternalLabelPrefix + "discovery-config-name"

// TeleportDowngradedLabel identifies resources that have been automatically
// downgraded before being returned to clients on older versions that do not
// support one or more features enabled in that resource.
Expand Down
4 changes: 2 additions & 2 deletions lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func (s *Server) startResourceWatcher(ctx context.Context) (*services.DatabaseWa
// startCloudWatcher starts fetching cloud databases according to the
// selectors and register/unregister them appropriately.
func (s *Server) startCloudWatcher(ctx context.Context) error {
awsFetchers, err := dbfetchers.MakeAWSFetchers(ctx, s.cfg.CloudClients, s.cfg.AWSMatchers)
awsFetchers, err := dbfetchers.MakeAWSFetchers(ctx, s.cfg.CloudClients, s.cfg.AWSMatchers, "" /* discovery config */)
if err != nil {
return trace.Wrap(err)
}
azureFetchers, err := dbfetchers.MakeAzureFetchers(s.cfg.CloudClients, s.cfg.AzureMatchers)
azureFetchers, err := dbfetchers.MakeAzureFetchers(s.cfg.CloudClients, s.cfg.AzureMatchers, "" /* discovery config */)
if err != nil {
return trace.Wrap(err)
}
Expand Down
7 changes: 7 additions & 0 deletions lib/srv/discovery/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type Fetcher interface {
// FetcherType identifies the Fetcher Type (cloud resource name).
// Eg, ec2, rds, aks, gce
FetcherType() string
// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
IntegrationName() string
// DiscoveryConfigName is the name of the discovery config which originated the resource.
// Might be empty when the fetcher is using static matchers:
// ie teleport.yaml/discovery_service.<cloud>.<matcher>
DiscoveryConfigName() string
// Cloud returns the cloud the fetcher is operating.
Cloud() string
}
40 changes: 27 additions & 13 deletions lib/srv/discovery/common/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package common

import (
"context"
"maps"
"sync"
"time"

Expand Down Expand Up @@ -102,11 +103,12 @@ func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) {
if err := config.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return &Watcher{
watcher := &Watcher{
cfg: config,
ctx: ctx,
resourcesC: make(chan types.ResourcesWithLabels),
}, nil
}
return watcher, nil
}

// Start starts fetching cloud resources and sending them to the channel.
Expand Down Expand Up @@ -163,23 +165,35 @@ func (w *Watcher) fetchAndSend() {
return nil
}

fetcherLabels := make(map[string]string, 0)

if lFetcher.IntegrationName() != "" {
// Add the integration name to the static labels for each resource.
fetcherLabels[types.TeleportInternalDiscoveryIntegrationName] = lFetcher.IntegrationName()
}
if lFetcher.DiscoveryConfigName() != "" {
// Add the discovery config name to the static labels of each resource.
fetcherLabels[types.TeleportInternalDiscoveryConfigName] = lFetcher.DiscoveryConfigName()
}

if w.cfg.DiscoveryGroup != "" {
// Add the discovery group name to the static labels of each resource.
fetcherLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup
}

// Set the origin label to provide information where resource comes from
fetcherLabels[types.OriginLabel] = w.cfg.Origin
if c := lFetcher.Cloud(); c != "" {
fetcherLabels[types.CloudLabel] = c
}

for _, r := range resources {
staticLabels := r.GetStaticLabels()
if staticLabels == nil {
staticLabels = make(map[string]string)
}

if w.cfg.DiscoveryGroup != "" {
// Add the discovery group name to the static labels of each resource.
staticLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup
}

// Set the origin label to provide information where resource comes from
staticLabels[types.OriginLabel] = w.cfg.Origin
if c := lFetcher.Cloud(); c != "" {
staticLabels[types.CloudLabel] = c
}

maps.Copy(staticLabels, fetcherLabels)
r.SetStaticLabels(staticLabels)
}

Expand Down
7 changes: 7 additions & 0 deletions lib/srv/discovery/common/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ func (m *mockFetcher) FetcherType() string {
return "empty"
}

func (m *mockFetcher) IntegrationName() string {
return ""
}

func (m *mockFetcher) DiscoveryConfigName() string {
return ""
}
func (m *mockFetcher) Cloud() string {
return m.cloud
}
20 changes: 10 additions & 10 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
return nil, trace.Wrap(err)
}

databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers)
databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, "" /* discovery config */)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -507,7 +507,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

// Add non-integration kube fetchers.
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers)
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, "" /* discovery config */)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -596,13 +596,13 @@ func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []t
}

// databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers.
func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
func (s *Server) databaseFetchersFromMatchers(matchers Matchers, discoveryConfig string) ([]common.Fetcher, error) {
var fetchers []common.Fetcher

// AWS
awsDatabaseMatchers, _ := splitMatchers(matchers.AWS, db.IsAWSMatcherType)
if len(awsDatabaseMatchers) > 0 {
databaseFetchers, err := db.MakeAWSFetchers(s.ctx, s.CloudClients, awsDatabaseMatchers)
databaseFetchers, err := db.MakeAWSFetchers(s.ctx, s.CloudClients, awsDatabaseMatchers, discoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -612,7 +612,7 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
// Azure
azureDatabaseMatchers, _ := splitMatchers(matchers.Azure, db.IsAzureMatcherType)
if len(azureDatabaseMatchers) > 0 {
databaseFetchers, err := db.MakeAzureFetchers(s.CloudClients, azureDatabaseMatchers)
databaseFetchers, err := db.MakeAzureFetchers(s.CloudClients, azureDatabaseMatchers, discoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -625,15 +625,15 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
return fetchers, nil
}

func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
func (s *Server) kubeFetchersFromMatchers(matchers Matchers, discoveryConfig string) ([]common.Fetcher, error) {
var result []common.Fetcher

// AWS
awsKubeMatchers, _ := splitMatchers(matchers.AWS, func(matcherType string) bool {
return matcherType == types.AWSMatcherEKS
})
if len(awsKubeMatchers) > 0 {
eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers)
eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers, discoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1612,7 +1612,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicServerGCPFetchers[dc.GetName()] = gcpServerFetchers
s.muDynamicServerGCPFetchers.Unlock()

databaseFetchers, err := s.databaseFetchersFromMatchers(matchers)
databaseFetchers, err := s.databaseFetchersFromMatchers(matchers, dc.GetName())
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -1631,7 +1631,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers
s.muDynamicTAGSyncFetchers.Unlock()

kubeFetchers, err := s.kubeFetchersFromMatchers(matchers)
kubeFetchers, err := s.kubeFetchersFromMatchers(matchers, dc.GetName())
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -1640,7 +1640,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicKubeFetchers[dc.GetName()] = kubeFetchers
s.muDynamicKubeFetchers.Unlock()

// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
// TODO(marco): add other fetchers: Kube Resources (Apps)
return nil
}

Expand Down
Loading

0 comments on commit 50ae8f5

Please sign in to comment.