diff --git a/pkg/api/resource_types.go b/pkg/api/resource_types.go index 165cba0f..4b1d8e83 100755 --- a/pkg/api/resource_types.go +++ b/pkg/api/resource_types.go @@ -16,6 +16,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" ) type ResourceType string @@ -93,9 +94,35 @@ type ResourcePatchRequest struct{} // JSONMAPToCloudEvent converts a JSONMap (resource manifest or status) to a CloudEvent func JSONMAPToCloudEvent(res datatypes.JSONMap) (*cloudevents.Event, error) { - resJSON, err := res.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + var err error + var resJSON []byte + + if metadata, ok := res[codec.ExtensionWorkMeta]; ok { + // cloudevents require its extension value as string, so we need convert the metadata object + // to string back + + // ensure the original resource will be not changed + resCopy := datatypes.JSONMap{} + for key, value := range res { + resCopy[key] = value + } + + metaJson, err := json.Marshal(metadata) + if err != nil { + return nil, err + } + + resCopy[codec.ExtensionWorkMeta] = string(metaJson) + + resJSON, err = resCopy.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + } + } else { + resJSON, err = res.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err) + } } evt := &cloudevents.Event{} @@ -118,6 +145,17 @@ func CloudEventToJSONMap(evt *cloudevents.Event) (datatypes.JSONMap, error) { return nil, fmt.Errorf("failed to unmarshal cloudevent JSON to JSONMAP: %v", err) } + if metadata, ok := res[codec.ExtensionWorkMeta]; ok { + // cloudevents treat its extension value as string, so we need convert metadata extension + // to an object for supporting to query the resources with metadata + objectMeta := map[string]any{} + + if err := json.Unmarshal([]byte(fmt.Sprintf("%s", metadata)), &objectMeta); err != nil { + return nil, err + } + res[codec.ExtensionWorkMeta] = objectMeta + } + return res, nil } diff --git a/pkg/client/cloudevents/grpcsource/util.go b/pkg/client/cloudevents/grpcsource/util.go index 966e701c..c10ac2d7 100644 --- a/pkg/client/cloudevents/grpcsource/util.go +++ b/pkg/client/cloudevents/grpcsource/util.go @@ -2,11 +2,15 @@ package grpcsource import ( "encoding/json" + "fmt" + "strings" "github.com/openshift-online/maestro/pkg/api/openapi" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" ) @@ -86,6 +90,39 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) { return work, nil } +func ToLabelSearch(opts metav1.ListOptions) (string, bool, error) { + if len(opts.LabelSelector) == 0 { + return "", false, nil + } + + labelSelector, err := labels.Parse(opts.LabelSelector) + if err != nil { + return "", false, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err) + } + + requirements, selectable := labelSelector.Requirements() + if !selectable { + return "", false, nil + } + + labels := []string{} + for _, requirement := range requirements { + if requirement.Operator() != selection.Equals { + // TODO support more operators + return "", false, fmt.Errorf("unsupported operator %s", requirement.Operator()) + } + + values := requirement.Values() + if len(values) != 1 { + return "", false, fmt.Errorf("") + } + + labels = append(labels, fmt.Sprintf(`"%s":"%s"`, requirement.Key(), values.List()[0])) + } + + return fmt.Sprintf(`payload -> 'metadata' @> '{"labels":{%s}}'`, strings.Join(labels, ",")), true, nil +} + func marshal(obj map[string]any) ([]byte, error) { unstructuredObj := unstructured.Unstructured{Object: obj} data, err := unstructuredObj.MarshalJSON() diff --git a/pkg/client/cloudevents/grpcsource/util_test.go b/pkg/client/cloudevents/grpcsource/util_test.go index a4fd4f7b..edcf3291 100644 --- a/pkg/client/cloudevents/grpcsource/util_test.go +++ b/pkg/client/cloudevents/grpcsource/util_test.go @@ -1,11 +1,13 @@ package grpcsource import ( + "fmt" "testing" "github.com/openshift-online/maestro/pkg/api/openapi" "k8s.io/apimachinery/pkg/api/equality" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" workv1 "open-cluster-management.io/api/work/v1" @@ -130,3 +132,66 @@ func TestToManifestWork(t *testing.T) { }) } } + +func TestToLabelSearch(t *testing.T) { + cases := []struct { + name string + opts v1.ListOptions + expectedSelectable bool + expectedLabelSearch string + expectedErr error + }{ + { + name: "no label selector", + opts: v1.ListOptions{}, + expectedSelectable: false, + expectedLabelSearch: "", + expectedErr: nil, + }, + { + name: "selector everything", + opts: v1.ListOptions{LabelSelector: labels.Everything().String()}, + expectedSelectable: false, + expectedLabelSearch: "", + expectedErr: nil, + }, + { + name: "unsupported selector operator", + opts: v1.ListOptions{LabelSelector: "a"}, + expectedSelectable: false, + expectedLabelSearch: "", + expectedErr: fmt.Errorf("unsupported operator exists"), + }, + { + name: "one selector", + opts: v1.ListOptions{LabelSelector: "a=b"}, + expectedSelectable: true, + expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\"}}}]'", + expectedErr: nil, + }, + { + name: "multiple selector", + opts: v1.ListOptions{LabelSelector: "a=b,c=d"}, + expectedSelectable: true, + expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\",\"c\":\"d\"}}}]'", + expectedErr: nil, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + labelSearch, selectable, err := ToLabelSearch(c.opts) + if c.expectedSelectable != selectable { + t.Errorf("expected %v, but got %v", c.expectedSelectable, selectable) + } + + if c.expectedLabelSearch != labelSearch { + t.Errorf("expected %s, but got %s", c.expectedLabelSearch, labelSearch) + } + + if err != nil && err.Error() != c.expectedErr.Error() { + t.Errorf("expected %v, but got %v", c.expectedErr, err) + } + }) + } +} diff --git a/pkg/client/cloudevents/grpcsource/watcherstore.go b/pkg/client/cloudevents/grpcsource/watcherstore.go index 262bb75b..2b1f949b 100644 --- a/pkg/client/cloudevents/grpcsource/watcherstore.go +++ b/pkg/client/cloudevents/grpcsource/watcherstore.go @@ -143,12 +143,22 @@ func (m *RESTFulAPIWatcherStore) List(namespace string, opts metav1.ListOptions) size = int32(opts.Limit) } - // TODO filter works by labels + labelSearch, selectable, err := ToLabelSearch(opts) + if err != nil { + return nil, err + } + search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} if namespace != metav1.NamespaceAll { search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace)) } + if selectable { + search = append(search, labelSearch) + } + + klog.V(4).Infof("search=%s", strings.Join(search, " and ")) + rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). Search(strings.Join(search, " and ")). Page(page). diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go index 5e45689a..fc77ef2c 100644 --- a/test/e2e/pkg/sourceclient_test.go +++ b/test/e2e/pkg/sourceclient_test.go @@ -12,12 +12,14 @@ import ( . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" workv1 "open-cluster-management.io/api/work/v1" @@ -163,6 +165,91 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() { }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) }) }) + + Context("List works with gRPC source ManifestWork client", func() { + var noLabelsWorkName string + var oneLabelWorkName string + var twoLabelWorkName string + + BeforeEach(func() { + // prepare works firstly + noLabelsWorkName = "work-list-test" + rand.String(5) + work := NewManifestWork(noLabelsWorkName) + _, err := workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + oneLabelWorkName = "work-list-test" + rand.String(5) + work = NewManifestWorkWithLabels(oneLabelWorkName, map[string]string{"test": "true"}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + twoLabelWorkName = "work-list-test" + rand.String(5) + work = NewManifestWorkWithLabels(twoLabelWorkName, map[string]string{"test": "true", "work.namespace": consumer_name}) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterEach(func() { + err := workClient.ManifestWorks(consumer_name).Delete(ctx, noLabelsWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, oneLabelWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, twoLabelWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + if err := AssertWorkNotFound(noLabelsWorkName); err != nil { + return err + } + + if err := AssertWorkNotFound(oneLabelWorkName); err != nil { + return err + } + + return AssertWorkNotFound(twoLabelWorkName) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + }) + + It("List works with options", func() { + By("list all works") + works, err := workClient.ManifestWorks(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, noLabelsWorkName, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred()) + + By("list works by consumer name") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, noLabelsWorkName, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred()) + + By("list works by nonexistent consumer") + works, err = workClient.ManifestWorks("nonexistent").List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) + + By("list works with nonexistent labels") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "test=false", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) + + By("list works with one label") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "test=true", + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred()) + + By("list works with two labels") + works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{ + LabelSelector: "test=true,work.namespace=" + consumer_name, + }) + Expect(err).ShouldNot(HaveOccurred()) + Expect(AssertWorks(works.Items, twoLabelWorkName)).ShouldNot(HaveOccurred()) + }) + }) }) type WatchedResult struct { @@ -255,6 +342,25 @@ func AssertWorkNotFound(name string) error { return fmt.Errorf("the work %s still exists", name) } +func AssertWorks(works []workv1.ManifestWork, expected ...string) error { + workNames := sets.Set[string]{} + expectedNames := sets.Set[string]{}.Insert(expected...) + + for _, work := range works { + workNames.Insert(work.Name) + } + + if len(expectedNames) != len(workNames) { + return fmt.Errorf("expected %v, but got %v", expectedNames, workNames) + } + + if !equality.Semantic.DeepEqual(expectedNames, workNames) { + return fmt.Errorf("expected %v, but got %v", expectedNames, workNames) + } + + return nil +} + func NewManifestWork(name string) *workv1.ManifestWork { return &workv1.ManifestWork{ ObjectMeta: metav1.ObjectMeta{ @@ -270,6 +376,12 @@ func NewManifestWork(name string) *workv1.ManifestWork { } } +func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.ManifestWork { + work := NewManifestWork(name) + work.Labels = labels + return work +} + func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { oldData, err := json.Marshal(old) if err != nil { @@ -297,6 +409,9 @@ func NewManifest(name string) workv1.Manifest { "metadata": map[string]interface{}{ "namespace": "default", "name": name, + "labels": map[string]string{ + "test": "true", + }, }, "data": map[string]string{ "test": rand.String(5),