Skip to content

Commit

Permalink
support listing works with its labels
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 21, 2024
1 parent d112d69 commit d80a6e1
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 4 deletions.
44 changes: 41 additions & 3 deletions pkg/api/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
workv1 "open-cluster-management.io/api/work/v1"
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
)

type ResourceType string
Expand Down Expand Up @@ -93,9 +94,35 @@ type ResourcePatchRequest struct{}

// JSONMAPToCloudEvent converts a JSONMap (resource manifest or status) to a CloudEvent
func JSONMAPToCloudEvent(res datatypes.JSONMap) (*cloudevents.Event, error) {
resJSON, err := res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
var err error
var resJSON []byte

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents require its extension value as string, so we need convert the metadata object
// to string back

// ensure the original resource will be not changed
resCopy := datatypes.JSONMap{}
for key, value := range res {
resCopy[key] = value
}

metaJson, err := json.Marshal(metadata)
if err != nil {
return nil, err
}

resCopy[codec.ExtensionWorkMeta] = string(metaJson)

resJSON, err = resCopy.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
} else {
resJSON, err = res.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSONMAP to cloudevent JSON: %v", err)
}
}

evt := &cloudevents.Event{}
Expand All @@ -118,6 +145,17 @@ func CloudEventToJSONMap(evt *cloudevents.Event) (datatypes.JSONMap, error) {
return nil, fmt.Errorf("failed to unmarshal cloudevent JSON to JSONMAP: %v", err)
}

if metadata, ok := res[codec.ExtensionWorkMeta]; ok {
// cloudevents treat its extension value as string, so we need convert metadata extension
// to an object for supporting to query the resources with metadata
objectMeta := map[string]any{}

if err := json.Unmarshal([]byte(fmt.Sprintf("%s", metadata)), &objectMeta); err != nil {
return nil, err
}
res[codec.ExtensionWorkMeta] = objectMeta
}

return res, nil
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/client/cloudevents/grpcsource/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package grpcsource

import (
"encoding/json"
"fmt"
"strings"

"github.com/openshift-online/maestro/pkg/api/openapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)
Expand Down Expand Up @@ -86,6 +90,39 @@ func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) {
return work, nil
}

func ToLabelSearch(opts metav1.ListOptions) (string, bool, error) {
if len(opts.LabelSelector) == 0 {
return "", false, nil
}

labelSelector, err := labels.Parse(opts.LabelSelector)
if err != nil {
return "", false, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err)
}

requirements, selectable := labelSelector.Requirements()
if !selectable {
return "", false, nil
}

labels := []string{}
for _, requirement := range requirements {
if requirement.Operator() != selection.Equals {
// TODO support more operators
return "", false, fmt.Errorf("unsupported operator %s", requirement.Operator())
}

values := requirement.Values()
if len(values) != 1 {
return "", false, fmt.Errorf("")
}

labels = append(labels, fmt.Sprintf(`"%s":"%s"`, requirement.Key(), values.List()[0]))
}

return fmt.Sprintf(`payload -> 'metadata' @> '{"labels":{%s}}'`, strings.Join(labels, ",")), true, nil
}

func marshal(obj map[string]any) ([]byte, error) {
unstructuredObj := unstructured.Unstructured{Object: obj}
data, err := unstructuredObj.MarshalJSON()
Expand Down
65 changes: 65 additions & 0 deletions pkg/client/cloudevents/grpcsource/util_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package grpcsource

import (
"fmt"
"testing"

"github.com/openshift-online/maestro/pkg/api/openapi"
"k8s.io/apimachinery/pkg/api/equality"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"

workv1 "open-cluster-management.io/api/work/v1"
Expand Down Expand Up @@ -130,3 +132,66 @@ func TestToManifestWork(t *testing.T) {
})
}
}

func TestToLabelSearch(t *testing.T) {
cases := []struct {
name string
opts v1.ListOptions
expectedSelectable bool
expectedLabelSearch string
expectedErr error
}{
{
name: "no label selector",
opts: v1.ListOptions{},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: nil,
},
{
name: "selector everything",
opts: v1.ListOptions{LabelSelector: labels.Everything().String()},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: nil,
},
{
name: "unsupported selector operator",
opts: v1.ListOptions{LabelSelector: "a"},
expectedSelectable: false,
expectedLabelSearch: "",
expectedErr: fmt.Errorf("unsupported operator exists"),
},
{
name: "one selector",
opts: v1.ListOptions{LabelSelector: "a=b"},
expectedSelectable: true,
expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\"}}}]'",
expectedErr: nil,
},
{
name: "multiple selector",
opts: v1.ListOptions{LabelSelector: "a=b,c=d"},
expectedSelectable: true,
expectedLabelSearch: "payload -> 'data' -> 'manifests' @> '[{\"metadata\":{\"labels\":{\"a\":\"b\",\"c\":\"d\"}}}]'",
expectedErr: nil,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
labelSearch, selectable, err := ToLabelSearch(c.opts)
if c.expectedSelectable != selectable {
t.Errorf("expected %v, but got %v", c.expectedSelectable, selectable)
}

if c.expectedLabelSearch != labelSearch {
t.Errorf("expected %s, but got %s", c.expectedLabelSearch, labelSearch)
}

if err != nil && err.Error() != c.expectedErr.Error() {
t.Errorf("expected %v, but got %v", c.expectedErr, err)
}
})
}
}
12 changes: 11 additions & 1 deletion pkg/client/cloudevents/grpcsource/watcherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,22 @@ func (m *RESTFulAPIWatcherStore) List(namespace string, opts metav1.ListOptions)
size = int32(opts.Limit)
}

