From 17b12970f15823c0ca178cb35f71ba398e8876c8 Mon Sep 17 00:00:00 2001 From: Gavin Frazar Date: Tue, 16 Jul 2024 19:35:55 -0700 Subject: [PATCH] fix discovery overwriting dynamic resources When the resource already exists, check its origin as well as discovery group. If it's not of discovery origin, then don't update it. If it's not in the same discovery group, and its discovery group is not blank, then don't update it. --- lib/srv/discovery/database_watcher.go | 15 +-- lib/srv/discovery/discovery.go | 38 ++++++- lib/srv/discovery/discovery_test.go | 125 +++++++++++++++++---- lib/srv/discovery/kube_services_watcher.go | 11 +- lib/srv/discovery/kube_watcher.go | 12 +- 5 files changed, 159 insertions(+), 42 deletions(-) diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index c49b32f4eaeb5..c3ab1abb437bf 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -138,16 +138,17 @@ func (s *Server) getCurrentDatabases() map[string]types.Database { func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error { s.Log.Debugf("Creating database %s.", database.GetName()) err := s.AccessPoint.CreateDatabase(ctx, database) - // If the database already exists but has an empty discovery group, update it. - if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( - func() (types.ResourceWithLabels, error) { + // If the database already exists but has cloud origin and an empty + // discovery group, then update it. 
+ if err != nil { + err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) { return s.AccessPoint.GetDatabase(ctx, database.GetName()) - }) { + }) + if err != nil { + return trace.Wrap(err) + } return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil)) } - if err != nil { - return trace.Wrap(err) - } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ databaseEventPrefix + database.GetName(): { ResourceType: types.DiscoveredResourceDatabase, diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index dcf4da61cee47..9f9d4c0d33481 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -1686,14 +1686,42 @@ func splitMatchers[T types.Matcher](matchers []T, matcherTypeCheck func(string) return } -func (s *Server) updatesEmptyDiscoveryGroup(getter func() (types.ResourceWithLabels, error)) bool { - if s.DiscoveryGroup == "" { - return false +func (s *Server) resolveCreateErr(createErr error, discoveryOrigin string, getter func() (types.ResourceWithLabels, error)) error { + // We can only resolve the error if we have a discovery group configured + // and the error is that the resource already exists. + if s.DiscoveryGroup == "" || !trace.IsAlreadyExists(createErr) { + return trace.Wrap(createErr) } + old, err := getter() if err != nil { - return false + return trace.NewAggregate(createErr, err) + } + + // Check that the registered resource origin matches the origin we want. + oldOrigin, err := types.GetOrigin(old) + if err != nil { + return trace.NewAggregate(createErr, err) + } + if oldOrigin != discoveryOrigin { + return trace.Wrap(createErr, + "not updating because the resource origin indicates that it is not managed by auto-discovery", + ) } + + // Check that the registered resource's discovery group is blank or matches + // this server's discovery group. 
+ // We check if the old group is empty because that's a special case where + // the old/new groups don't match but we still want to update the resource. + // In this way, discovery agents with a discovery_group essentially claim + // the resources they discover that used to be (or currently are) discovered + // by an agent that did not have a discovery_group configured. oldDiscoveryGroup, _ := old.GetLabel(types.TeleportInternalDiscoveryGroupName) - return oldDiscoveryGroup == "" + if oldDiscoveryGroup != "" && oldDiscoveryGroup != s.DiscoveryGroup { + return trace.Wrap(createErr, + "not updating because the resource is in a different discovery group", + ) + } + + return nil } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 18c02109becf9..ce0e18db784a2 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -2793,12 +2793,7 @@ func TestGCPVMDiscovery(t *testing.T) { // TestServer_onCreate tests the update of the discovery_group of a resource // when a resource already exists with the same name but an empty discovery_group. func TestServer_onCreate(t *testing.T) { - _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") - _, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */) - accessPoint := &fakeAccessPoint{ - kube: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */), - database: awsRedshiftDBEmptyDiscoveryGroup, - } + accessPoint := &fakeAccessPoint{} s := &Server{ Config: &Config{ DiscoveryGroup: "test-cluster", @@ -2808,31 +2803,106 @@ func TestServer_onCreate(t *testing.T) { } t.Run("onCreate update kube", func(t *testing.T) { + // With cloud origin and an empty discovery group, it should update. 
+ accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */) err := s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) require.NoError(t, err) - require.True(t, accessPoint.updateKube) + require.True(t, accessPoint.updatedKube) + + // Reset the updated flag and set the registered kube cluster to have + // non-cloud origin. It should not update. + accessPoint.updatedKube = false + accessPoint.kube.SetOrigin(types.OriginDynamic) + err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) + require.Error(t, err) + require.False(t, accessPoint.updatedKube) - // Reset the update flag. - accessPoint.updateKube = false + // Reset the updated flag and set the registered kube cluster to have + // an empty origin. It should not update. + accessPoint.updatedKube = false + accessPoint.kube.SetOrigin("") + err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) + require.Error(t, err) + require.False(t, accessPoint.updatedKube) + + // Reset the update flag and set the registered kube cluster to have + // a non-empty discovery group. It should not update. + accessPoint.updatedKube = false accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "nonEmpty") - // Update the kube cluster with non-empty discovery group. err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) require.Error(t, err) - require.False(t, accessPoint.updateKube) + require.False(t, accessPoint.updatedKube) }) t.Run("onCreate update database", func(t *testing.T) { + _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") + _, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */) + + // With cloud origin and an empty discovery group, it should update. 
+ accessPoint.database = awsRedshiftDBEmptyDiscoveryGroup err := s.onDatabaseCreate(context.Background(), awsRedshiftDB) require.NoError(t, err) - require.True(t, accessPoint.updateDatabase) + require.True(t, accessPoint.updatedDatabase) - // Reset the update flag. - accessPoint.updateDatabase = false + // Reset the updated flag and set the db to empty discovery group + // but non-cloud origin. It should not update. + accessPoint.updatedDatabase = false + accessPoint.database.SetOrigin(types.OriginDynamic) + err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) + require.Error(t, err) + require.False(t, accessPoint.updatedDatabase) + + // Reset the updated flag and set the db to empty discovery group + // but empty origin. It should not update. + accessPoint.updatedDatabase = false + accessPoint.database.SetOrigin("") + err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) + require.Error(t, err) + require.False(t, accessPoint.updatedDatabase) + + // Reset the updated flag and set the registered db to have a non-empty + // discovery group. It should not update. + accessPoint.updatedDatabase = false accessPoint.database = awsRedshiftDB - // Update the db with non-empty discovery group. err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) require.Error(t, err) - require.False(t, accessPoint.updateDatabase) + require.False(t, accessPoint.updatedDatabase) + }) + + t.Run("onCreate update app", func(t *testing.T) { + kubeSvc := newMockKubeService("service1", "ns1", "", + map[string]string{"test-label": "testval"}, nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}) + + // With kube origin and empty discovery group, it should update. 
+		accessPoint.app = mustConvertKubeServiceToApp(t, "" /*empty discovery group*/, "http", kubeSvc, kubeSvc.Spec.Ports[0])
+		err := s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
+		require.NoError(t, err)
+		require.True(t, accessPoint.updatedApp)
+
+		// Reset the updated flag and set the app to empty discovery group
+		// but non-cloud origin. It should not update.
+		accessPoint.updatedApp = false
+		accessPoint.app.SetOrigin(types.OriginDynamic)
+		err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
+		require.Error(t, err)
+		require.False(t, accessPoint.updatedApp)
+
+		// Reset the updated flag and set the app to empty discovery group
+		// but empty origin. It should not update.
+		accessPoint.updatedApp = false
+		accessPoint.app.SetOrigin("")
+		err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]))
+		require.Error(t, err)
+		require.False(t, accessPoint.updatedApp)
+
+		// Reset the updated flag and set the app to non-empty discovery group.
+		// It should not update.
+ accessPoint.updatedApp = false + accessPoint.app = mustConvertKubeServiceToApp(t, "nonEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]) + err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])) + require.Error(t, err) + require.False(t, accessPoint.updatedApp) }) } @@ -2931,10 +3001,12 @@ type fakeAccessPoint struct { ping func(context.Context) (proto.PingResponse, error) enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) - updateKube bool - updateDatabase bool + updatedKube bool + updatedDatabase bool + updatedApp bool kube types.KubeCluster database types.Database + app types.Application upsertedServerInfos chan types.ServerInfo reports map[string][]discoveryconfig.Status } @@ -2981,7 +3053,7 @@ func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Dat } func (f *fakeAccessPoint) UpdateDatabase(ctx context.Context, database types.Database) error { - f.updateDatabase = true + f.updatedDatabase = true return nil } @@ -2991,7 +3063,20 @@ func (f *fakeAccessPoint) CreateKubernetesCluster(ctx context.Context, cluster t // UpdateKubernetesCluster updates existing kubernetes cluster resource. 
func (f *fakeAccessPoint) UpdateKubernetesCluster(ctx context.Context, cluster types.KubeCluster) error { - f.updateKube = true + f.updatedKube = true + return nil +} + +func (f *fakeAccessPoint) GetApp(ctx context.Context, name string) (types.Application, error) { + return f.app, nil +} + +func (f *fakeAccessPoint) CreateApp(ctx context.Context, _ types.Application) error { + return trace.AlreadyExists("already exists") +} + +func (f *fakeAccessPoint) UpdateApp(ctx context.Context, _ types.Application) error { + f.updatedApp = true return nil } diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go index 88b978f283cff..eb6d68cc964f7 100644 --- a/lib/srv/discovery/kube_services_watcher.go +++ b/lib/srv/discovery/kube_services_watcher.go @@ -122,11 +122,14 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { // In this case, we need to update the resource with the // discovery group label to ensure the user doesn't have to manually delete // the resource. 
- if trace.IsAlreadyExists(err) { - return trace.Wrap(s.onAppUpdate(ctx, app, nil)) - } if err != nil { - return trace.Wrap(err) + err := s.resolveCreateErr(err, types.OriginDiscoveryKubernetes, func() (types.ResourceWithLabels, error) { + return s.AccessPoint.GetApp(ctx, app.GetName()) + }) + if err != nil { + return trace.Wrap(err) + } + return trace.Wrap(s.onAppUpdate(ctx, app, nil)) } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ appEventPrefix + app.GetName(): { diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 58c61475c8f32..e18cc23e68c99 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -121,15 +121,15 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) // If the kube already exists but has an empty discovery group, update it. - if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( - func() (types.ResourceWithLabels, error) { + if err != nil { + err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) { return s.AccessPoint.GetKubernetesCluster(ctx, kubeCluster.GetName()) - }) { + }) + if err != nil { + return trace.Wrap(err) + } return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil)) } - if err != nil { - return trace.Wrap(err) - } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ kubeEventPrefix + kubeCluster.GetName(): { ResourceType: types.DiscoveredResourceKubernetes,