diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go index 3d9cbae11..0402946d0 100644 --- a/pkg/cluster/streams.go +++ b/pkg/cluster/streams.go @@ -442,7 +442,9 @@ func (c *Cluster) syncStream(appId string) error { c.setProcessName("syncing stream with applicationId %s", appId) c.logger.Debugf("syncing stream with applicationId %s", appId) - listOptions := metav1.ListOptions{LabelSelector: c.labelsSet(true).String()} + listOptions := metav1.ListOptions{ + LabelSelector: c.labelsSet(false).String(), + } streams, err = c.KubeClient.FabricEventStreams(c.Namespace).List(context.TODO(), listOptions) if err != nil { return fmt.Errorf("could not list of FabricEventStreams for applicationId %s: %v", appId, err) @@ -476,7 +478,8 @@ func (c *Cluster) syncStream(appId string) error { } if match, reason := c.compareStreams(&stream, desiredStreams); !match { c.logger.Infof("updating event streams with applicationId %s: %s", appId, reason) - desiredStreams.ObjectMeta = stream.ObjectMeta + // make sure to keep the old name with randomly generated suffix + desiredStreams.ObjectMeta.Name = stream.ObjectMeta.Name updatedStream, err := c.updateStreams(desiredStreams) if err != nil { return fmt.Errorf("failed updating event streams %s with applicationId %s: %v", stream.Name, appId, err) @@ -510,6 +513,11 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason)) } + if !reflect.DeepEqual(curEventStreams.ObjectMeta.Labels, newEventStreams.ObjectMeta.Labels) { + match = false + reasons = append(reasons, "new streams labels do not match the current ones") + } + if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed { match = false reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason)) diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go index 92d28663e..ba7b6b816 100644 --- a/pkg/cluster/streams_test.go +++ b/pkg/cluster/streams_test.go @@ -90,7 +90,7 @@ var ( Namespace: namespace, Labels: map[string]string{ "application": "spilo", - "cluster-name": fmt.Sprintf("%s-2", clusterName), + "cluster-name": clusterName, "team": "acid", }, OwnerReferences: []metav1.OwnerReference{ @@ -449,7 +449,7 @@ func TestGenerateFabricEventStream(t *testing.T) { } listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), + LabelSelector: cluster.labelsSet(false).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) @@ -488,7 +488,8 @@ func newFabricEventStream(streams []zalandov1.EventStream, annotations map[strin } func TestSyncStreams(t *testing.T) { - pg.Name = fmt.Sprintf("%s-2", pg.Name) + newClusterName := fmt.Sprintf("%s-2", pg.Name) + pg.Name = newClusterName var cluster = New( Config{ OpConfig: config.Config{ @@ -515,6 +516,7 @@ func TestSyncStreams(t *testing.T) { assert.NoError(t, err) // create a second stream with same spec but with different name + fes.ObjectMeta.Labels["cluster-name"] = newClusterName createdStream, err := cluster.KubeClient.FabricEventStreams(namespace).Create( context.TODO(), fes, metav1.CreateOptions{}) assert.NoError(t, err) @@ -522,7 +524,7 @@ func TestSyncStreams(t *testing.T) { // check that two streams exist listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), + LabelSelector: cluster.labelsSet(false).String(), } streams, err := cluster.KubeClient.FabricEventStreams(namespace).List(context.TODO(), listOptions) assert.NoError(t, err) @@ -695,7 +697,7 @@ func TestUpdateStreams(t *testing.T) { // compare stream returned from API with expected stream listOptions := metav1.ListOptions{ - LabelSelector: cluster.labelsSet(true).String(), + LabelSelector: cluster.labelsSet(false).String(), } streams := patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) result := cluster.generateFabricEventStream(appId) @@ -703,6 +705,14 @@ func TestUpdateStreams(t *testing.T) { t.Errorf("Malformed FabricEventStream after updating manifest, expected %#v, got %#v", streams.Items[0], result) } + // change teamId and check that stream is updated + pg.Spec.TeamID = "new-team" + streams = patchPostgresqlStreams(t, cluster, &pg.Spec, listOptions) + result = cluster.generateFabricEventStream(appId) + if match, _ := cluster.compareStreams(&streams.Items[0], result); !match { + t.Errorf("Malformed FabricEventStream after updating teamId, expected %#v, got %#v", streams.Items[0].ObjectMeta.Labels, result.ObjectMeta.Labels) + } + // disable recovery for idx, stream := range pg.Spec.Streams { if stream.ApplicationId == appId {