// TODO filter works by labels
labelSearch, selectable, err := ToLabelSearch(opts)
if err != nil {
return nil, err
}

search := []string{fmt.Sprintf("source = '%s'", m.sourceID)}
if namespace != metav1.NamespaceAll {
search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace))
}

if selectable {
search = append(search, labelSearch)
}

klog.V(4).Infof("search=%s", strings.Join(search, " and "))

rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()).
Search(strings.Join(search, " and ")).
Page(page).
Expand Down
115 changes: 115 additions & 0 deletions test/e2e/pkg/sourceclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"

workv1 "open-cluster-management.io/api/work/v1"
Expand Down Expand Up @@ -163,6 +165,91 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() {
}, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred())
})
})

Context("List works with gRPC source ManifestWork client", func() {
var noLabelsWorkName string
var oneLabelWorkName string
var twoLabelWorkName string

BeforeEach(func() {
// prepare works firstly
noLabelsWorkName = "work-list-test" + rand.String(5)
work := NewManifestWork(noLabelsWorkName)
_, err := workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

oneLabelWorkName = "work-list-test" + rand.String(5)
work = NewManifestWorkWithLabels(oneLabelWorkName, map[string]string{"test": "true"})
_, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())

twoLabelWorkName = "work-list-test" + rand.String(5)
work = NewManifestWorkWithLabels(twoLabelWorkName, map[string]string{"test": "true", "work.namespace": consumer_name})
_, err = workClient.ManifestWorks(consumer_name).Create(ctx, work, metav1.CreateOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

AfterEach(func() {
err := workClient.ManifestWorks(consumer_name).Delete(ctx, noLabelsWorkName, metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())

err = workClient.ManifestWorks(consumer_name).Delete(ctx, oneLabelWorkName, metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())

err = workClient.ManifestWorks(consumer_name).Delete(ctx, twoLabelWorkName, metav1.DeleteOptions{})
Expect(err).ShouldNot(HaveOccurred())

Eventually(func() error {
if err := AssertWorkNotFound(noLabelsWorkName); err != nil {
return err
}

if err := AssertWorkNotFound(oneLabelWorkName); err != nil {
return err
}

return AssertWorkNotFound(twoLabelWorkName)
}, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred())
})

It("List works with options", func() {
By("list all works")
works, err := workClient.ManifestWorks(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items, noLabelsWorkName, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred())

By("list works by consumer name")
works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items, noLabelsWorkName, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred())

By("list works by nonexistent consumer")
works, err = workClient.ManifestWorks("nonexistent").List(ctx, metav1.ListOptions{})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred())

By("list works with nonexistent labels")
works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{
LabelSelector: "test=false",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred())

By("list works with one label")
works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{
LabelSelector: "test=true",
})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items, oneLabelWorkName, twoLabelWorkName)).ShouldNot(HaveOccurred())

By("list works with two labels")
works, err = workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{
LabelSelector: "test=true,work.namespace=" + consumer_name,
})
Expect(err).ShouldNot(HaveOccurred())
Expect(AssertWorks(works.Items, twoLabelWorkName)).ShouldNot(HaveOccurred())
})
})
})

type WatchedResult struct {
Expand Down Expand Up @@ -255,6 +342,25 @@ func AssertWorkNotFound(name string) error {
return fmt.Errorf("the work %s still exists", name)
}

func AssertWorks(works []workv1.ManifestWork, expected ...string) error {
workNames := sets.Set[string]{}
expectedNames := sets.Set[string]{}.Insert(expected...)

for _, work := range works {
workNames.Insert(work.Name)
}

if len(expectedNames) != len(workNames) {
return fmt.Errorf("expected %v, but got %v", expectedNames, workNames)
}

if !equality.Semantic.DeepEqual(expectedNames, workNames) {
return fmt.Errorf("expected %v, but got %v", expectedNames, workNames)
}

return nil
}

func NewManifestWork(name string) *workv1.ManifestWork {
return &workv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -270,6 +376,12 @@ func NewManifestWork(name string) *workv1.ManifestWork {
}
}

func NewManifestWorkWithLabels(name string, labels map[string]string) *workv1.ManifestWork {
work := NewManifestWork(name)
work.Labels = labels
return work
}

func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) {
oldData, err := json.Marshal(old)
if err != nil {
Expand Down Expand Up @@ -297,6 +409,9 @@ func NewManifest(name string) workv1.Manifest {
"metadata": map[string]interface{}{
"namespace": "default",
"name": name,
"labels": map[string]string{
"test": "true",
},
},
"data": map[string]string{
"test": rand.String(5),
Expand Down

0 comments on commit d80a6e1

Please sign in to comment.