Skip to content

Commit

Permalink
DiscoveryConfig Status: report counters for RDS and EKS
Browse files Browse the repository at this point in the history
This PR changes the DiscoveryService to add a new status report for RDS
and EKS Clusters.

It will, for each DiscoveryConfig/Integration, report how many resources
were found, enrolled and failed to enroll.
  • Loading branch information
marcoandredinis committed Nov 19, 2024
1 parent 563216b commit 4d5a568
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 36 deletions.
23 changes: 22 additions & 1 deletion lib/srv/discovery/common/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,22 @@ type Watcher struct {
ctx context.Context
// resourcesC is a channel where fetched resourcess are sent.
resourcesC chan (types.ResourcesWithLabels)
// preFetchHookFn is called before starting a new fetch cycle.
preFetchHookFn func()
}

// WatcherOption is a functional option for the Watcher.
type WatcherOption func(*Watcher)

// WithPreFetchHookFn sets a function that gets called before each new iteration.
func WithPreFetchHookFn(f func()) WatcherOption {
return func(w *Watcher) {
w.preFetchHookFn = f
}
}

// NewWatcher returns a new instance of a common discovery watcher.
func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) {
func NewWatcher(ctx context.Context, config WatcherConfig, options ...WatcherOption) (*Watcher, error) {
if err := config.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
Expand All @@ -108,6 +120,11 @@ func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) {
ctx: ctx,
resourcesC: make(chan types.ResourcesWithLabels),
}

for _, opt := range options {
opt(watcher)
}

return watcher, nil
}

