Skip to content

Commit

Permalink
Add Integration and DiscoveryConfig as labels to auto discovered RDS/EKS
Browse files Browse the repository at this point in the history
This PR adds two new labels to RDS and EKS resources:
- Integration
- DiscoveryConfig

Those labels help identify which Integration and DiscoveryConfig were
used to auto enroll the resources.
  • Loading branch information
marcoandredinis committed Nov 14, 2024
1 parent e24c2ef commit 7efa13b
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 118 deletions.
10 changes: 10 additions & 0 deletions api/types/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,16 @@ const (
// that belong to different discovery services that operate on different sets of resources.
TeleportInternalDiscoveryGroupName = TeleportInternalLabelPrefix + "discovery-group-name"

// TeleportInternalDiscoveryIntegrationName is the label used to store the name of the integration
// whose credentials were used to discover the resource.
// It is used to report stats for a given Integration / DiscoveryConfig.
TeleportInternalDiscoveryIntegrationName = TeleportInternalLabelPrefix + "discovery-integration-name"

// TeleportInternalDiscoveryConfigName is the label used to store the name of the discovery config
// whose matchers originated the resource.
// It is used to report stats for a given Integration / DiscoveryConfig.
TeleportInternalDiscoveryConfigName = TeleportInternalLabelPrefix + "discovery-config-name"

// TeleportDowngradedLabel identifies resources that have been automatically
// downgraded before being returned to clients on older versions that do not
// support one or more features enabled in that resource.
Expand Down
2 changes: 1 addition & 1 deletion lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Server) startResourceWatcher(ctx context.Context) (*services.GenericWat
// startCloudWatcher starts fetching cloud databases according to the
// selectors and register/unregister them appropriately.
func (s *Server) startCloudWatcher(ctx context.Context) error {
awsFetchers, err := dbfetchers.MakeAWSFetchers(ctx, s.cfg.CloudClients, s.cfg.AWSMatchers)
awsFetchers, err := dbfetchers.MakeAWSFetchers(ctx, s.cfg.CloudClients, s.cfg.AWSMatchers, "" /* discovery config */)
if err != nil {
return trace.Wrap(err)
}
Expand Down
7 changes: 7 additions & 0 deletions lib/srv/discovery/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ type Fetcher interface {
// FetcherType identifies the Fetcher Type (cloud resource name).
// Eg, ec2, rds, aks, gce
FetcherType() string
// IntegrationName identifies the integration name whose credentials were used to fetch the resources.
// Might be empty when the fetcher is using ambient credentials.
IntegrationName() string
// DiscoveryConfigName is the name of the discovery config which originated the resource.
// Might be empty when the fetcher is using static matchers:
// ie teleport.yaml/discovery_service.<cloud>.<matcher>
DiscoveryConfigName() string
// Cloud returns the cloud the fetcher is operating.
Cloud() string
}
80 changes: 57 additions & 23 deletions lib/srv/discovery/common/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package common

import (
"context"
"maps"
"sync"
"time"

Expand Down Expand Up @@ -95,18 +96,37 @@ type Watcher struct {
ctx context.Context
// resourcesC is a channel where fetched resourcess are sent.
resourcesC chan (types.ResourcesWithLabels)
// preFetchHookFn is called before starting a new poll.
preFetchHookFn func()
}

// Option is a functional option for the Watcher.
type Option func(*Watcher)

// WithPreFetchHookFn sets a function that gets called before each new iteration.
func WithPreFetchHookFn(f func()) Option {
return func(w *Watcher) {
w.preFetchHookFn = f
}
}

// NewWatcher returns a new instance of a common discovery watcher.
func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) {
func NewWatcher(ctx context.Context, config WatcherConfig, options ...Option) (*Watcher, error) {
if err := config.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return &Watcher{
cfg: config,
ctx: ctx,
resourcesC: make(chan types.ResourcesWithLabels),
}, nil
watcher := &Watcher{
cfg: config,
ctx: ctx,
resourcesC: make(chan types.ResourcesWithLabels),
preFetchHookFn: func() {},
}

for _, opt := range options {
opt(watcher)
}

return watcher, nil
}

// Start starts fetching cloud resources and sending them to the channel.
Expand Down Expand Up @@ -139,6 +159,8 @@ func (w *Watcher) Start() {

// fetchAndSend fetches resources from all fetchers and sends them to the channel.
func (w *Watcher) fetchAndSend() {
w.preFetchHookFn()

var (
newFetcherResources = make(types.ResourcesWithLabels, 0, 50)
fetchersLock sync.Mutex
Expand All @@ -163,29 +185,41 @@ func (w *Watcher) fetchAndSend() {
return nil
}

fetcherLabels := make(map[string]string, 0)

if lFetcher.IntegrationName() != "" {
// Add the integration name to the static labels for each resource.
fetcherLabels[types.TeleportInternalDiscoveryIntegrationName] = lFetcher.IntegrationName()
}
if lFetcher.DiscoveryConfigName() != "" {
// Add the discovery config name to the static labels of each resource.
fetcherLabels[types.TeleportInternalDiscoveryConfigName] = lFetcher.DiscoveryConfigName()
}

if w.cfg.DiscoveryGroup != "" {
// Add the discovery group name to the static labels of each resource.
fetcherLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup
}

// Set the discovery type label to provide information about the
// matcher type that matched the resource.
if t := lFetcher.FetcherType(); t != "" {
fetcherLabels[types.DiscoveryTypeLabel] = t
}

// Set the origin label to provide information where resource comes from
fetcherLabels[types.OriginLabel] = w.cfg.Origin
if c := lFetcher.Cloud(); c != "" {
fetcherLabels[types.CloudLabel] = c
}

for _, r := range resources {
staticLabels := r.GetStaticLabels()
if staticLabels == nil {
staticLabels = make(map[string]string)
}

if w.cfg.DiscoveryGroup != "" {
// Add the discovery group name to the static labels of each resource.
staticLabels[types.TeleportInternalDiscoveryGroupName] = w.cfg.DiscoveryGroup
}

// Set the origin label to provide information where resource comes from
staticLabels[types.OriginLabel] = w.cfg.Origin
if c := lFetcher.Cloud(); c != "" {
staticLabels[types.CloudLabel] = c
}

// Set the discovery type label to provide information about the
// matcher type that matched the resource.
if t := lFetcher.FetcherType(); t != "" {
staticLabels[types.DiscoveryTypeLabel] = t
}

maps.Copy(staticLabels, fetcherLabels)
r.SetStaticLabels(staticLabels)
}

Expand Down
7 changes: 7 additions & 0 deletions lib/srv/discovery/common/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ func (m *mockFetcher) FetcherType() string {
return "empty"
}

func (m *mockFetcher) IntegrationName() string {
return ""
}

func (m *mockFetcher) DiscoveryConfigName() string {
return ""
}
func (m *mockFetcher) Cloud() string {
return m.cloud
}
18 changes: 9 additions & 9 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
return nil, trace.Wrap(err)
}

databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers)
databaseFetchers, err := s.databaseFetchersFromMatchers(cfg.Matchers, "" /* discovery config */)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -509,7 +509,7 @@ func (s *Server) initAWSWatchers(matchers []types.AWSMatcher) error {
_, otherMatchers = splitMatchers(otherMatchers, db.IsAWSMatcherType)

// Add non-integration kube fetchers.
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers)
kubeFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, otherMatchers, "" /* discovery config */)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -598,13 +598,13 @@ func (s *Server) gcpServerFetchersFromMatchers(ctx context.Context, matchers []t
}

// databaseFetchersFromMatchers converts Matchers into a set of Database Fetchers.
func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
func (s *Server) databaseFetchersFromMatchers(matchers Matchers, discoveryConfig string) ([]common.Fetcher, error) {
var fetchers []common.Fetcher

// AWS
awsDatabaseMatchers, _ := splitMatchers(matchers.AWS, db.IsAWSMatcherType)
if len(awsDatabaseMatchers) > 0 {
databaseFetchers, err := db.MakeAWSFetchers(s.ctx, s.CloudClients, awsDatabaseMatchers)
databaseFetchers, err := db.MakeAWSFetchers(s.ctx, s.CloudClients, awsDatabaseMatchers, discoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -627,15 +627,15 @@ func (s *Server) databaseFetchersFromMatchers(matchers Matchers) ([]common.Fetch
return fetchers, nil
}

func (s *Server) kubeFetchersFromMatchers(matchers Matchers) ([]common.Fetcher, error) {
func (s *Server) kubeFetchersFromMatchers(matchers Matchers, discoveryConfig string) ([]common.Fetcher, error) {
var result []common.Fetcher

// AWS
awsKubeMatchers, _ := splitMatchers(matchers.AWS, func(matcherType string) bool {
return matcherType == types.AWSMatcherEKS
})
if len(awsKubeMatchers) > 0 {
eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers)
eksFetchers, err := fetchers.MakeEKSFetchersFromAWSMatchers(s.LegacyLogger, s.CloudClients, awsKubeMatchers, discoveryConfig)
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -1656,7 +1656,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicServerGCPFetchers[dc.GetName()] = gcpServerFetchers
s.muDynamicServerGCPFetchers.Unlock()

databaseFetchers, err := s.databaseFetchersFromMatchers(matchers)
databaseFetchers, err := s.databaseFetchersFromMatchers(matchers, dc.GetName())
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -1675,7 +1675,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicTAGSyncFetchers[dc.GetName()] = awsSyncMatchers
s.muDynamicTAGSyncFetchers.Unlock()

kubeFetchers, err := s.kubeFetchersFromMatchers(matchers)
kubeFetchers, err := s.kubeFetchersFromMatchers(matchers, dc.GetName())
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -1684,7 +1684,7 @@ func (s *Server) upsertDynamicMatchers(ctx context.Context, dc *discoveryconfig.
s.dynamicKubeFetchers[dc.GetName()] = kubeFetchers
s.muDynamicKubeFetchers.Unlock()

// TODO(marco): add other fetchers: Kube Clusters and Kube Resources (Apps)
// TODO(marco): add other fetchers: Kube Resources (Apps)
return nil
}

Expand Down
Loading

0 comments on commit 7efa13b

Please sign in to comment.