diff --git a/lib/srv/discovery/database_watcher.go b/lib/srv/discovery/database_watcher.go index c49b32f4eaeb5..c3ab1abb437bf 100644 --- a/lib/srv/discovery/database_watcher.go +++ b/lib/srv/discovery/database_watcher.go @@ -138,16 +138,17 @@ func (s *Server) getCurrentDatabases() map[string]types.Database { func (s *Server) onDatabaseCreate(ctx context.Context, database types.Database) error { s.Log.Debugf("Creating database %s.", database.GetName()) err := s.AccessPoint.CreateDatabase(ctx, database) - // If the database already exists but has an empty discovery group, update it. - if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( - func() (types.ResourceWithLabels, error) { + // If the database already exists but has cloud origin and an empty + // discovery group, then update it. + if err != nil { + err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) { return s.AccessPoint.GetDatabase(ctx, database.GetName()) - }) { + }) + if err != nil { + return trace.Wrap(err) + } return trace.Wrap(s.onDatabaseUpdate(ctx, database, nil)) } - if err != nil { - return trace.Wrap(err) - } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ databaseEventPrefix + database.GetName(): { ResourceType: types.DiscoveredResourceDatabase, diff --git a/lib/srv/discovery/discovery.go b/lib/srv/discovery/discovery.go index c3dbdcfc5067d..9dd84664c36c7 100644 --- a/lib/srv/discovery/discovery.go +++ b/lib/srv/discovery/discovery.go @@ -1683,14 +1683,42 @@ func splitMatchers[T types.Matcher](matchers []T, matcherTypeCheck func(string) return } -func (s *Server) updatesEmptyDiscoveryGroup(getter func() (types.ResourceWithLabels, error)) bool { - if s.DiscoveryGroup == "" { - return false +func (s *Server) resolveCreateErr(createErr error, discoveryOrigin string, getter func() (types.ResourceWithLabels, error)) error { + // We can only resolve the error if we have a discovery group configured + // and the error is that the 
resource already exists. + if s.DiscoveryGroup == "" || !trace.IsAlreadyExists(createErr) { + return trace.Wrap(createErr) } + old, err := getter() if err != nil { - return false + return trace.NewAggregate(createErr, err) + } + + // Check that the registered resource origin matches the origin we want. + oldOrigin, err := types.GetOrigin(old) + if err != nil { + return trace.NewAggregate(createErr, err) + } + if oldOrigin != discoveryOrigin { + return trace.Wrap(createErr, + "not updating because the resource origin indicates that it is not managed by auto-discovery", + ) } + + // Check that the registered resource's discovery group is blank or matches + // this server's discovery group. + // We check if the old group is empty because that's a special case where + // the old/new groups don't match but we still want to update the resource. + // In this way, discovery agents with a discovery_group essentially claim + // the resources they discover that used to be (or currently are) discovered + // by an agent that did not have a discovery_group configured. oldDiscoveryGroup, _ := old.GetLabel(types.TeleportInternalDiscoveryGroupName) - return oldDiscoveryGroup == "" + if oldDiscoveryGroup != "" && oldDiscoveryGroup != s.DiscoveryGroup { + return trace.Wrap(createErr, + "not updating because the resource is in a different discovery group", + ) + } + + return nil } diff --git a/lib/srv/discovery/discovery_test.go b/lib/srv/discovery/discovery_test.go index 6204fc9e61f09..31d8de3c2f739 100644 --- a/lib/srv/discovery/discovery_test.go +++ b/lib/srv/discovery/discovery_test.go @@ -2786,12 +2786,7 @@ func TestGCPVMDiscovery(t *testing.T) { // TestServer_onCreate tests the update of the discovery_group of a resource // when a resource already exists with the same name but an empty discovery_group. 
func TestServer_onCreate(t *testing.T) { - _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") - _, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */) - accessPoint := &fakeAccessPoint{ - kube: mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */), - database: awsRedshiftDBEmptyDiscoveryGroup, - } + accessPoint := &fakeAccessPoint{} s := &Server{ Config: &Config{ DiscoveryGroup: "test-cluster", @@ -2801,31 +2796,106 @@ func TestServer_onCreate(t *testing.T) { } t.Run("onCreate update kube", func(t *testing.T) { + // With cloud origin and an empty discovery group, it should update. + accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "" /* empty discovery group */) err := s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) require.NoError(t, err) - require.True(t, accessPoint.updateKube) + require.True(t, accessPoint.updatedKube) + + // Reset the updated flag and set the registered kube cluster to have + // non-cloud origin. It should not update. + accessPoint.updatedKube = false + accessPoint.kube.SetOrigin(types.OriginDynamic) + err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) + require.Error(t, err) + require.False(t, accessPoint.updatedKube) - // Reset the update flag. - accessPoint.updateKube = false + // Reset the updated flag and set the registered kube cluster to have + // an empty origin. It should not update. + accessPoint.updatedKube = false + accessPoint.kube.SetOrigin("") + err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) + require.Error(t, err) + require.False(t, accessPoint.updatedKube) + + // Reset the update flag and set the registered kube cluster to have + // a non-empty discovery group. It should not update. 
+ accessPoint.updatedKube = false accessPoint.kube = mustConvertEKSToKubeCluster(t, eksMockClusters[0], "nonEmpty") - // Update the kube cluster with non-empty discovery group. err = s.onKubeCreate(context.Background(), mustConvertEKSToKubeCluster(t, eksMockClusters[0], "test-cluster")) require.Error(t, err) - require.False(t, accessPoint.updateKube) + require.False(t, accessPoint.updatedKube) }) t.Run("onCreate update database", func(t *testing.T) { + _, awsRedshiftDB := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "test") + _, awsRedshiftDBEmptyDiscoveryGroup := makeRedshiftCluster(t, "aws-redshift", "us-east-1", "" /* empty discovery group */) + + // With cloud origin and an empty discovery group, it should update. + accessPoint.database = awsRedshiftDBEmptyDiscoveryGroup err := s.onDatabaseCreate(context.Background(), awsRedshiftDB) require.NoError(t, err) - require.True(t, accessPoint.updateDatabase) + require.True(t, accessPoint.updatedDatabase) - // Reset the update flag. - accessPoint.updateDatabase = false + // Reset the updated flag and set the db to empty discovery group + // but non-cloud origin. It should not update. + accessPoint.updatedDatabase = false + accessPoint.database.SetOrigin(types.OriginDynamic) + err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) + require.Error(t, err) + require.False(t, accessPoint.updatedDatabase) + + // Reset the updated flag and set the db to empty discovery group + // but empty origin. It should not update. + accessPoint.updatedDatabase = false + accessPoint.database.SetOrigin("") + err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) + require.Error(t, err) + require.False(t, accessPoint.updatedDatabase) + + // Reset the updated flag and set the registered db to have a non-empty + // discovery group. It should not update. + accessPoint.updatedDatabase = false accessPoint.database = awsRedshiftDB - // Update the db with non-empty discovery group. 
err = s.onDatabaseCreate(context.Background(), awsRedshiftDB) require.Error(t, err) - require.False(t, accessPoint.updateDatabase) + require.False(t, accessPoint.updatedDatabase) + }) + + t.Run("onCreate update app", func(t *testing.T) { + kubeSvc := newMockKubeService("service1", "ns1", "", + map[string]string{"test-label": "testval"}, nil, + []corev1.ServicePort{{Port: 42, Name: "http", Protocol: corev1.ProtocolTCP}}) + + // With kube origin and empty discovery group, it should update. + accessPoint.app = mustConvertKubeServiceToApp(t, "" /*empty discovery group*/, "http", kubeSvc, kubeSvc.Spec.Ports[0]) + err := s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])) + require.NoError(t, err) + require.True(t, accessPoint.updatedApp) + + // Reset the updated flag and set the app to empty discovery group + // but non-cloud origin. It should not update. + accessPoint.updatedApp = false + accessPoint.app.SetOrigin(types.OriginDynamic) + err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])) + require.Error(t, err) + require.False(t, accessPoint.updatedApp) + + // Reset the updated flag and set the app to empty discovery group + // but empty origin. It should not update. + accessPoint.updatedApp = false + accessPoint.app.SetOrigin("") + err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])) + require.Error(t, err) + require.False(t, accessPoint.updatedApp) + + // Reset the updated flag and set the app to non-empty discovery group. + // It should not update. 
+ accessPoint.updatedApp = false + accessPoint.app = mustConvertKubeServiceToApp(t, "nonEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0]) + err = s.onAppCreate(context.Background(), mustConvertKubeServiceToApp(t, "notEmpty", "http", kubeSvc, kubeSvc.Spec.Ports[0])) + require.Error(t, err) + require.False(t, accessPoint.updatedApp) }) } @@ -2924,10 +2994,12 @@ type fakeAccessPoint struct { ping func(context.Context) (proto.PingResponse, error) enrollEKSClusters func(context.Context, *integrationpb.EnrollEKSClustersRequest, ...grpc.CallOption) (*integrationpb.EnrollEKSClustersResponse, error) - updateKube bool - updateDatabase bool + updatedKube bool + updatedDatabase bool + updatedApp bool kube types.KubeCluster database types.Database + app types.Application upsertedServerInfos chan types.ServerInfo reports map[string][]discoveryconfig.Status } @@ -2974,7 +3046,7 @@ func (f *fakeAccessPoint) CreateDatabase(ctx context.Context, database types.Dat } func (f *fakeAccessPoint) UpdateDatabase(ctx context.Context, database types.Database) error { - f.updateDatabase = true + f.updatedDatabase = true return nil } @@ -2984,7 +3056,20 @@ func (f *fakeAccessPoint) CreateKubernetesCluster(ctx context.Context, cluster t // UpdateKubernetesCluster updates existing kubernetes cluster resource. 
func (f *fakeAccessPoint) UpdateKubernetesCluster(ctx context.Context, cluster types.KubeCluster) error { - f.updateKube = true + f.updatedKube = true + return nil +} + +func (f *fakeAccessPoint) GetApp(ctx context.Context, name string) (types.Application, error) { + return f.app, nil +} + +func (f *fakeAccessPoint) CreateApp(ctx context.Context, _ types.Application) error { + return trace.AlreadyExists("already exists") +} + +func (f *fakeAccessPoint) UpdateApp(ctx context.Context, _ types.Application) error { + f.updatedApp = true return nil } diff --git a/lib/srv/discovery/kube_services_watcher.go b/lib/srv/discovery/kube_services_watcher.go index 88b978f283cff..eb6d68cc964f7 100644 --- a/lib/srv/discovery/kube_services_watcher.go +++ b/lib/srv/discovery/kube_services_watcher.go @@ -122,11 +122,14 @@ func (s *Server) onAppCreate(ctx context.Context, app types.Application) error { // In this case, we need to update the resource with the // discovery group label to ensure the user doesn't have to manually delete // the resource. 
- if trace.IsAlreadyExists(err) { - return trace.Wrap(s.onAppUpdate(ctx, app, nil)) - } if err != nil { - return trace.Wrap(err) + err := s.resolveCreateErr(err, types.OriginDiscoveryKubernetes, func() (types.ResourceWithLabels, error) { + return s.AccessPoint.GetApp(ctx, app.GetName()) + }) + if err != nil { + return trace.Wrap(err) + } + return trace.Wrap(s.onAppUpdate(ctx, app, nil)) } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ appEventPrefix + app.GetName(): { diff --git a/lib/srv/discovery/kube_watcher.go b/lib/srv/discovery/kube_watcher.go index 58c61475c8f32..e18cc23e68c99 100644 --- a/lib/srv/discovery/kube_watcher.go +++ b/lib/srv/discovery/kube_watcher.go @@ -121,15 +121,15 @@ func (s *Server) onKubeCreate(ctx context.Context, kubeCluster types.KubeCluster s.Log.Debugf("Creating kube_cluster %s.", kubeCluster.GetName()) err := s.AccessPoint.CreateKubernetesCluster(ctx, kubeCluster) // If the kube already exists but has an empty discovery group, update it. - if trace.IsAlreadyExists(err) && s.updatesEmptyDiscoveryGroup( - func() (types.ResourceWithLabels, error) { + if err != nil { + err := s.resolveCreateErr(err, types.OriginCloud, func() (types.ResourceWithLabels, error) { return s.AccessPoint.GetKubernetesCluster(ctx, kubeCluster.GetName()) - }) { + }) + if err != nil { + return trace.Wrap(err) + } return trace.Wrap(s.onKubeUpdate(ctx, kubeCluster, nil)) } - if err != nil { - return trace.Wrap(err) - } err = s.emitUsageEvents(map[string]*usageeventsv1.ResourceCreateEvent{ kubeEventPrefix + kubeCluster.GetName(): { ResourceType: types.DiscoveredResourceKubernetes,