Expand Down Expand Up @@ -141,6 +158,10 @@ func (w *Watcher) Start() {

// fetchAndSend fetches resources from all fetchers and sends them to the channel.
func (w *Watcher) fetchAndSend() {
if w.preFetchHookFn != nil {
w.preFetchHookFn()
}

var (
newFetcherResources = make(types.ResourcesWithLabels, 0, 50)
fetchersLock sync.Mutex
Expand Down
20 changes: 14 additions & 6 deletions lib/srv/discovery/common/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package common

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -61,12 +62,17 @@ func TestWatcher(t *testing.T) {
}

clock := clockwork.NewFakeClock()
watcher, err := NewWatcher(ctx, WatcherConfig{
FetchersFn: StaticFetchers([]Fetcher{appFetcher, noAuthFetcher, dbFetcher}),
Interval: time.Hour,
Clock: clock,
Origin: types.OriginCloud,
})
fetchIterations := atomic.Uint32{}
watcher, err := NewWatcher(ctx,
WatcherConfig{
FetchersFn: StaticFetchers([]Fetcher{appFetcher, noAuthFetcher, dbFetcher}),
Interval: time.Hour,
Clock: clock,
Origin: types.OriginCloud,
},
WithPreFetchHookFn(func() {
fetchIterations.Add(1)
}))
require.NoError(t, err)
go watcher.Start()

Expand All @@ -77,6 +83,8 @@ func TestWatcher(t *testing.T) {
// Watcher should fetch again after interval.
clock.Advance(time.Hour + time.Minute)
assertFetchResources(t, watcher, wantResources)

require.Equal(t, uint32(2), fetchIterations.Load())
}

func TestWatcherWithDynamicFetchers(t *testing.T) {
Expand Down
53 changes: 43 additions & 10 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,30 @@ func (s *Server) startDatabaseWatchers() error {
return trace.Wrap(err)
}

watcher, err := common.NewWatcher(s.ctx, common.WatcherConfig{
FetchersFn: s.getAllDatabaseFetchers,
Log: s.LegacyLogger.WithField("kind", types.KindDatabase),
DiscoveryGroup: s.DiscoveryGroup,
Interval: s.PollInterval,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
Origin: types.OriginCloud,
Clock: s.clock,
})
watcher, err := common.NewWatcher(s.ctx,
common.WatcherConfig{
FetchersFn: s.getAllDatabaseFetchers,
Log: s.LegacyLogger.WithField("kind", types.KindDatabase),
DiscoveryGroup: s.DiscoveryGroup,
Interval: s.PollInterval,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
Origin: types.OriginCloud,
Clock: s.clock,
},
common.WithPreFetchHookFn(func() {
s.awsRDSResourcesStatus.reset()
}),
)
if err != nil {
return trace.Wrap(err)
}
go watcher.Start()

go func() {
for {
discoveryConfigsChanged := map[string]struct{}{}
resourcesFoundByGroup := make(map[awsResourceGroup]int)

select {
case newResources := <-watcher.ResourcesC():
dbs := make([]types.Database, 0, len(newResources))
Expand All @@ -89,21 +97,46 @@ func (s *Server) startDatabaseWatchers() error {
continue
}

resourceGroup := awsResourceGroupFromLabels(db.GetStaticLabels())
resourcesFoundByGroup[resourceGroup] = resourcesFoundByGroup[resourceGroup] + 1
discoveryConfigsChanged[resourceGroup.discoveryConfig] = struct{}{}

dbs = append(dbs, db)
}
mu.Lock()
newDatabases = dbs
mu.Unlock()

for group, count := range resourcesFoundByGroup {
s.awsRDSResourcesStatus.incrementFound(group, count)
}

if err := reconciler.Reconcile(s.ctx); err != nil {
s.Log.WarnContext(s.ctx, "Unable to reconcile database resources", "error", err)
} else if s.onDatabaseReconcile != nil {

// When reconcile fails, it is assumed that everything failed.
for group, count := range resourcesFoundByGroup {
s.awsRDSResourcesStatus.incrementFailed(group, count)
}

break
}

for group, count := range resourcesFoundByGroup {
s.awsRDSResourcesStatus.incrementEnrolled(group, count)
}

if s.onDatabaseReconcile != nil {
s.onDatabaseReconcile()
}

case <-s.ctx.Done():
return
}

for dc := range discoveryConfigsChanged {
s.updateDiscoveryConfigStatus(dc)
}
}
}()
return nil
Expand Down
5 changes: 5 additions & 0 deletions lib/srv/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ type Server struct {

awsSyncStatus awsSyncStatus
awsEC2ResourcesStatus awsResourcesStatus
awsRDSResourcesStatus awsResourcesStatus
awsEKSResourcesStatus awsResourcesStatus
awsEC2Tasks awsEC2Tasks

// caRotationCh receives nodes that need to have their CAs rotated.
Expand Down Expand Up @@ -378,6 +380,9 @@ func New(ctx context.Context, cfg *Config) (*Server, error) {
dynamicTAGSyncFetchers: make(map[string][]aws_sync.AWSSync),
dynamicDiscoveryConfig: make(map[string]*discoveryconfig.DiscoveryConfig),
awsSyncStatus: awsSyncStatus{},
awsEC2ResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEC2),
awsRDSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherRDS),
awsEKSResourcesStatus: newAWSResourceStatusCollector(types.AWSMatcherEKS),
}
s.discardUnsupportedMatchers(&s.Matchers)

Expand Down
57 changes: 57 additions & 0 deletions lib/srv/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,8 @@ func TestDiscoveryDatabase(t *testing.T) {
awsRDSDBWithRole.SetAWSAssumeRole("arn:aws:iam::123456789012:role/test-role")
awsRDSDBWithRole.SetAWSExternalID("test123")

eksAWSResource, _ := makeEKSCluster(t, "aws-eks", "us-east-1", rewriteDiscoveryLabelsParams{discoveryGroup: mainDiscoveryGroup, integration: integrationName, discoveryConfig: discoveryConfigName})

matcherForDiscoveryConfigFn := func(t *testing.T, discoveryGroup string, m Matchers) *discoveryconfig.DiscoveryConfig {
dc, err := discoveryconfig.NewDiscoveryConfig(
header.Metadata{Name: discoveryConfigName},
Expand Down Expand Up @@ -1892,6 +1894,9 @@ func TestDiscoveryDatabase(t *testing.T) {
&azure.ARMRedisEnterpriseClusterMock{},
&azure.ARMRedisEnterpriseDatabaseMock{},
),
EKS: &mocks.EKSMock{
Clusters: []*eks.Cluster{eksAWSResource},
},
}

tcs := []struct {
Expand All @@ -1902,6 +1907,7 @@ func TestDiscoveryDatabase(t *testing.T) {
azureMatchers []types.AzureMatcher
expectDatabases []types.Database
discoveryConfigs func(*testing.T) []*discoveryconfig.DiscoveryConfig
discoveryConfigStatusCheck func(*testing.T, discoveryconfig.Status)
wantEvents int
}{
{
Expand Down Expand Up @@ -2100,6 +2106,12 @@ func TestDiscoveryDatabase(t *testing.T) {
return []*discoveryconfig.DiscoveryConfig{dc1}
},
wantEvents: 1,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(1), s.DiscoveredResources)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Enrolled)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsRds.Found)
require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsRds.Failed)
},
},
{
name: "running in integrations-only-mode with a matcher without an integration, must find 1 database",
Expand All @@ -2113,6 +2125,27 @@ func TestDiscoveryDatabase(t *testing.T) {
expectDatabases: []types.Database{awsRedshiftDBWithIntegration},
wantEvents: 1,
},
{
name: "running in integrations-only-mode with a dynamic matcher with an integration, must find 1 eks cluster",
discoveryConfigs: func(t *testing.T) []*discoveryconfig.DiscoveryConfig {
dc1 := matcherForDiscoveryConfigFn(t, mainDiscoveryGroup, Matchers{
AWS: []types.AWSMatcher{{
Types: []string{types.AWSMatcherEKS},
Tags: map[string]utils.Strings{types.Wildcard: {types.Wildcard}},
Regions: []string{"us-east-1"},
Integration: integrationName,
}},
})
return []*discoveryconfig.DiscoveryConfig{dc1}
},
expectDatabases: []types.Database{},
wantEvents: 0,
discoveryConfigStatusCheck: func(t *testing.T, s discoveryconfig.Status) {
require.Equal(t, uint64(1), s.DiscoveredResources)
require.Equal(t, uint64(1), s.IntegrationDiscoveredResources[integrationName].AwsEks.Found)
require.Zero(t, s.IntegrationDiscoveredResources[integrationName].AwsEks.Enrolled)
},
},
}

for _, tc := range tcs {
Expand Down Expand Up @@ -2213,6 +2246,13 @@ func TestDiscoveryDatabase(t *testing.T) {
return reporter.ResourceCreateEventCount() != 0
}, time.Second, 100*time.Millisecond)
}

