diff --git a/cmd/maestro/agent/cmd.go b/cmd/maestro/agent/cmd.go index 39f54be9..f204449a 100644 --- a/cmd/maestro/agent/cmd.go +++ b/cmd/maestro/agent/cmd.go @@ -30,7 +30,7 @@ func NewAgentCommand() *cobra.Command { agentOption.MaxJSONRawLength = maxJSONRawLength agentOption.CloudEventsClientCodecs = []string{"manifest", "manifestbundle"} cfg := spoke.NewWorkAgentConfig(commonOptions, agentOption) - cmdConfig := commonOptions.CommoOpts. + cmdConfig := commonOptions.CommonOpts. NewControllerCommandConfig("maestro-agent", version.Get(), cfg.RunWorkloadAgent) cmd := cmdConfig.NewCommandWithContext(context.TODO()) @@ -68,6 +68,6 @@ func NewAgentCommand() *cobra.Command { func addFlags(fs *pflag.FlagSet) { fs.StringVar(&commonOptions.SpokeClusterName, "consumer-name", commonOptions.SpokeClusterName, "Name of the consumer") - fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election", + fs.BoolVar(&commonOptions.CommonOpts.CmdConfig.DisableLeaderElection, "disable-leader-election", true, "Disable leader election.") } diff --git a/examples/manifestworkclient/README.md b/examples/manifestworkclient/README.md index 911bcb8a..25b1ecf0 100644 --- a/examples/manifestworkclient/README.md +++ b/examples/manifestworkclient/README.md @@ -1,24 +1,21 @@ # gRPC Source ManifestWork Client -This example shows how to build a source ManifestWork client with `RESTFullAPIWatcherStore` and watch/create/get/update/delete works by the client. +This example shows how to build a source ManifestWork client with Maestro gRPC service and watch/create/get/update/delete works by this client. ## Build the client -Using sdk-go to build a source ManifestWork client with `RESTFullAPIWatcherStore` - ```golang sourceID := "mw-client-example" -workClient, err := work.NewClientHolderBuilder(grpcOptions). - WithClientID(fmt.Sprintf("%s-client", sourceID)). - WithSourceID(sourceID). - WithCodecs(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(ctx, maestroAPIClient, sourceID)). - WithResyncEnabled(false). - NewSourceClientHolder(ctx) +workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + ctx, + maestroAPIClient, + maestroGRPCOptions, + sourceID, +) if err != nil { - log.Fatal(err) + log.Fatal(err) } // watch/create/patch/get/delete/list by workClient diff --git a/examples/manifestworkclient/client-a/main.go b/examples/manifestworkclient/client-a/main.go index db1ba428..b168790b 100644 --- a/examples/manifestworkclient/client-a/main.go +++ b/examples/manifestworkclient/client-a/main.go @@ -21,8 +21,6 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" ) const sourceID = "mw-client-example" @@ -31,6 +29,7 @@ var ( maestroServerAddr = flag.String("maestro-server", "https://127.0.0.1:30080", "The Maestro server address") grpcServerAddr = flag.String("grpc-server", "127.0.0.1:30090", "The GRPC server address") consumerName = flag.String("consumer-name", "", "The Consumer Name") + printWorkDetails = flag.Bool("print-work-details", false, "Print work details") ) func main() { @@ -71,13 +70,12 @@ func main() { grpcOptions := grpc.NewGRPCOptions() grpcOptions.URL = *grpcServerAddr - workClient, err := work.NewClientHolderBuilder(grpcOptions). - WithClientID(fmt.Sprintf("%s-client-a", sourceID)). - WithSourceID(sourceID). - WithCodecs(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)). - WithResyncEnabled(false). - NewSourceClientHolder(ctx) + workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + ctx, + maestroAPIClient, + grpcOptions, + sourceID, + ) if err != nil { log.Fatal(err) } @@ -99,11 +97,9 @@ func main() { } switch event.Type { case watch.Modified: - fmt.Printf("watched work (uid=%s) is modified\n", event.Object.(*workv1.ManifestWork).UID) - PrintWork(event.Object.(*workv1.ManifestWork)) + Print(event, *printWorkDetails) case watch.Deleted: - fmt.Printf("watched work (uid=%s) is deleted\n", event.Object.(*workv1.ManifestWork).UID) - PrintWork(event.Object.(*workv1.ManifestWork)) + Print(event, *printWorkDetails) } } } @@ -112,10 +108,15 @@ func main() { <-ctx.Done() } -func PrintWork(work *workv1.ManifestWork) { - workJson, err := json.MarshalIndent(work, "", " ") - if err != nil { - log.Fatal(err) +func Print(event watch.Event, printDetails bool) { + work := event.Object.(*workv1.ManifestWork) + fmt.Printf("watched work (uid=%s) is %s\n", work.UID, event.Type) + + if printDetails { + workJson, err := json.MarshalIndent(work, "", " ") + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s\n", string(workJson)) } - fmt.Printf("%s\n", string(workJson)) } diff --git a/examples/manifestworkclient/client-b/main.go b/examples/manifestworkclient/client-b/main.go index 64eb5db0..7c925394 100644 --- a/examples/manifestworkclient/client-b/main.go +++ b/examples/manifestworkclient/client-b/main.go @@ -23,8 +23,6 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" ) const sourceID = "mw-client-example" @@ -67,13 +65,12 @@ func main() { grpcOptions := grpc.NewGRPCOptions() grpcOptions.URL = *grpcServerAddr - workClient, err := work.NewClientHolderBuilder(grpcOptions). - WithClientID(fmt.Sprintf("%s-client-b", sourceID)). - WithSourceID(sourceID). - WithCodecs(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)). - WithResyncEnabled(false). - NewSourceClientHolder(ctx) + workClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + ctx, + maestroAPIClient, + grpcOptions, + sourceID, + ) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 1e55a5ea..ec12e08a 100755 --- a/go.mod +++ b/go.mod @@ -45,8 +45,8 @@ require ( k8s.io/component-base v0.29.3 k8s.io/klog/v2 v2.120.1 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc - open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 - open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 + open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 + open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd ) require ( diff --git a/go.sum b/go.sum index a3b67dca..5681fe25 100755 --- a/go.sum +++ b/go.sum @@ -823,10 +823,10 @@ k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0g k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0= -open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 h1:Z3gwbMUZmblGTHFx6iOO1zlCJc2FJcAJsa2RASTKDqA= -open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4/go.mod h1:RuYCuKuVJzNxRBkSoQnxyJxyUqOyCH388DlR/QDr7rE= -open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 h1:/nPyxceSdi66Vs+A9LJ+7X6kLe4xK98dBMi4adZv1d0= -open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33 h1:7uPjyn1x25QZIzfZqeSFfZdNrzc2hlHm6t/JKYKu9fI= +open-cluster-management.io/ocm v0.13.1-0.20240618054845-e2a7b9e78b33/go.mod h1:KzUwhPZAg6Wq+4xRu10fVVpqNADyz5CtRW4ziqIC2z4= +open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd h1:kTVZOR7bTdh4ID7EoliyGhPR5CItpx8GehN581IxoPA= +open-cluster-management.io/sdk-go v0.13.1-0.20240618022514-b2c1dd175afd/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.17.3 h1:65QmN7r3FWgTxDMz9fvGnO1kbf2nu+acg9p2R9oYYYk= diff --git a/pkg/client/cloudevents/grpcsource/client.go b/pkg/client/cloudevents/grpcsource/client.go new file mode 100644 index 00000000..b8b57ca7 --- /dev/null +++ b/pkg/client/cloudevents/grpcsource/client.go @@ -0,0 +1,88 @@ +package grpcsource + +import ( + "context" + "fmt" + + "github.com/openshift-online/maestro/pkg/api/openapi" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + + workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" + sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" + sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" +) + +func NewMaestroGRPCSourceWorkClient( + ctx context.Context, + apiClient *openapi.APIClient, + opts *grpc.GRPCOptions, + sourceID string, +) (workv1client.WorkV1Interface, error) { + if len(sourceID) == 0 { + return nil, fmt.Errorf("source id is required") + } + + options, err := generic.BuildCloudEventsSourceOptions(opts, fmt.Sprintf("%s-maestro", sourceID), sourceID) + if err != nil { + return nil, err + } + + watcherStore := newRESTFulAPIWatcherStore(ctx, apiClient, sourceID) + + cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork]( + ctx, + options, + nil, // resync is disabled, so lister is not required + nil, // resync is disabled, so status hash is not required + sourcecodec.NewManifestBundleCodec(), + ) + if err != nil { + return nil, err + } + + cloudEventsClient.Subscribe(ctx, watcherStore.HandleReceivedWork) + + // start a go routine to receive client reconnect signal + go func() { + for { + select { + case <-ctx.Done(): + return + case <-cloudEventsClient.ReconnectedChan(): + // reconnect happened, sync the works for current watchers + if err := watcherStore.Sync(); err != nil { + klog.Errorf("failed to sync the works %v", err) + } + } + } + }() + + manifestWorkClient := sourceclient.NewManifestWorkSourceClient(sourceID, cloudEventsClient, watcherStore) + return &WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}, nil + +} + +// WorkV1ClientWrapper wraps a ManifestWork client to a WorkV1Interface +type WorkV1ClientWrapper struct { + ManifestWorkClient *sourceclient.ManifestWorkSourceClient +} + +var _ workv1client.WorkV1Interface = &WorkV1ClientWrapper{} + +func (c *WorkV1ClientWrapper) ManifestWorks(namespace string) workv1client.ManifestWorkInterface { + c.ManifestWorkClient.SetNamespace(namespace) + return c.ManifestWorkClient +} + +func (c *WorkV1ClientWrapper) AppliedManifestWorks() workv1client.AppliedManifestWorkInterface { + return nil +} + +func (c *WorkV1ClientWrapper) RESTClient() rest.Interface { + return nil +} diff --git a/pkg/client/cloudevents/grpcsource/watch.go b/pkg/client/cloudevents/grpcsource/watch.go new file mode 100644 index 00000000..b704cb22 --- /dev/null +++ b/pkg/client/cloudevents/grpcsource/watch.go @@ -0,0 +1,79 @@ +package grpcsource + +import ( + "sync" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" + workv1 "open-cluster-management.io/api/work/v1" +) + +// workWatcher implements the watch.Interface. +type workWatcher struct { + sync.RWMutex + + result chan watch.Event + done chan struct{} + stopped bool + + namespace string +} + +var _ watch.Interface = &workWatcher{} + +func newWorkWatcher(namespace string) *workWatcher { + return &workWatcher{ + result: make(chan watch.Event), + done: make(chan struct{}), + namespace: namespace, + } +} + +// ResultChan implements Interface. +func (w *workWatcher) ResultChan() <-chan watch.Event { + return w.result +} + +// Stop implements Interface. +func (w *workWatcher) Stop() { + // Call Close() exactly once by locking and setting a flag. + w.Lock() + defer w.Unlock() + // closing a closed channel always panics, therefore check before closing + select { + case <-w.done: + close(w.result) + default: + w.stopped = true + close(w.done) + } +} + +// Receive an event and sends down the result channel. +func (w *workWatcher) Receive(evt watch.Event) { + if w.isStopped() { + // this watcher is stopped, do nothing. + return + } + + work, ok := evt.Object.(*workv1.ManifestWork) + if !ok { + klog.Errorf("unknown event object type %T", evt.Object) + return + } + + if w.namespace != metav1.NamespaceAll && w.namespace != work.Namespace { + klog.V(4).Infof("ignore the work %s/%s for the watcher %s", work.Namespace, work.Name, w.namespace) + return + } + + w.result <- evt +} + +func (w *workWatcher) isStopped() bool { + w.RLock() + defer w.RUnlock() + + return w.stopped +} diff --git a/pkg/client/cloudevents/grpcsource/watcherstore.go b/pkg/client/cloudevents/grpcsource/watcherstore.go index ac81aa18..262bb75b 100644 --- a/pkg/client/cloudevents/grpcsource/watcherstore.go +++ b/pkg/client/cloudevents/grpcsource/watcherstore.go @@ -4,14 +4,19 @@ import ( "context" "fmt" "net/http" + "strings" "sync" + "time" "github.com/openshift-online/maestro/pkg/api/openapi" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" workv1 "open-cluster-management.io/api/work/v1" @@ -28,54 +33,74 @@ import ( type RESTFulAPIWatcherStore struct { sync.RWMutex - result chan watch.Event - done chan struct{} - watcherStopped bool - sourceID string apiClient *openapi.APIClient + + watchers map[string]*workWatcher + workQueue cache.Queue } var _ store.WorkClientWatcherStore = &RESTFulAPIWatcherStore{} -func NewRESTFullAPIWatcherStore(apiClient *openapi.APIClient, sourceID string) *RESTFulAPIWatcherStore { - return &RESTFulAPIWatcherStore{ - result: make(chan watch.Event), - done: make(chan struct{}), - watcherStopped: false, - +func newRESTFulAPIWatcherStore(ctx context.Context, apiClient *openapi.APIClient, sourceID string) *RESTFulAPIWatcherStore { + s := &RESTFulAPIWatcherStore{ sourceID: sourceID, apiClient: apiClient, + watchers: make(map[string]*workWatcher), + workQueue: cache.NewFIFO(func(obj interface{}) (string, error) { + work, ok := obj.(*workv1.ManifestWork) + if !ok { + return "", fmt.Errorf("unknown object type %T", obj) + } + + // ensure there is only one object in the queue for a work + return string(work.UID), nil + }), } -} -// ResultChan implements watch interface. -func (m *RESTFulAPIWatcherStore) ResultChan() <-chan watch.Event { - return m.result + // start a goroutine to send works to the watcher + go wait.Until(s.process, time.Second, ctx.Done()) + + return s } -// Stop implements watch interface. -func (m *RESTFulAPIWatcherStore) Stop() { - // Call Close() exactly once by locking and setting a flag. - m.Lock() - defer m.Unlock() - // closing a closed channel always panics, therefore check before closing - select { - case <-m.done: - close(m.result) - default: - m.watcherStopped = true - close(m.done) +// GetWatcher returns a watcher to the source work client with a specified namespace (consumer name). +// Using `metav1.NamespaceAll` to specify all namespaces. +func (m *RESTFulAPIWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { + // Only list works from maestro server with the given namespace when a watcher is required + search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + if namespace != metav1.NamespaceAll { + search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace)) + } + + rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). + Search(strings.Join(search, " and ")). + Page(1). + Size(-1). + Execute() + if err != nil { + return nil, err + } + + watcher := m.registerWatcher(namespace) + + // save the works to a queue + for _, rb := range rbs.Items { + work, err := ToManifestWork(&rb) + if err != nil { + return nil, err + } + + if err := m.workQueue.Add(work); err != nil { + return nil, err + } } + + return watcher, nil } // HandleReceivedWork sends the received works to the watch channel func (m *RESTFulAPIWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { - if m.isWatcherStopped() { - // watcher is stopped, do nothing. - return nil - } - switch action { case types.StatusModified: watchType := watch.Modified @@ -83,13 +108,14 @@ func (m *RESTFulAPIWatcherStore) HandleReceivedWork(action types.ResourceAction, watchType = watch.Deleted } - m.result <- watch.Event{Type: watchType, Object: work} + m.sendWatchEvent(watch.Event{Type: watchType, Object: work}) return nil default: return fmt.Errorf("unknown resource action %s", action) } } +// Get a work from maestro server with its namespace and name func (m *RESTFulAPIWatcherStore) Get(namespace, name string) (*workv1.ManifestWork, error) { id := utils.UID(m.sourceID, namespace, name) rb, resp, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), id).Execute() @@ -104,22 +130,30 @@ func (m *RESTFulAPIWatcherStore) Get(namespace, name string) (*workv1.ManifestWo return ToManifestWork(rb) } -func (m *RESTFulAPIWatcherStore) List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { +// List works from maestro server with a specified namespace and list options. +// Using `metav1.NamespaceAll` to specify all namespace +func (m *RESTFulAPIWatcherStore) List(namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { works := []*workv1.ManifestWork{} + // TODO consider how to support configuring page + var page int32 = 1 + var size int32 = -1 if opts.Limit > 0 { size = int32(opts.Limit) } - apiRequest := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). - Search(fmt.Sprintf("source = '%s'", m.sourceID)). - Page(1). // TODO consider how to support this - Size(size) - // TODO filter works by labels + search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + if namespace != metav1.NamespaceAll { + search = append(search, fmt.Sprintf("consumer_name = '%s'", namespace)) + } - rbs, _, err := apiRequest.Execute() + rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). + Search(strings.Join(search, " and ")). + Page(page). + Size(size). + Execute() if err != nil { return nil, err } @@ -137,7 +171,7 @@ func (m *RESTFulAPIWatcherStore) List(opts metav1.ListOptions) ([]*workv1.Manife } func (m *RESTFulAPIWatcherStore) ListAll() ([]*workv1.ManifestWork, error) { - return m.List(metav1.ListOptions{}) + return m.List(metav1.NamespaceAll, metav1.ListOptions{}) } func (m *RESTFulAPIWatcherStore) Add(work *workv1.ManifestWork) error { @@ -159,9 +193,106 @@ func (m *RESTFulAPIWatcherStore) HasInitiated() bool { return true } -func (m *RESTFulAPIWatcherStore) isWatcherStopped() bool { +func (m *RESTFulAPIWatcherStore) Sync() error { + m.RLock() + defer m.RUnlock() + + if len(m.watchers) == 0 { + // there are no watchers, do nothing + return nil + } + + hasAll := false + namespaces := []string{} + for namespace := range m.watchers { + if namespace == metav1.NamespaceAll { + hasAll = true + break + } + + namespaces = append(namespaces, fmt.Sprintf("consumer_name = '%s'", namespace)) + } + + search := []string{fmt.Sprintf("source = '%s'", m.sourceID)} + if !hasAll { + search = append(search, namespaces...) + } + + rbs, _, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). + Search(strings.Join(search, " or ")). + Page(1). + Size(-1). + Execute() + if err != nil { + return err + } + + // save the works to a queue + for _, rb := range rbs.Items { + work, err := ToManifestWork(&rb) + if err != nil { + return err + } + + if err := m.workQueue.Add(work); err != nil { + return err + } + } + + return nil +} + +// process drains the work queue and send the work to the watch channel. +func (m *RESTFulAPIWatcherStore) process() { + for { + // this will be blocked until the work queue has works + obj, err := m.workQueue.Pop(func(interface{}, bool) error { + // do nothing + return nil + }) + if err != nil { + if err == cache.ErrFIFOClosed { + return + } + + klog.Warningf("failed to pop the %v requeue it, %v", obj, err) + // this is the safe way to re-enqueue. + if err := m.workQueue.AddIfNotPresent(obj); err != nil { + klog.Errorf("failed to requeue the obj %v, %v", obj, err) + return + } + } + + work, ok := obj.(*workv1.ManifestWork) + if !ok { + klog.Errorf("unknown the object type %T from the event queue", obj) + return + } + + m.sendWatchEvent(watch.Event{Type: watch.Modified, Object: work}) + } +} + +func (m *RESTFulAPIWatcherStore) registerWatcher(namespace string) watch.Interface { + m.Lock() + defer m.Unlock() + + watcher, ok := m.watchers[namespace] + if ok { + return watcher + } + + watcher = newWorkWatcher(namespace) + m.watchers[namespace] = watcher + return watcher +} + +func (m *RESTFulAPIWatcherStore) sendWatchEvent(evt watch.Event) { m.RLock() defer m.RUnlock() - return m.watcherStopped + for _, w := range m.watchers { + // this will be blocked until this work is consumed + w.Receive(evt) + } } diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go index e065ddbd..5e45689a 100644 --- a/test/e2e/pkg/sourceclient_test.go +++ b/test/e2e/pkg/sourceclient_test.go @@ -1,14 +1,18 @@ package e2e_test import ( + "context" "encoding/json" "fmt" + "strings" "time" jsonpatch "github.com/evanphx/json-patch" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -23,58 +27,68 @@ import ( var _ = Describe("gRPC Source ManifestWork Client Test", func() { Context("Watch work status with gRPC source ManifestWork client", func() { - var watchedWorks []*workv1.ManifestWork + var watcherCtx context.Context + var watcherCancel context.CancelFunc + + var firstInitWorkName string + var secondInitWorkName string BeforeEach(func() { - watchedWorks = []*workv1.ManifestWork{} - - watcher, err := workClient.ManifestWorks(consumer_name).Watch(ctx, metav1.ListOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - - go func() { - ch := watcher.ResultChan() - for { - select { - case <-ctx.Done(): - return - case event, ok := <-ch: - if !ok { - return - } - switch event.Type { - case watch.Modified: - if work, ok := event.Object.(*workv1.ManifestWork); ok { - watchedWorks = append(watchedWorks, work) - } - case watch.Deleted: - if work, ok := event.Object.(*workv1.ManifestWork); ok { - watchedWorks = append(watchedWorks, work) - } - } - } + watcherCtx, watcherCancel = context.WithCancel(context.Background()) + + // prepare two works firstly + firstInitWorkName = "first-init-work-" + rand.String(5) + secondInitWorkName = "second-init-work-" + rand.String(5) + + _, err := workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(firstInitWorkName), metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(secondInitWorkName), metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterEach(func() { + err := workClient.ManifestWorks(consumer_name).Delete(ctx, firstInitWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + err = workClient.ManifestWorks(consumer_name).Delete(ctx, secondInitWorkName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + if err := AssertWorkNotFound(firstInitWorkName); err != nil { + return err } - }() + + return AssertWorkNotFound(secondInitWorkName) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + + watcherCancel() }) It("The work status should be watched", func() { - - By("create a work") - workName := "work-" + rand.String(5) - _, err := workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + By("create a work client for watch") + watcherClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + watcherCtx, + apiClient, + grpcOptions, + sourceID, + ) Expect(err).ShouldNot(HaveOccurred()) - // wait for few seconds to ensure the creation is finished - <-time.After(5 * time.Second) + By("start watching") + watcher, err := watcherClient.ManifestWorks(consumer_name).Watch(watcherCtx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + result := StartWatch(watcherCtx, watcher) - By("list the works") - works, err := workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{}) + By("create a work by work client") + workName := "work-" + rand.String(5) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) - Expect(len(works.Items) == 1).To(BeTrue()) - // wait for few seconds to ensure the work status is updated by agent + // wait for few seconds to ensure the creation is finished <-time.After(5 * time.Second) - By("update a work") + By("update a work by work client") work, err := workClient.ManifestWorks(consumer_name).Get(ctx, workName, metav1.GetOptions{}) Expect(err).ShouldNot(HaveOccurred()) @@ -88,34 +102,159 @@ var _ = Describe("gRPC Source ManifestWork Client Test", func() { // wait for few seconds to ensure the work status is updated by agent <-time.After(5 * time.Second) - By("delete a work") + By("delete the work by work client") err = workClient.ManifestWorks(consumer_name).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { - if len(watchedWorks) < 2 { - return fmt.Errorf("unexpected watched works %v", watchedWorks) - } + return AssertWatchResult(result) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + }) - hasDeletedWork := false - for _, watchedWork := range watchedWorks { - if meta.IsStatusConditionTrue(watchedWork.Status.Conditions, common.ManifestsDeleted) { - hasDeletedWork = true - break - } - } + It("The watchers with different namespace", func() { + watcherClient, err := grpcsource.NewMaestroGRPCSourceWorkClient( + watcherCtx, + apiClient, + grpcOptions, + sourceID, + ) + Expect(err).ShouldNot(HaveOccurred()) - if !hasDeletedWork { - return fmt.Errorf("expected the deleted works is watched, but failed") - } + By("start watching works from all consumers") + allConsumerWatcher, err := watcherClient.ManifestWorks(metav1.NamespaceAll).Watch(watcherCtx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + allConsumerWatcherResult := StartWatch(watcherCtx, allConsumerWatcher) - return nil + By("start watching works from consumer" + consumer_name) + consumerWatcher, err := watcherClient.ManifestWorks(consumer_name).Watch(watcherCtx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + consumerWatcherResult := StartWatch(watcherCtx, consumerWatcher) + + By("start watching works from an other consumer") + otherConsumerWatcher, err := watcherClient.ManifestWorks("other").Watch(watcherCtx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + otherConsumerWatcherResult := StartWatch(watcherCtx, otherConsumerWatcher) + + By("create a work by work client") + workName := "work-" + rand.String(5) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + // wait for few seconds to ensure the creation is finished + <-time.After(5 * time.Second) + + By("delete the work by work client") + err = workClient.ManifestWorks(consumer_name).Delete(ctx, workName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + return AssertWatchResult(allConsumerWatcherResult) }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + Eventually(func() error { + return AssertWatchResult(consumerWatcherResult) + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + + Consistently(func() error { + if len(otherConsumerWatcherResult.WatchedWorks) != 0 { + return fmt.Errorf("unexpected watched works") + } + return nil + }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) }) }) }) +type WatchedResult struct { + WatchedWorks []*workv1.ManifestWork +} + +func StartWatch(ctx context.Context, watcher watch.Interface) *WatchedResult { + result := &WatchedResult{WatchedWorks: []*workv1.ManifestWork{}} + go func() { + ch := watcher.ResultChan() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-ch: + if !ok { + return + } + + switch event.Type { + case watch.Modified: + if work, ok := event.Object.(*workv1.ManifestWork); ok { + result.WatchedWorks = append(result.WatchedWorks, work) + } + case watch.Deleted: + if work, ok := event.Object.(*workv1.ManifestWork); ok { + result.WatchedWorks = append(result.WatchedWorks, work) + } + } + } + } + }() + + return result +} + +func AssertWatchResult(result *WatchedResult) error { + hasFirstInitWork := false + hasSecondInitWork := false + hasWork := false + hasDeletedWork := false + + for _, watchedWork := range result.WatchedWorks { + if strings.HasPrefix(watchedWork.Name, "first-init-work-") { + hasFirstInitWork = true + } + + if strings.HasPrefix(watchedWork.Name, "second-init-work-") { + hasSecondInitWork = true + } + + if strings.HasPrefix(watchedWork.Name, "work-") { + hasWork = true + } + + if meta.IsStatusConditionTrue(watchedWork.Status.Conditions, common.ManifestsDeleted) { + hasDeletedWork = true + } + } + + if !hasFirstInitWork { + return fmt.Errorf("expected the first init works is watched, but failed") + } + + if !hasSecondInitWork { + return fmt.Errorf("expected the second init works is watched, but failed") + } + + if !hasWork { + return fmt.Errorf("expected the works is watched, but failed") + } + + if !hasDeletedWork { + return fmt.Errorf("expected the deleted works is watched, but failed") + } + + return nil +} + +func AssertWorkNotFound(name string) error { + _, err := workClient.ManifestWorks(consumer_name).Get(ctx, name, metav1.GetOptions{}) + if errors.IsNotFound(err) { + return nil + } + + if err != nil { + return err + } + + return fmt.Errorf("the work %s still exists", name) +} + func NewManifestWork(name string) *workv1.ManifestWork { return &workv1.ManifestWork{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index cf2e6b48..4f1342f7 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -16,9 +16,8 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -40,7 +39,7 @@ var ( grpcClient pbv1.CloudEventServiceClient helper *test.Helper T *testing.T - workClient *work.ClientHolder + workClient workv1client.WorkV1Interface grpcOptions *grpcoptions.GRPCOptions cancel context.CancelFunc ctx context.Context @@ -119,13 +118,12 @@ var _ = BeforeSuite(func() { grpcOptions = grpcoptions.NewGRPCOptions() grpcOptions.URL = grpcServerAddress - workClient, err = work.NewClientHolderBuilder(grpcOptions). - WithClientID(fmt.Sprintf("%s-watcher", sourceID)). - WithSourceID(sourceID). - WithCodecs(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(apiClient, sourceID)). - WithResyncEnabled(false). - NewSourceClientHolder(ctx) + workClient, err = grpcsource.NewMaestroGRPCSourceWorkClient( + ctx, + apiClient, + grpcOptions, + sourceID, + ) Expect(err).ShouldNot(HaveOccurred()) }) diff --git a/test/helper.go b/test/helper.go index 11513045..6d6bb5e7 100755 --- a/test/helper.go +++ b/test/helper.go @@ -15,14 +15,18 @@ import ( "github.com/openshift-online/maestro/pkg/controllers" "github.com/openshift-online/maestro/pkg/event" "github.com/openshift-online/maestro/pkg/logger" + + workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" "github.com/bxcodec/faker/v3" "github.com/golang-jwt/jwt/v4" @@ -262,15 +266,26 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu workCodec = codec.NewManifestCodec(nil) } - clientHolder, informer, err := work.NewClientHolderBuilder(mqttOptions). + watcherStore := store.NewAgentInformerWatcherStore() + + clientHolder, err := work.NewClientHolderBuilder(mqttOptions). WithClientID(clusterName). WithClusterName(clusterName). WithCodecs(workCodec). - NewAgentClientHolderWithInformer(ctx) + WithWorkClientWatcherStore(watcherStore). + NewAgentClientHolder(ctx) if err != nil { glog.Fatalf("Unable to create work agent holder: %s", err) } + factory := workinformers.NewSharedInformerFactoryWithOptions( + clientHolder.WorkInterface(), + 5*time.Minute, + workinformers.WithNamespace(clusterName), + ) + informer := factory.Work().V1().ManifestWorks() + watcherStore.SetStore(informer.Informer().GetStore()) + go informer.Informer().Run(ctx.Done()) helper.WorkAgentHolder = clientHolder diff --git a/test/integration/pulse_server_test.go b/test/integration/pulse_server_test.go index a14ae4c9..7553c03a 100644 --- a/test/integration/pulse_server_test.go +++ b/test/integration/pulse_server_test.go @@ -77,13 +77,6 @@ func TestPulseServer(t *testing.T) { return err } - if len(list) == 0 { - // no work synced yet, resync it now - if _, err := agentWorkClient.List(ctx, metav1.ListOptions{}); err != nil { - return err - } - } - // ensure there is only one work was synced on the cluster if len(list) != 1 { return fmt.Errorf("unexpected work list %v", list) @@ -94,6 +87,7 @@ func TestPulseServer(t *testing.T) { if err != nil { return err } + return nil }, 3*time.Second, 1*time.Second).Should(Succeed()) diff --git a/test/integration/resource_test.go b/test/integration/resource_test.go index b6900da1..e11436bb 100755 --- a/test/integration/resource_test.go +++ b/test/integration/resource_test.go @@ -102,13 +102,6 @@ func TestResourcePost(t *testing.T) { return err } - if len(list) == 0 { - // no work synced yet, resync it now - if _, err := agentWorkClient.List(ctx, metav1.ListOptions{}); err != nil { - return err - } - } - // ensure there is only one work was synced on the cluster if len(list) != 1 { return fmt.Errorf("unexpected work list %v", list) @@ -332,13 +325,6 @@ func TestResourcePatch(t *testing.T) { return err } - if len(list) == 0 { - // no work synced yet, resync it now - if _, err := agentWorkClient.List(ctx, metav1.ListOptions{}); err != nil { - return err - } - } - // ensure there is only one work was synced on the cluster if len(list) != 1 { return fmt.Errorf("unexpected work list %v", list)