if tc.discoveryConfigStatusCheck != nil {
dc, err := tlsServer.Auth().GetDiscoveryConfig(ctx, discoveryConfigName)
require.NoError(t, err)

tc.discoveryConfigStatusCheck(t, dc.Status)
}
})
}
}
Expand Down Expand Up @@ -2375,6 +2415,23 @@ func TestDiscoveryDatabaseRemovingDiscoveryConfigs(t *testing.T) {
})
}

func makeEKSCluster(t *testing.T, name, region string, discoveryParams rewriteDiscoveryLabelsParams) (*eks.Cluster, types.KubeCluster) {
t.Helper()
eksAWSCluster := &eks.Cluster{
Name: aws.String(name),
Arn: aws.String(fmt.Sprintf("arn:aws:eks:%s:123456789012:cluster/%s", region, name)),
Status: aws.String(eks.ClusterStatusActive),
Tags: map[string]*string{
"env": aws.String("prod"),
},
}
actual, err := common.NewKubeClusterFromAWSEKS(aws.StringValue(eksAWSCluster.Name), aws.StringValue(eksAWSCluster.Arn), eksAWSCluster.Tags)
require.NoError(t, err)
discoveryParams.matcherType = types.AWSMatcherEKS
rewriteCloudResource(t, actual, discoveryParams)
return eksAWSCluster, actual
}

func makeRDSInstance(t *testing.T, name, region string, discoveryParams rewriteDiscoveryLabelsParams) (*rds.DBInstance, types.Database) {
instance := &rds.DBInstance{
DBInstanceArn: aws.String(fmt.Sprintf("arn:aws:rds:%v:123456789012:db:%v", region, name)),
Expand Down
Loading

0 comments on commit 4d5a568

Please sign in to comment.