diff --git a/Makefile b/Makefile index 424361e6..d0c0f147 100755 --- a/Makefile +++ b/Makefile @@ -77,6 +77,8 @@ ENABLE_JWT ?= true ENABLE_AUTHZ ?= true ENABLE_OCM_MOCK ?= false +ENABLE_GRPC ?= false + # Enable set images POSTGRES_IMAGE ?= docker.io/library/postgres:14.2 MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18 @@ -298,6 +300,7 @@ cmds: --param="EXTERNAL_APPS_DOMAIN=${external_apps_domain}" \ --param="CONSUMER_NAME=$(consumer_name)" \ --param="ENABLE_OCM_MOCK=$(ENABLE_OCM_MOCK)" \ + --param="ENABLE_GRPC=$(ENABLE_GRPC)" \ > "templates/$*-template.json" @@ -409,5 +412,7 @@ e2e-test/teardown: e2e-test: e2e-test/teardown e2e-test/setup ginkgo --output-dir="${PWD}/test/e2e/report" --json-report=report.json --junit-report=report.xml \ ${PWD}/test/e2e/pkg -- -consumer_name=$(shell cat ${PWD}/test/e2e/.consumer_name) \ - -api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 -kubeconfig=${PWD}/test/e2e/.kubeconfig + -api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 \ + -grpc-server=$(shell cat ${PWD}/test/e2e/.external_host_ip):30090 \ + -kubeconfig=${PWD}/test/e2e/.kubeconfig .PHONY: e2e-test diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index 6c09f0d5..d0870add 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "net" + "time" ce "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" cetypes "github.com/cloudevents/sdk-go/v2/types" "github.com/golang/glog" + "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -20,6 +22,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/client/cloudevents" @@ -182,9 +185,11 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv select { case err := <-errChan: + glog.Errorf("unregister client %s, error= %v", clientID, err) svr.eventBroadcaster.Unregister(clientID) return err case <-subServer.Context().Done(): + glog.V(10).Infof("unregister client %s", clientID) svr.eventBroadcaster.Unregister(clientID) return nil } @@ -246,7 +251,54 @@ func decode(eventDataType types.CloudEventsDataType, evt *ce.Event) (*api.Resour // encode translates a resource to a cloudevent func encode(resource *api.Resource) (*ce.Event, error) { - return api.JSONMAPToCloudEvent(resource.Status) + if resource.Type == api.ResourceTypeSingle { + // single resource, return the status directly + return api.JSONMAPToCloudEvent(resource.Status) + } + + specEvt, err := api.JSONMAPToCloudEvent(resource.Payload) + if err != nil { + return nil, err + } + + statusEvt, err := api.JSONMAPToCloudEvent(resource.Status) + if err != nil { + return nil, err + } + + // set basic fields + evt := ce.NewEvent() + evt.SetID(uuid.New().String()) + evt.SetTime(time.Now()) + evt.SetType(statusEvt.Type()) + evt.SetSource(statusEvt.Source()) + for key, val := range statusEvt.Extensions() { + evt.SetExtension(key, val) + } + + // set work meta back + if workMeta, ok := specEvt.Extensions()[codec.ExtensionWorkMeta]; ok { + evt.SetExtension(codec.ExtensionWorkMeta, workMeta) + } + + // set payloads + manifestBundleStatus := &workpayload.ManifestBundleStatus{} + if err := statusEvt.DataAs(manifestBundleStatus); err != nil { + return nil, err + } + + // set work spec back + manifestBundle := &workpayload.ManifestBundle{} + if err := specEvt.DataAs(manifestBundle); err != nil { + return nil, err + } + manifestBundleStatus.ManifestBundle = manifestBundle + + if err := evt.SetData(ce.ApplicationJSON, manifestBundleStatus); err != nil { + return nil, err + } + + return &evt, nil } // respondResyncStatusRequest responds to the status resync request by comparing the status hash of the resources diff --git a/cmd/maestro/server/pulse_server.go b/cmd/maestro/server/pulse_server.go index 096aa82b..e7c86a70 100644 --- a/cmd/maestro/server/pulse_server.go +++ b/cmd/maestro/server/pulse_server.go @@ -84,7 +84,7 @@ func (s *PulseServer) Start(ctx context.Context) { } func (s *PulseServer) pulse(ctx context.Context) { - log.V(4).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) + log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID) instance := &api.ServerInstance{ Meta: api.Meta{ ID: s.instanceID, @@ -98,7 +98,7 @@ func (s *PulseServer) pulse(ctx context.Context) { } func (s *PulseServer) checkInstances(ctx context.Context) { - log.V(4).Infof("Checking liveness of maestro instances") + log.V(10).Infof("Checking liveness of maestro instances") // lock the Instance with a fail-fast advisory lock context. // this allows concurrent processing of many instances by one or more maestro instances exclusively. lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances) @@ -148,7 +148,8 @@ func (s *PulseServer) checkInstances(ctx context.Context) { // It runs asynchronously in the background until the provided context is canceled. func (s *PulseServer) startSubscription(ctx context.Context) { s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error { - log.V(1).Infof("received action %s for resource %s", action, resource.ID) + log.V(4).Infof("received action %s for resource %s", action, resource.ID) + switch action { case types.StatusModified: found, svcErr := s.resourceService.Get(ctx, resource.ID) @@ -192,18 +193,21 @@ func (s *PulseServer) startSubscription(ctx context.Context) { return svcErr } + log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID) + resource.Payload = found.Payload s.eventBroadcaster.Broadcast(resource) return nil } // update the resource status - _, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource) + updatedResource, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource) if svcErr != nil { return svcErr } // broadcast the resource status updated only when the resource is updated if updated { - s.eventBroadcaster.Broadcast(resource) + log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID) + s.eventBroadcaster.Broadcast(updatedResource) } default: return fmt.Errorf("unsupported action %s", action) diff --git a/data/generated/openapi/openapi.go b/data/generated/openapi/openapi.go index 0a98fcbd..6d9f251a 100755 --- a/data/generated/openapi/openapi.go +++ b/data/generated/openapi/openapi.go @@ -77,7 +77,7 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } -var _openapiYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x5c\x5f\x8f\xdb\xb8\x11\x7f\xdf\x4f\x31\x40\x5b\x38\x39\xec\xda\x4e\xef\x0a\xb4\x46\x72\x40\x72\xbd\x14\x77\xc8\x25\x69\x36\x69\x1f\x8a\xc2\x4b\x93\x23\x8b\x89\x44\x2a\x24\xb5\x59\xa7\xed\x77\x2f\x48\xea\xbf\x25\xad\xec\xf3\xc6\xca\x9e\xf3\x92\x15\x35\x33\x9c\x21\x67\x7e\x1c\x92\x23\xcb\x04\x05\x49\xf8\x02\xbe\x9d\xce\xa7\xf3\x33\x2e\x02\xb9\x38\x03\x30\xdc\x44\xb8\x80\x98\xa0\x36\x4a\xc2\x25\xaa\x6b\x4e\x11\x9e\xbe\xfe\xe9\x0c\x80\xa1\xa6\x8a\x27\x86\x4b\xd1\x45\x72\x8d\x4a\xbb\xd7\xf3\xe9\x7c\xfa\xe8\x4c\xa3\xb2\x2d\x56\xf2\x05\xa4\x2a\x5a\x40\x68\x4c\xb2\x98\xcd\x22\x49\x49\x14\x4a\x6d\x16\x7f\x9e\xcf\xe7\x67\x00\x0d\xe9\x34\x55\x0a\x85\x01\x26\x63\xc2\x45\x9d\x5d\x2f\x66\x33\x92\xf0\xa9\x35\x41\x87\x3c\x30\x53\x2a\xe3\x6d\x11\xbf\x10\x2e\xe0\x41\xa2\x24\x4b\xa9\x6d\x79\x08\x5e\x9b\x76\x61\xda\x90\x35\xde\x26\xf2\xd2\x90\x35\x17\xeb\x5c\x50\x42\x4c\xe8\x6c\xb3\x12\x66\xd9\x80\xcc\xae\x1f\xcd\x14\x6a\x99\x2a\x8a\xee\x25\xc0\x1a\x8d\xff\x03\x40\xa7\x71\x4c\xd4\x66\x01\x6f\xd0\xa4\x4a\x68\x20\x10\x71\x6d\x40\x06\x50\x30\xe5\xa4\x48\x53\xc5\xcd\x26\x67\xb5\x6a\x3f\x43\xa2\x50\x2d\xe0\x5f\xff\xce\x1a\x15\xea\x44\x0a\x9d\xf7\x64\xff\x4d\xfe\x38\x9f\x4f\xca\xc7\x86\x09\x4f\xe1\xe7\xcb\x57\x2f\x81\x28\x45\x36\xd5\x5e\x41\xae\xde\x23\x35\xba\xc2\x47\xa5\x30\x28\x4c\x55\x14\x00\x49\x92\x88\x53\x62\x85\xcd\xde\x6b\x29\xea\x6f\x01\x34\x0d\x31\x26\xcd\x56\x80\xdf\x2b\x0c\x16\x30\xf9\xdd\x8c\xca\x38\x91\x02\x85\xd1\x33\x4f\xab\x67\x6f\x32\x1d\x5e\x70\x6d\x26\xa5\x1d\xdf\xcd\x1f\xf5\xd8\x91\x9a\x10\x8c\xfc\x80\x02\xb8\x06\x2e\xae\x49\xc4\xd9\x31\x94\xff\x51\x29\xa9\x6a\x5a\x7f\xdb\xad\xf5\x3b\x41\x52\x13\x4a\xc5\x3f\x23\x03\x23\x21\x41\x15\x48\x15\x83\x4c\x50\x39\xb5\xc6\x60\xc1\x9f\xfa\xfc\xe7\x9d\xc0\x9b\x04\xa9\x41\x06\x68\xf9\x40\x52\x17\xab\xc7\x1f\xfb\x84\x28\x12\xa3\xc9\xe0\x06\x5c\xbc\xb4\x31\x97\x74\xb3\x84\xac\x71\x32\x94\x58\xf3\xcf\x3b\x10\x23\x51\x34\x1c\x4c\x2e\x15\x43\xf5\x6c\x33\x98\x3e\xe0\x18\x31\xed\xc9\x13\x8b\xa2\x4d\x78\xf9\x41\x21\x31\x08\x04\x04\x7e\x2a\x62\x7c\x37\x60\xf9\x98\xa2\x36\xcf\x24\xab\xd0\xd5\x3c\x21\x8f\x5a\x60\xc4\x90\x82\xc4\xf2\x71\x85\x6c\x01\x46\xa5\x78\xd6\xe3\x12\xfd\x0e\xd1\xee\x0e\x43\x50\x64\xd2\x0b\x8d\x3d\x90\xe2\xc7\xec\x28\x8e\xdc\xd4\xdd\xe1\x48\x4f\x14\xfe\xc3\xa2\x9d\x53\xc1\x47\xa1\x1e\x4f\x18\x9e\x80\xfb\x88\x16\xfc\xa5\xdb\x82\x22\x5c\x49\xa4\x90\xb0\x0d\xe0\x0d\xd7\xc7\x59\xef\x77\x5a\x70\x9e\x0a\x48\xbb\xd6\x1c\xa0\x36\x64\x6d\x46\x66\x42\x6c\xc2\xdc\x71\x4c\xea\x4c\x05\x67\xff\xe1\xec\x7f\xdd\xf9\xe0\xdf\xd0\x00\x11\x65\x3a\xb6\xda\x40\x11\x16\x77\x93\x09\x16\x0e\x11\xc8\x54\xb0\x5a\x87\x5f\x74\xe8\x5a\xb1\xef\x04\x20\xc7\xb1\xe0\xbb\x6e\x0b\x5e\xca\xd2\x3b\x3f\x71\x13\x82\x4e\x90\xf2\x80\x23\x03\xce\xbe\x16\x34\x19\x6b\xfa\x9a\x10\x43\xc3\x2d\x50\x78\x97\x30\x97\xc5\x89\x3b\x4a\xe1\xbc\x7c\x56\xce\xeb\xc8\x52\xb9\xd7\x76\x54\xde\x78\x33\xfa\xd3\xba\x21\x38\x97\x66\xd6\xea\x94\x52\xd4\x3a\x48\xa3\x68\x33\x1a\xc0\x3b\x25\x7b\x5f\x58\xeb\x13\x56\x8f\xc2\x88\x7b\x98\xb1\x6e\xad\x31\x0e\x78\x6c\x96\x3a\x8a\x0c\xd5\x6a\x1b\xa1\xc1\xad\xd5\xe6\xaf\xae\x19\xc8\x9e\x8b\x4d\x1b\x2c\xf7\xb8\x68\x79\x7c\xe0\xba\xed\x80\xe5\x13\x32\x7e\x79\xad\x4f\xc8\x38\x02\x23\x76\x43\x18\x17\x43\x23\x42\x98\xe6\x59\xec\xad\x07\x9a\x9c\xf5\x6d\x9e\x2f\x56\xa9\x60\xd1\x7e\xd7\x29\x90\xf1\xde\xe9\x5e\xba\xf3\x56\xc5\x77\x3e\x86\xcb\x95\x67\x4e\x93\xd3\x15\xcb\x28\x20\xea\xab\xdc\xa3\xfe\x56\xaf\x58\x6e\x43\xa5\x5d\x4f\xf6\x3c\x24\x7c\xc1\x03\xbe\xac\xc7\x91\x9c\xf3\x79\x20\x3a\x81\xd0\x08\x2c\x18\x98\x27\x65\xfe\x73\x7f\xd2\xa5\xaf\x19\x50\xdb\x33\x25\x2a\x85\x4e\xe3\x42\xce\xb0\x14\xa9\x60\xfa\xa2\xb9\x51\xde\xeb\x31\x93\xa2\x1f\x32\x1d\x4e\xe9\xd0\x28\x90\xe8\xde\x44\xef\x8e\x09\xd1\x8e\x29\xd1\xce\x49\xd1\xee\x69\xd1\xc1\x6b\x4f\xf2\x68\xdf\x0d\x62\x6e\xbb\xb8\xc8\xe3\x77\x2c\x17\x16\xb9\x3e\x5f\x63\xed\x49\x53\xf7\xd3\xa1\xdb\x09\xc2\xf7\xb1\xa0\xe7\x24\xbf\x08\xd7\xaf\xec\x24\x7f\x78\xed\x49\x03\xe6\x8e\x63\x52\x67\x52\x38\x6c\x87\x5a\x24\x66\x77\xbf\x35\x2d\x1c\xe2\xc8\x7b\xd2\x56\xec\x3b\x01\xc8\x18\x77\xa3\x85\x77\x9e\xb6\xa1\x07\x3f\xac\xef\xaf\x3d\xb9\x9b\x14\x2e\xaf\x3d\xa1\x23\x4d\xe5\x0e\x52\x7b\x52\xe0\xdc\x58\x6a\x4f\x4e\xc9\xde\x18\xb4\x3e\x61\xf5\x28\x8c\xb8\x87\x19\x6b\x77\xed\xc9\x28\x32\xd4\xdb\x6b\x4f\xf6\x5b\x6c\x76\xac\x3d\x29\x8f\x0f\x4e\xb5\x27\x27\x64\x3c\xac\x05\xf7\x00\x19\xf7\xac\x3d\x19\x09\xc2\xec\x79\xa7\x52\xbe\xb1\x6c\x39\xee\x5c\x5a\xf9\x39\xb0\x64\xc0\x93\x49\x35\x9b\x04\xfd\x37\xc4\x67\x15\xbd\x71\x01\x2b\x47\x96\x35\xfa\x87\xe7\x52\xc5\xc4\x2c\xe0\xe7\x7f\xbe\x3d\xcb\x0d\xcc\x84\xbe\x72\xb7\x20\x6f\x30\x40\x85\x82\x62\x5d\xba\xbf\x22\xc9\x9a\x12\x65\x5d\xdd\xf0\x2a\xce\x71\x56\x1d\x27\xcf\xa4\x8d\xe2\x62\x5d\x34\x7f\xe0\xe2\x76\xa2\xd0\x0e\x50\x1f\xd1\x0b\x5e\x9e\xf4\x0e\xd4\x6d\x50\xc7\x09\x59\xe3\x36\x11\x17\x06\xd7\x15\x4f\xd2\xfc\xf3\x00\x2a\x23\x0d\x89\x6e\x23\x2b\x76\x16\x95\x15\xc5\x6a\x5a\x79\xb4\x3a\x55\x1e\x6d\xe7\x95\x47\xd7\x4b\xe5\x99\x1b\x8c\x7d\xd8\x3a\x27\xcc\xe5\x92\x28\x7a\x15\xf4\x7b\x60\xee\xbc\x0d\x17\x28\x4b\x14\x5a\x06\xba\x7d\xa8\x6d\xa4\x31\xac\x87\x4c\xeb\x70\x5b\xfb\xc9\x56\xcc\x75\x90\x16\xc8\xba\xac\xbb\x59\x0b\x83\x33\xbd\xea\x23\x3b\x98\x5f\xbd\x84\xdb\xc9\x66\x37\xf2\x6d\x8a\xb9\xbb\xc6\x5a\x7b\x0b\xe9\x60\x40\xc9\x0b\x17\x8e\x34\xb3\x82\xc4\xc3\x66\x36\xc7\xdf\xe5\x60\x8e\xfc\xd7\x1a\x5a\x68\x9b\xb1\x05\xfe\xbc\x13\xd9\x92\x98\x41\xb2\x01\x82\x0c\xf4\xec\xce\xf7\xc2\xf0\xb8\x5a\x94\x98\xed\x87\x0f\x23\x2c\xcb\xe2\x0e\x23\x2c\x26\x82\x07\xa8\x5b\x45\x35\xe6\x2b\xef\x79\x29\xfd\xda\x38\x84\xc3\x1b\xbe\xd4\x46\x11\x83\xeb\xcd\x20\x1e\x6d\x88\x49\x5b\x1d\xbd\x42\x5a\xfd\x11\x85\xfb\x12\x84\xf5\x2f\x65\xda\xbe\x0a\xda\x71\x49\x6a\x71\xf8\x76\x77\x6f\xf3\x82\xd6\x41\xe9\xf4\x80\x56\xea\x9e\xd9\xef\x9c\xd0\xb2\x70\xf3\xbe\x4d\x6b\xb5\x12\xac\xde\x76\x82\xda\xdf\x12\xd4\xfe\x1a\x3f\x3b\x18\x30\xe7\xca\x2c\xa9\x14\x01\x5f\xdf\x81\x4e\x83\x60\x3c\x3f\x9a\x68\x8d\x80\x3d\x63\xa0\x33\x0a\xba\xe2\xa0\x2d\x12\x7a\xe6\x38\x22\x2b\x8c\x86\x8e\x82\x33\x8a\x31\x6e\x27\x86\x44\xaf\x3b\xfa\xef\xed\xaf\x2b\x3c\x7a\x58\xfa\x1d\xb1\x3b\x48\xf6\x10\x59\xad\x2d\xdb\x6b\x16\xeb\x45\x69\x3b\x4f\x5d\x8f\x4b\x6e\xfb\x6f\x07\xf9\x2e\x97\x08\x6d\x17\x26\x3b\x2e\xcb\xdb\x0e\xd4\x61\xf3\xed\x8e\xd3\x98\xae\xe6\x61\x40\xb9\x91\x71\x1e\x5e\x5e\x3b\x73\xb1\x80\x84\x98\x30\x7b\xac\x1d\x79\xbc\x0d\x11\x38\xf3\xdf\x75\x50\xa9\x72\x96\xd6\x3b\xaa\xe6\xe1\xc5\x96\xfb\x54\x37\xbc\x5e\x87\xca\x76\xd3\x6a\xf1\x31\x45\xb5\x69\x53\xe3\x35\x59\x23\x88\x34\x5e\xa1\x2a\x75\xf1\xc5\x9c\x9f\x42\x14\xb5\x06\xbc\xa1\x88\x4c\x57\x4e\x98\x6c\x2f\xd5\xad\x6c\xbb\xa2\xcd\xb5\x88\x61\x40\xd2\xc8\x2c\xe0\x51\x99\x1a\x71\xc1\xe3\x34\x2e\x9b\xca\x71\x08\x48\xa4\xbd\xfc\xea\x86\xdd\x5b\x59\xe9\xba\xd7\xca\x5f\xc8\x8d\x15\xbf\x65\xa8\x06\x23\x41\xb9\x1a\xd6\x3d\x2d\xc8\x7e\x67\xae\x66\xc3\xbc\xcf\x06\x57\x4b\xd7\xb0\xc2\xb5\x75\xd8\xd1\x26\xa4\x61\xdd\x7f\x2f\x0a\x1d\x2e\xb3\xa9\xd1\xae\x80\xc4\x0b\x06\xaa\xb8\x41\xc5\xc9\xd4\x39\x9d\xde\x08\x43\x6e\xec\x18\x98\x90\xeb\xd2\x99\x81\x97\xe7\x84\x9a\xc7\x3c\x22\xca\x8e\x8e\x69\xb0\x20\x2c\x3f\x85\xa8\x70\x09\x34\x22\xa9\x46\xdb\x4a\x04\x5c\xfe\xfd\x85\x5b\x8b\x30\x46\x61\xce\xcb\xdc\x54\xe7\xc5\x2c\xd6\x54\x9d\x8b\x78\xaf\xa5\x00\x62\x8c\xe2\xab\xd4\xa0\x86\x19\x50\x19\xa5\xb1\xa8\x53\x11\x4a\x65\x2a\xcc\x14\x0a\x71\xcf\xa5\x02\xbc\x21\x71\x12\xe1\x39\x70\x01\xae\xd0\x30\x9b\x43\xc5\xf1\x1a\x2d\x28\x56\x79\xb5\x3f\x13\x25\x90\x6a\x54\x56\x78\x69\xa2\x21\xca\x9d\x30\x3a\x82\xab\x78\x73\xb5\x38\x2b\x5e\x5e\x5d\x5d\xe9\x8f\x51\xc5\x0a\xcf\x0c\x11\xff\x80\x30\x89\x37\x7f\x98\x54\x49\x4b\xbe\xb7\xdb\x83\x0e\x94\x08\x20\x91\x96\xb0\x42\x7f\x4a\x89\x0c\xa4\x0d\xac\xa8\xf6\x33\x09\xd3\x3d\x8c\xd4\xe9\xaa\x70\x03\xed\x01\x0f\x5d\xe1\xcb\x55\x20\xe5\x93\x15\x51\x57\xe7\x9d\x36\x55\x79\x97\x1e\x2b\xa7\x1f\x70\x03\x4f\x60\x12\x48\x39\x01\x22\x58\x2b\xcd\x35\x89\x52\xb4\x54\x2b\xa2\x3a\x46\xe1\x27\x3f\x7d\x55\xcf\x12\x13\x63\x41\xfa\x9a\x33\x64\xe7\x20\x15\x70\x4f\xe3\xa5\x71\x0d\x18\x27\x66\x73\x6e\xdb\xca\x23\xf7\xad\xb9\x34\x21\x31\xae\xc5\x4e\x08\x84\x44\x43\x82\x2a\xe6\xda\x26\xc1\x76\x80\x34\x22\x7c\xe2\x51\x04\xab\x72\x9e\x7d\x74\x23\x9b\x0e\xc5\xd2\xac\x78\xb5\x1e\xa2\x59\xe3\x1d\xc4\xa8\x9f\xdd\xd5\xe6\xe0\x51\x9a\x0b\x1e\x16\xa8\xab\xd4\xec\x1c\xac\x8d\x30\xdd\xd1\x81\x8b\x59\x75\xaf\xbd\xdf\xe6\x81\x36\x20\x14\x89\xa6\xed\xde\xf7\x4a\xed\xd7\x27\x2c\x89\x60\x4b\x08\xb8\xd2\x06\x86\x2b\x71\xee\x39\x5e\xf6\xea\x74\xa8\x88\x10\x12\xf0\x26\x89\x38\xe5\xc6\x9b\xe0\x01\xcc\x79\x7c\x0e\x2e\x83\x1d\xdd\xd7\x5c\xd7\xfd\xdc\xb7\x1d\xc6\xcd\x53\xa7\x8f\x76\xf7\xaf\x71\x4c\x2e\x34\x5a\xfb\x2d\xe6\xe5\xdf\x8a\xf8\xde\xec\x2c\xad\x70\x2b\x50\x01\x9e\xfb\xd7\x32\xb0\x40\x74\xa1\x8d\x4a\xa9\x49\x95\x95\x28\x5c\xe2\xe4\x32\x4f\x6d\x67\x03\x1e\x17\x6f\xbf\x9f\x3e\x76\x62\xbf\x07\x21\x8d\x3b\x68\x2e\x05\x3e\xd6\x26\x27\xfa\x06\x62\x24\x42\x3b\xaf\x70\xf4\x4e\x20\x14\x62\x0a\x9e\x1f\xbd\x23\x2f\xbc\x57\x13\x1a\xc2\x65\x05\x15\xad\xee\x6b\x34\xc0\xd9\xb9\xbb\xee\x38\x87\x24\x22\xe2\x01\x67\x4e\xc7\x0f\x5c\xb0\x87\xee\x2f\x0f\x9e\xf0\xa0\xe8\x4e\x3f\xac\x79\x57\xf1\xb7\xa4\xb1\x13\x58\x87\xf6\x8b\x8b\xd2\x75\x3c\xfb\x13\xce\xce\x5d\x87\xb6\xbf\x29\x67\xfe\x7f\xdb\xe1\x79\x06\xd4\xdf\xd4\xb9\xd0\xd0\xf0\x85\x7b\xf3\xa4\x56\xfd\x54\x76\xde\xeb\x30\xff\x0f\x00\x00\xff\xff\x5d\x61\x06\x2e\xdb\x57\x00\x00") +var _openapiYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xec\x5c\x5f\x8f\xdb\xb8\x11\x7f\xdf\x4f\x31\x40\x5b\x38\x39\xec\xda\x4e\xef\x0a\xb4\x46\x72\x40\x72\xbd\x14\x77\xc8\x25\x69\x36\x69\x1f\x8a\xc2\x4b\x93\x23\x8b\x89\x44\x2a\x24\xb5\x59\xa7\xed\x77\x2f\x48\xea\xbf\x25\xad\xec\xf3\xc6\xca\x9e\xf3\x92\x35\x35\x1c\xce\x90\xbf\xf9\x69\x48\x8e\x2d\x13\x14\x24\xe1\x0b\xf8\x76\x3a\x9f\xce\xcf\xb8\x08\xe4\xe2\x0c\xc0\x70\x13\xe1\x02\x62\x82\xda\x28\x09\x97\xa8\xae\x39\x45\x78\xfa\xfa\xa7\x33\x00\x86\x9a\x2a\x9e\x18\x2e\x45\x97\xc8\x35\x2a\xed\x1e\xcf\xa7\xf3\xe9\xa3\x33\x8d\xca\xb6\x58\xcd\x17\x90\xaa\x68\x01\xa1\x31\xc9\x62\x36\x8b\x24\x25\x51\x28\xb5\x59\xfc\x79\x3e\x9f\x9f\x01\x34\xb4\xd3\x54\x29\x14\x06\x98\x8c\x09\x17\xf5\xee\x7a\x31\x9b\x91\x84\x4f\xad\x0b\x3a\xe4\x81\x99\x52\x19\x6f\xab\xf8\x85\x70\x01\x0f\x12\x25\x59\x4a\x6d\xcb\x43\xf0\xd6\xb4\x2b\xd3\x86\xac\xf1\x36\x95\x97\x86\xac\xb9\x58\xe7\x8a\x12\x62\x42\xe7\x9b\xd5\x30\xcb\x26\x64\x76\xfd\x68\xa6\x50\xcb\x54\x51\x74\x0f\x01\xd6\x68\xfc\x1f\x00\x3a\x8d\x63\xa2\x36\x0b\x78\x83\x26\x55\x42\x03\x81\x88\x6b\x03\x32\x80\xa2\x53\x2e\x8a\x34\x55\xdc\x6c\xf2\xae\xd6\xec\x67\x48\x14\xaa\x05\xfc\xeb\xdf\x59\xa3\x42\x9d\x48\xa1\xf3\x91\xec\xbf\xc9\x1f\xe7\xf3\x49\xf9\xb1\xe1\xc2\x53\xf8\xf9\xf2\xd5\x4b\x20\x4a\x91\x4d\x75\x54\x90\xab\xf7\x48\x8d\xae\xf4\xa3\x52\x18\x14\xa6\xaa\x0a\x80\x24\x49\xc4\x29\xb1\xca\x66\xef\xb5\x14\xf5\xa7\x00\x9a\x86\x18\x93\x66\x2b\xc0\xef\x15\x06\x0b\x98\xfc\x6e\x46\x65\x9c\x48\x81\xc2\xe8\x99\x97\xd5\xb3\x37\x99\x0d\x2f\xb8\x36\x93\xd2\x8f\xef\xe6\x8f\x7a\xfc\x48\x4d\x08\x46\x7e\x40\x01\x5c\x03\x17\xd7\x24\xe2\xec\x18\xc6\xff\xa8\x94\x54\x35\xab\xbf\xed\xb6\xfa\x9d\x20\xa9\x09\xa5\xe2\x9f\x91\x81\x91\x90\xa0\x0a\xa4\x8a\x41\x26\xa8\x9c\x59\x63\xf0\xe0\x4f\x7d\xf8\x79\x27\xf0\x26\x41\x6a\x90\x01\xda\x7e\x20\xa9\x8b\xd5\xe3\xcf\x7d\x42\x14\x89\xd1\x64\x74\x03\x2e\x5e\xda\x3a\x97\x72\xb3\x84\xac\x71\x32\x54\x58\xf3\xcf\x3b\x08\x23\x51\x34\x1c\x2c\x2e\x15\x43\xf5\x6c\x33\x58\x3e\xe0\x18\x31\xed\xc5\x13\xcb\xa2\x4d\x7a\xf9\x41\x21\x31\x08\x04\x04\x7e\x2a\x62\x7c\x37\x62\xf9\x98\xa2\x36\xcf\x24\xab\xc8\xd5\x90\x90\x47\x2d\x30\x62\x48\x21\x62\xfb\x71\x85\x6c\x01\x46\xa5\x78\xd6\x03\x89\x7e\x40\xb4\xc3\x61\x08\x8b\x4c\x7a\xa9\xb1\x87\x52\xfc\x9c\x1d\x05\xc8\x4d\xdb\x1d\x8f\xf4\x44\xe1\x3f\x2c\xdb\x39\x13\x7c\x14\xea\xf1\x84\xe1\x89\xb8\x8f\xe8\xc1\x5f\xba\x3d\x28\xc2\x95\x44\x0a\x09\xdb\x00\xde\x70\x7d\x9c\xf7\xfd\x4e\x2f\x9c\xa7\x02\xd2\xae\x77\x0e\x50\x1b\xb2\x36\x23\x33\x21\x36\x69\xee\x38\x2e\x75\xa6\x82\xb3\xff\x70\xf6\xbf\xee\x7c\xf0\x6f\x68\x80\x88\x32\x1d\x5b\x6d\xa0\x08\x8b\xbb\xc9\x04\x0b\x40\x04\x32\x15\xac\x36\xe0\x17\x9d\xba\x56\xee\x3b\x11\xc8\x71\x3c\xf8\xae\xdb\x83\x97\xb2\x44\xe7\x27\x6e\x42\xd0\x09\x52\x1e\x70\x64\xc0\xd9\xd7\xc2\x26\x63\x4d\x5f\x13\x62\x68\xb8\x45\x0a\xef\x12\xe6\xb2\x38\x71\x47\x29\x9c\xd7\xcf\xca\x75\x1d\x59\x2a\xf7\xda\xce\xca\x1b\xef\x46\x7f\x5a\x37\x84\xe7\xd2\xcc\x5b\x9d\x52\x8a\x5a\x07\x69\x14\x6d\x46\x43\x78\xa7\x64\xef\x0b\x5b\x7d\xe2\xea\x51\x38\x71\x0f\x33\xd6\xad\x77\x8c\x23\x1e\x9b\xa5\x8e\x22\x43\xb5\xd6\x46\x68\x70\xeb\x6d\xf3\x57\xd7\x0c\x64\xcf\x97\x4d\x1b\x2d\xf7\x40\xb4\x3c\x3e\x70\xc3\x76\xd0\xf2\x89\x19\xbf\xbc\xd5\x27\x66\x1c\x81\x13\xbb\x31\x8c\x8b\xa1\x11\x31\x4c\xf3\x2c\xf6\xd6\x03\x4d\xce\xfa\x36\xcf\x17\xab\x54\xb0\x68\xbf\xeb\x14\xc8\xfa\xde\xe9\x5e\xba\xf3\x56\xc5\x0f\x3e\x86\xcb\x95\x67\xce\x92\xd3\x15\xcb\x28\x28\xea\xab\xdc\xa3\xfe\x56\xaf\x58\x6e\x63\xa5\x5d\x4f\xf6\x3c\x25\x7c\xc1\x03\xbe\x6c\xc4\x91\x9c\xf3\x79\x22\x3a\x91\xd0\x08\x3c\x18\x98\x27\x65\xf8\xb9\x3f\xe9\xd2\xd7\x4c\xa8\xed\x99\x12\x95\x42\xa7\x71\xa1\x67\x58\x8a\x54\x74\xfa\xa2\xb9\x51\x3e\xea\x31\x93\xa2\x1f\x32\x1b\x4e\xe9\xd0\x28\x98\xe8\xde\x44\xef\x8e\x09\xd1\x8e\x29\xd1\xce\x49\xd1\xee\x69\xd1\xc1\x6b\x4f\xf2\x68\xdf\x8d\x62\x6e\xbb\xb8\xc8\xe3\x77\x2c\x17\x16\xb9\x3d\x5f\x63\xed\x49\xd3\xf6\xd3\xa1\xdb\x89\xc2\xf7\xf1\xa0\xe7\x24\xbf\x08\xd7\xaf\xec\x24\x7f\x78\xed\x49\x83\xe6\x8e\xe3\x52\x67\x52\x38\x6c\x87\x5a\x24\x66\x77\xbf\x35\x2d\x00\x71\xe4\x3d\x69\x2b\xf7\x9d\x08\x64\x8c\xbb\xd1\x02\x9d\xa7\x6d\xe8\xc1\x0f\xeb\xfb\x6b\x4f\xee\x26\x85\xcb\x6b\x4f\xe8\x48\x53\xb9\x83\xd4\x9e\x14\x3c\x37\x96\xda\x93\x53\xb2\x37\x06\xab\x4f\x5c\x3d\x0a\x27\xee\x61\xc6\xda\x5d\x7b\x32\x8a\x0c\xf5\xf6\xda\x93\xfd\x5e\x36\x3b\xd6\x9e\x94\xc7\x07\xa7\xda\x93\x13\x33\x1e\xd6\x83\x7b\xc0\x8c\x7b\xd6\x9e\x8c\x84\x61\xf6\xbc\x53\x29\x9f\xd8\x6e\x39\xef\x5c\x5a\xfd\x39\xb1\x64\xc4\x93\x69\x35\x9b\x04\xfd\x77\x88\xcf\x2a\x76\xe3\x02\x56\x4e\x2c\x6b\xf4\x1f\x9e\x4b\x15\x13\xb3\x80\x9f\xff\xf9\xf6\x2c\x77\x30\x53\xfa\xca\xdd\x82\xbc\xc1\x00\x15\x0a\x8a\x75\xed\xfe\x8a\x24\x6b\x4a\x94\x85\xba\xe1\x55\x9e\xe3\xac\x3a\x4f\xbe\x93\x36\x8a\x8b\x75\xd1\xfc\x81\x8b\xdb\x85\x42\x3b\x41\x7d\x42\x2f\x78\x79\xd2\x3b\xd0\xb6\x41\x03\x27\x64\x8d\xdb\x42\x5c\x18\x5c\x57\x90\xa4\xf9\xe7\x01\x52\x46\x1a\x12\xdd\x26\x56\xec\x2c\x2a\x6f\x14\x6b\x69\xe5\xa3\xb5\xa9\xf2\xd1\x0e\x5e\xf9\xe8\x46\xa9\x7c\xe6\x06\x63\x1f\xb6\x0e\x84\xb9\x5e\x12\x45\xaf\x82\x7e\x04\xe6\xe0\x6d\x40\xa0\x2c\x51\x68\x99\xe8\xf6\xa9\xb6\x91\xc6\xb0\x1e\x32\xad\xd3\x6d\xfd\x27\x5b\x31\xd7\x21\x5a\x30\xeb\xb2\x0e\xb3\x96\x0e\xce\xf5\x2a\x46\x76\x70\xbf\x7a\x09\xb7\x93\xcf\x6e\xe6\xdb\x0c\x73\x77\x8d\xb5\xf6\x16\xd1\xc1\x84\x92\x17\x2e\x1c\x69\x65\x05\x89\x87\xad\x6c\xce\xbf\xcb\xc1\x3d\xf2\x5f\x6b\x68\x91\x6d\xc6\x16\xf8\xf3\x4e\x64\x4b\x62\x06\xe9\x06\x08\x32\xd2\xb3\x3b\xdf\x0b\xc3\xe3\x6a\x51\x62\xb6\x1f\x3e\x8c\xb2\x2c\x8b\x3b\x8c\xb2\x98\x08\x1e\xa0\x6e\x55\xd5\x58\xaf\x7c\xe4\xa5\xf4\xef\xc6\x21\x3d\xbc\xe3\x4b\x6d\x14\x31\xb8\xde\x0c\xea\xa3\x0d\x31\x69\x2b\xd0\x2b\xa2\xd5\x1f\x51\xb8\x2f\x41\x58\xff\xa6\x4c\xdb\xb7\x82\x76\x7c\x25\xb5\x00\xbe\x1d\xee\x6d\x28\x68\x9d\x94\x4e\x04\xb4\x4a\xf7\xac\x7e\xe7\x82\x96\x85\x9b\xf7\x6d\x59\xab\x95\x60\xf5\xb6\x13\xd5\xfe\x26\xa8\x16\x0d\x61\xc4\x90\x41\x24\x98\x47\xe4\xaf\x01\xe5\xc1\x58\x3c\x37\x66\x49\xa5\x08\xf8\xfa\x0e\x6c\x1a\xc4\xf9\xf9\x39\x46\x6b\xb8\xec\x19\x30\x9d\x21\xd3\x15\x34\x6d\x61\xd3\x03\x88\x88\xac\x30\x1a\x3a\x0b\xce\x29\xc6\xb8\x5d\x18\x12\xbd\xee\x18\xbf\x77\xbc\xae\x58\xea\xe9\xd2\x8f\xda\xee\x88\xda\x43\x65\xb5\x10\x6d\xaf\x55\xac\x57\xb0\xed\xbc\x74\x3d\x90\xdc\xc6\x6f\x87\xf8\x2e\x37\x0e\x6d\xb7\x2b\x3b\xbe\xc3\xb7\x01\xd4\xe1\xf3\xed\xc0\x69\x2c\x57\xf3\xe4\xa0\xdc\xf5\x38\x84\x97\x77\xd4\x5c\x2c\x20\x21\x26\xcc\x3e\xd6\xce\x47\xde\x86\x08\x9c\xf9\x2f\x81\x50\xa9\xf2\x2e\xad\x17\x5a\xcd\x93\x8e\x2d\xf8\x54\x77\xc7\xde\x86\xca\xde\xd4\x5a\xf1\x31\x45\xb5\x69\x33\xe3\x35\x59\x23\x88\x34\x5e\xa1\x2a\x6d\xf1\x95\x9f\x9f\x42\x14\xb5\x06\xbc\xa1\x88\x4c\x57\x8e\xa3\xec\x28\xd5\x7d\x6f\xbb\xa1\xcd\x17\x17\xc3\x80\xa4\x91\x59\xc0\xa3\x32\x8f\xe2\x82\xc7\x69\x5c\x36\x95\xf3\x10\x90\x48\x7b\xfd\xd5\xdd\xbd\xf7\xb2\x32\x74\xaf\x97\xbf\x90\x1b\xab\x7e\xcb\x51\x0d\x46\x82\x72\x05\xaf\x7b\x7a\x90\xfd\x28\x5d\xcd\x87\x79\x9f\x0f\xae\xf0\xae\xe1\x85\x6b\xeb\xf0\xa3\x4d\x49\xc3\xbb\xff\x5e\x14\x36\x5c\x66\x4b\xa3\x5d\xb5\x89\x57\x0c\x54\x71\x83\x8a\x93\xa9\x03\x9d\xde\x08\x43\x6e\xec\x1c\x98\x90\xeb\x12\xcc\xc0\xcb\x43\x45\xcd\x63\x1e\x11\x65\x67\xc7\x34\xba\x20\x2c\x3f\x85\xa8\x70\x09\x34\x22\xa9\x46\xdb\x4a\x04\x5c\xfe\xfd\x85\x7b\x17\x61\x8c\xc2\x9c\x97\x89\xac\xce\x2b\x5f\xac\xab\x3a\x57\xf1\x5e\x4b\x01\xc4\x18\xc5\x57\xa9\x41\x0d\x33\xa0\x32\x4a\x63\x51\x97\x22\x94\xca\x54\x98\x29\x14\xea\x9e\x4b\x05\x78\x43\xe2\x24\xc2\x73\xe0\x02\x5c\x55\x62\xb6\x86\x8a\xe3\x35\x5a\x52\xac\xf6\xd5\xfe\x00\x95\x40\xaa\x51\x59\xe5\xa5\x8b\x86\x28\x77\x1c\xe9\x04\xae\xe2\xcd\xd5\xe2\xac\x78\x78\x75\x75\xa5\x3f\x46\x15\x2f\x7c\x67\x88\xf8\x07\x84\x49\xbc\xf9\xc3\xa4\x2a\x5a\xf6\x7b\xbb\x3d\xe9\x40\x89\x00\x12\x69\x09\x2b\xf4\x47\x9a\xc8\x40\xda\xc0\x8a\x6a\xbf\xa9\x30\xdd\xc3\x49\x9d\xae\x0a\x18\x68\x4f\x78\xe8\xaa\x64\xae\x02\x29\x9f\xac\x88\xba\x3a\xef\xf4\xa9\xda\x77\xe9\xb9\x72\xfa\x01\x37\xf0\x04\x26\x81\x94\x13\x20\x82\xb5\xca\x5c\x93\x28\x45\x2b\xb5\x22\xaa\x63\x16\x7e\xf2\xcb\x57\x45\x96\x98\x18\x4b\xd2\xd7\x9c\x21\x3b\x07\xa9\x80\x7b\x19\xaf\x8d\x6b\xc0\x38\x31\x9b\x73\xdb\x56\x9e\xcf\x6f\xad\xa5\x09\x89\x71\x2d\x76\x41\x20\x24\x1a\x12\x54\x31\xd7\x36\x63\xb6\x13\xa4\x11\xe1\x13\x8f\x22\x58\x95\xeb\xec\xa3\x1b\xd9\x74\x28\x97\x66\x95\xae\xf5\x10\xcd\x1a\xef\x20\x46\xfd\xea\xae\x36\x07\x8f\xd2\x5c\xf1\xb0\x40\x5d\xa5\x66\xe7\x60\x6d\x84\xe9\x8e\x00\x2e\x56\xd5\x3d\xf6\xb8\xcd\x03\x6d\x40\x28\x12\x4d\xdb\xd1\xf7\x4a\xed\x37\x26\x2c\x89\x60\x4b\x08\xb8\xd2\x06\x86\x1b\x71\xee\x7b\xbc\xec\xb5\xe9\x50\x11\x21\x24\xe0\x4d\x12\x71\xca\x8d\x77\xc1\x13\x98\x43\x7c\x4e\x2e\x83\x81\xee\x0b\xb4\xeb\x38\xf7\x6d\x87\x81\x79\xea\xec\xd1\xee\xb2\x36\x8e\xc9\x85\x46\xeb\xbf\xe5\xbc\xfc\x8b\x25\x7e\x34\xbb\x4a\x2b\xdc\x0a\x54\x80\xe7\xfe\xb1\x0c\x2c\x11\x5d\x68\xa3\x52\x6a\x52\x65\x35\x0a\x97\x38\xb9\xcc\x53\xdb\xd5\x80\xc7\xc5\xd3\xef\xa7\x8f\x9d\xda\xef\x41\x48\xe3\x4e\xa5\x4b\x85\x8f\xb5\xc9\x85\xbe\x81\x18\x89\xd0\x0e\x15\x4e\xde\x29\x84\x42\x4d\xd1\xe7\x47\x0f\xe4\x85\x47\x35\xa1\x21\x5c\x56\x58\xd1\xda\xbe\x46\x03\x9c\x9d\xbb\xbb\x91\x73\x48\x22\x22\x1e\x70\xe6\x6c\xfc\xc0\x05\x7b\xe8\xfe\xf2\xe4\x09\x0f\x8a\xe1\xf4\xc3\x1a\xba\x8a\xbf\x25\x8d\x9d\xc2\x3a\xb5\x5f\x5c\x94\xd0\xf1\xdd\x9f\x70\x76\xee\x06\xb4\xe3\x4d\x39\xf3\xff\xdb\x01\xcf\x33\xa2\xfe\xa6\xde\x0b\x0d\x0d\x5f\xb8\x27\x4f\x6a\xa5\x52\xe5\xe0\xbd\x80\xf9\x7f\x00\x00\x00\xff\xff\x20\x8c\x16\x92\x08\x58\x00\x00") func openapiYamlBytes() ([]byte, error) { return bindataRead( @@ -92,7 +92,7 @@ func openapiYaml() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "openapi.yaml", size: 22491, mode: os.FileMode(493), modTime: time.Unix(1718089229, 0)} + info := bindataFileInfo{name: "openapi.yaml", size: 22536, mode: os.FileMode(493), modTime: time.Unix(1718269774, 0)} a := &asset{bytes: bytes, info: info} return a, nil } diff --git a/examples/manifestworkclient/README.md b/examples/manifestworkclient/README.md new file mode 100644 index 00000000..911bcb8a --- /dev/null +++ b/examples/manifestworkclient/README.md @@ -0,0 +1,324 @@ +# gRPC Source ManifestWork Client + +This example shows how to build a source ManifestWork client with `RESTFullAPIWatcherStore` and watch/create/get/update/delete works by the client. + +## Build the client + +Using sdk-go to build a source ManifestWork client with `RESTFullAPIWatcherStore` + +```golang +sourceID := "mw-client-example" + +workClient, err := work.NewClientHolderBuilder(grpcOptions). + WithClientID(fmt.Sprintf("%s-client", sourceID)). + WithSourceID(sourceID). + WithCodecs(codec.NewManifestBundleCodec()). + WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(ctx, maestroAPIClient, sourceID)). + WithResyncEnabled(false). + NewSourceClientHolder(ctx) + +if err != nil { + log.Fatal(err) +} + +// watch/create/patch/get/delete/list by workClient +``` + + +## Run the example + +1. Run `make e2e-test/setup` to prepare the environment + +2. Run the client-a `go run examples/manifestworkclient/client-a/main.go --consumer-name=$(cat test/e2e/.consumer_name)` to watch the status of works in a terminal + +3. Run the client-b `go run examples/manifestworkclient/client-b/main.go --consumer-name=$(cat test/e2e/.consumer_name)` to create/get/update/delete work in other new terminal + +The output of the client-b + +``` +the work 432999ad-b13e-4f24-8c0b-ceca24ad79ba/work-dpfqk (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is created +the work 432999ad-b13e-4f24-8c0b-ceca24ad79ba/work-dpfqk (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is updated +the work 432999ad-b13e-4f24-8c0b-ceca24ad79ba/work-dpfqk (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is deleted +``` + +The output of the client-a + +``` +watched work (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is modified +{ + "metadata": { + "name": "work-dpfqk", + "namespace": "432999ad-b13e-4f24-8c0b-ceca24ad79ba", + "uid": "b33b9c35-0dd4-5b4d-b419-fe9a21045fa7", + "resourceVersion": "2", + "creationTimestamp": null + "labels": { + "work.label": "example" + }, + "annotations": { + "work.annotations": "example" + } + }, + "spec": { + "workload": { + "manifests": [ + { + "apiVersion": "v1", + "data": { + "test": "zpchv" + }, + "kind": "ConfigMap", + "metadata": { + "name": "work-dpfqk", + "namespace": "default" + } + } + ] + } + }, + "status": { + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestWorkComplete", + "message": "Apply manifest work complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourcesAvailable", + "message": "All resources are available" + } + ], + "resourceStatus": { + "manifests": [ + { + "resourceMeta": { + "ordinal": 0, + "group": "", + "version": "v1", + "kind": "ConfigMap", + "resource": "configmaps", + "name": "work-dpfqk", + "namespace": "default" + }, + "statusFeedback": {}, + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestComplete", + "message": "Apply manifest complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourceAvailable", + "message": "Resource is available" + }, + { + "type": "StatusFeedbackSynced", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "NoStatusFeedbackSynced", + "message": "" + } + ] + } + ] + } + } +} +watched work (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is modified +{ + "metadata": { + "name": "work-dpfqk", + "namespace": "432999ad-b13e-4f24-8c0b-ceca24ad79ba", + "uid": "b33b9c35-0dd4-5b4d-b419-fe9a21045fa7", + "resourceVersion": "2", + "creationTimestamp": null, + "labels": { + "work.label": "example" + }, + "annotations": { + "work.annotations": "example" + } + }, + "spec": { + "workload": { + "manifests": [ + { + "apiVersion": "v1", + "data": { + "test": "zpchv" + }, + "kind": "ConfigMap", + "metadata": { + "name": "work-dpfqk", + "namespace": "default" + } + } + ] + } + }, + "status": { + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestWorkComplete", + "message": "Apply manifest work complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourcesAvailable", + "message": "All resources are available" + } + ], + "resourceStatus": { + "manifests": [ + { + "resourceMeta": { + "ordinal": 0, + "group": "", + "version": "v1", + "kind": "ConfigMap", + "resource": "configmaps", + "name": "work-dpfqk", + "namespace": "default" + }, + "statusFeedback": {}, + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestComplete", + "message": "Apply manifest complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourceAvailable", + "message": "Resource is available" + }, + { + "type": "StatusFeedbackSynced", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "NoStatusFeedbackSynced", + "message": "" + } + ] + } + ] + } + } +} +watched work (uid=b33b9c35-0dd4-5b4d-b419-fe9a21045fa7) is deleted +{ + "metadata": { + "name": "work-dpfqk", + "namespace": "432999ad-b13e-4f24-8c0b-ceca24ad79ba", + "uid": "b33b9c35-0dd4-5b4d-b419-fe9a21045fa7", + "resourceVersion": "1", + "creationTimestamp": null, + "labels": { + "work.label": "example" + }, + "annotations": { + "work.annotations": "example" + } + }, + "spec": { + "workload": { + "manifests": [ + { + "apiVersion": "v1", + "data": { + "test": "zpchv" + }, + "kind": "ConfigMap", + "metadata": { + "name": "work-dpfqk", + "namespace": "default" + } + } + ] + } + }, + "status": { + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestWorkComplete", + "message": "Apply manifest work complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourcesAvailable", + "message": "All resources are available" + }, + { + "type": "Deleted", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:18Z", + "reason": "ManifestsDeleted", + "message": "The manifests are deleted from the cluster 432999ad-b13e-4f24-8c0b-ceca24ad79ba" + } + ], + "resourceStatus": { + "manifests": [ + { + "resourceMeta": { + "ordinal": 0, + "group": "", + "version": "v1", + "kind": "ConfigMap", + "resource": "configmaps", + "name": "work-dpfqk", + "namespace": "default" + }, + "statusFeedback": {}, + "conditions": [ + { + "type": "Applied", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "AppliedManifestComplete", + "message": "Apply manifest complete" + }, + { + "type": "Available", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "ResourceAvailable", + "message": "Resource is available" + }, + { + "type": "StatusFeedbackSynced", + "status": "True", + "lastTransitionTime": "2024-06-11T03:49:13Z", + "reason": "NoStatusFeedbackSynced", + "message": "" + } + ] + } + ] + } + } +} +``` diff --git a/examples/manifestworkclient/client-a/main.go b/examples/manifestworkclient/client-a/main.go new file mode 100644 index 00000000..db1ba428 --- /dev/null +++ b/examples/manifestworkclient/client-a/main.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/openshift-online/maestro/pkg/api/openapi" + "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + + workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" +) + +const sourceID = "mw-client-example" + +var ( + maestroServerAddr = flag.String("maestro-server", "https://127.0.0.1:30080", "The Maestro server address") + grpcServerAddr = flag.String("grpc-server", "127.0.0.1:30090", "The GRPC server address") + consumerName = flag.String("consumer-name", "", "The Consumer Name") +) + +func main() { + flag.Parse() + + if len(*consumerName) == 0 { + log.Fatalf("the consumer_name is required") + } + + ctx, cancel := context.WithCancel(context.Background()) + + stopCh := make(chan os.Signal, 1) + signal.Notify(stopCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + defer cancel() + <-stopCh + }() + + maestroAPIClient := openapi.NewAPIClient(&openapi.Configuration{ + DefaultHeader: make(map[string]string), + UserAgent: "OpenAPI-Generator/1.0.0/go", + Debug: false, + Servers: openapi.ServerConfigurations{ + { + URL: *maestroServerAddr, + Description: "current domain", + }, + }, + OperationServers: map[string]openapi.ServerConfigurations{}, + HTTPClient: &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }}, + Timeout: 10 * time.Second, + }, + }) + + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = *grpcServerAddr + + workClient, err := work.NewClientHolderBuilder(grpcOptions). + WithClientID(fmt.Sprintf("%s-client-a", sourceID)). + WithSourceID(sourceID). + WithCodecs(codec.NewManifestBundleCodec()). + WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)). + WithResyncEnabled(false). + NewSourceClientHolder(ctx) + if err != nil { + log.Fatal(err) + } + + watcher, err := workClient.ManifestWorks(metav1.NamespaceAll).Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Fatal(err) + } + + go func() { + ch := watcher.ResultChan() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-ch: + if !ok { + return + } + switch event.Type { + case watch.Modified: + fmt.Printf("watched work (uid=%s) is modified\n", event.Object.(*workv1.ManifestWork).UID) + PrintWork(event.Object.(*workv1.ManifestWork)) + case watch.Deleted: + fmt.Printf("watched work (uid=%s) is deleted\n", event.Object.(*workv1.ManifestWork).UID) + PrintWork(event.Object.(*workv1.ManifestWork)) + } + } + } + }() + + <-ctx.Done() +} + +func PrintWork(work *workv1.ManifestWork) { + workJson, err := json.MarshalIndent(work, "", " ") + if err != nil { + log.Fatal(err) + } + fmt.Printf("%s\n", string(workJson)) +} diff --git a/examples/manifestworkclient/client-b/main.go b/examples/manifestworkclient/client-b/main.go new file mode 100644 index 00000000..64eb5db0 --- /dev/null +++ b/examples/manifestworkclient/client-b/main.go @@ -0,0 +1,175 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "flag" + "log" + "net/http" + "time" + + "fmt" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/openshift-online/maestro/pkg/api/openapi" + "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" +) + +const sourceID = "mw-client-example" + +var ( + maestroServerAddr = flag.String("maestro-server", "https://127.0.0.1:30080", "The Maestro server address") + grpcServerAddr = flag.String("grpc-server", "127.0.0.1:30090", "The GRPC server address") + consumerName = flag.String("consumer-name", "", "The Consumer Name") +) + +func main() { + flag.Parse() + + if len(*consumerName) == 0 { + log.Fatalf("the consumer_name is required") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + maestroAPIClient := openapi.NewAPIClient(&openapi.Configuration{ + DefaultHeader: make(map[string]string), + UserAgent: "OpenAPI-Generator/1.0.0/go", + Debug: false, + Servers: openapi.ServerConfigurations{ + { + URL: *maestroServerAddr, + Description: "current domain", + }, + }, + OperationServers: map[string]openapi.ServerConfigurations{}, + HTTPClient: &http.Client{ + Transport: &http.Transport{TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }}, + Timeout: 10 * time.Second, + }, + }) + + grpcOptions := grpc.NewGRPCOptions() + grpcOptions.URL = *grpcServerAddr + + workClient, err := work.NewClientHolderBuilder(grpcOptions). + WithClientID(fmt.Sprintf("%s-client-b", sourceID)). + WithSourceID(sourceID). + WithCodecs(codec.NewManifestBundleCodec()). + WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(maestroAPIClient, sourceID)). + WithResyncEnabled(false). + NewSourceClientHolder(ctx) + if err != nil { + log.Fatal(err) + } + + // use workClient to create/get/patch/delete work + workName := "work-" + rand.String(5) + _, err = workClient.ManifestWorks(*consumerName).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + if err != nil { + log.Fatal(err) + } + + <-time.After(5 * time.Second) + + work, err := workClient.ManifestWorks(*consumerName).Get(ctx, workName, metav1.GetOptions{}) + if err != nil { + log.Fatal(err) + } + fmt.Printf("the work %s/%s (uid=%s) is created\n", *consumerName, workName, work.UID) + + newWork := work.DeepCopy() + newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} + patchData, err := ToWorkPatch(work, newWork) + if err != nil { + log.Fatal(err) + } + _, err = workClient.ManifestWorks(*consumerName).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + if err != nil { + log.Fatal(err) + } + fmt.Printf("the work %s/%s (uid=%s) is updated\n", *consumerName, workName, work.UID) + + <-time.After(5 * time.Second) + + err = workClient.ManifestWorks(*consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) + if err != nil { + log.Fatal(err) + } + fmt.Printf("the work %s/%s (uid=%s) is deleted\n", *consumerName, workName, work.UID) +} + +func NewManifestWork(name string) *workv1.ManifestWork { + return &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "work.label": "example", + }, + Annotations: map[string]string{ + "work.annotations": "example", + }, + }, + Spec: workv1.ManifestWorkSpec{ + Workload: workv1.ManifestsTemplate{ + Manifests: []workv1.Manifest{ + NewManifest(name), + }, + }, + }, + } +} + +func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { + oldData, err := json.Marshal(old) + if err != nil { + return nil, err + } + + newData, err := json.Marshal(new) + if err != nil { + return nil, err + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return nil, err + } + + return patchBytes, nil +} + +func NewManifest(name string) workv1.Manifest { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": name, + }, + "data": map[string]string{ + "test": rand.String(5), + }, + }, + } + objectStr, _ := obj.MarshalJSON() + manifest := workv1.Manifest{} + manifest.Raw = objectStr + return manifest +} diff --git a/go.mod b/go.mod index df00c0fa..4026979b 100755 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.15.3-0.20240329120647-e6a74efbacbf github.com/deckarep/golang-set/v2 v2.6.0 github.com/docker/go-healthcheck v0.1.0 + github.com/evanphx/json-patch v5.9.0+incompatible github.com/getsentry/sentry-go v0.20.0 github.com/ghodss/yaml v1.0.0 github.com/go-gormigrate/gormigrate/v2 v2.0.0 @@ -37,13 +38,14 @@ require ( gorm.io/datatypes v1.2.0 gorm.io/driver/postgres v1.5.0 gorm.io/gorm v1.24.7-0.20230306060331-85eaf9eeda11 + k8s.io/api v0.29.4 k8s.io/apimachinery v0.29.4 k8s.io/client-go v0.29.4 k8s.io/component-base v0.29.3 k8s.io/klog/v2 v2.120.1 open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 - open-cluster-management.io/sdk-go v0.13.1-0.20240606075054-7671bb086504 + open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 ) require ( @@ -66,7 +68,6 @@ require ( github.com/docker/distribution v2.8.1+incompatible // indirect github.com/eclipse/paho.golang v0.11.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/evanphx/json-patch v5.9.0+incompatible // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-logr/logr v1.4.1 // indirect @@ -149,7 +150,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/mysql v1.4.7 // indirect - k8s.io/api v0.29.4 // indirect k8s.io/apiextensions-apiserver v0.29.3 // indirect k8s.io/apiserver v0.29.3 // indirect k8s.io/kms v0.29.3 // indirect diff --git a/go.sum b/go.sum index 61940ad0..726793bb 100755 --- a/go.sum +++ b/go.sum @@ -1173,16 +1173,12 @@ k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/A k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY= k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba h1:UsXnD4/N7pxYupPgoLvTq8wO73V72vD2D2ZkDd4iws0= -open-cluster-management.io/api v0.13.1-0.20240521030453-9d94703b9eba/go.mod h1:yrNuMMpciXjXPnj2yznb6LTyrGliiTrFZAJDp/Ck3c4= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc h1:tcfncubZRFphYtDXBE7ApBNlSnj1RNazhW+8F01XYYg= open-cluster-management.io/api v0.13.1-0.20240605083248-f9e7f50520fc/go.mod h1:ltijKJhDifrPH0csvCUmFt5lzaERv+BBfh6X3l83rT0= -open-cluster-management.io/ocm v0.13.1-0.20240606073633-61a74bb348a4 h1:waZV6MKK01jRUmUyLyLRLZUjCdaCvs/AJwqQPNcEHoo= -open-cluster-management.io/ocm v0.13.1-0.20240606073633-61a74bb348a4/go.mod h1:em/6OHu/z7vkwoQanhC3i3lA8bQiKuBQlik6cyk3E5o= open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4 h1:Z3gwbMUZmblGTHFx6iOO1zlCJc2FJcAJsa2RASTKDqA= open-cluster-management.io/ocm v0.13.1-0.20240612012446-8e792c14d8f4/go.mod h1:RuYCuKuVJzNxRBkSoQnxyJxyUqOyCH388DlR/QDr7rE= -open-cluster-management.io/sdk-go v0.13.1-0.20240606075054-7671bb086504 h1:65KSUUpUapTbaeMx+MuxCqXRyrR3KGtOSFuoJwoXmMA= -open-cluster-management.io/sdk-go v0.13.1-0.20240606075054-7671bb086504/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= +open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6 h1:/nPyxceSdi66Vs+A9LJ+7X6kLe4xK98dBMi4adZv1d0= +open-cluster-management.io/sdk-go v0.13.1-0.20240607073142-990fcdba50a6/go.mod h1:muWzHWsgK8IsopltwTnsBjf4DN9IcC9rF0G2uEq/Pjw= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml index 6f13455d..cd66b885 100755 --- a/openapi/openapi.yaml +++ b/openapi/openapi.yaml @@ -641,6 +641,8 @@ components: deleted_at: type: string format: date-time + metadata: + type: object manifests: type: array items: diff --git a/pkg/api/openapi/api/openapi.yaml b/pkg/api/openapi/api/openapi.yaml index 0cb3048a..ed2aaebd 100644 --- a/pkg/api/openapi/api/openapi.yaml +++ b/pkg/api/openapi/api/openapi.yaml @@ -1047,6 +1047,8 @@ components: deleted_at: format: date-time type: string + metadata: + type: object manifests: items: type: object diff --git a/pkg/api/openapi/docs/ResourceBundle.md b/pkg/api/openapi/docs/ResourceBundle.md index 354756e5..2c8b5728 100644 --- a/pkg/api/openapi/docs/ResourceBundle.md +++ b/pkg/api/openapi/docs/ResourceBundle.md @@ -13,6 +13,7 @@ Name | Type | Description | Notes **CreatedAt** | Pointer to **time.Time** | | [optional] **UpdatedAt** | Pointer to **time.Time** | | [optional] **DeletedAt** | Pointer to **time.Time** | | [optional] +**Metadata** | Pointer to **map[string]interface{}** | | [optional] **Manifests** | Pointer to **[]map[string]interface{}** | | [optional] **DeleteOption** | Pointer to **map[string]interface{}** | | [optional] **ManifestConfigs** | Pointer to **[]map[string]interface{}** | | [optional] @@ -262,6 +263,31 @@ SetDeletedAt sets DeletedAt field to given value. HasDeletedAt returns a boolean if a field has been set. +### GetMetadata + +`func (o *ResourceBundle) GetMetadata() map[string]interface{}` + +GetMetadata returns the Metadata field if non-nil, zero value otherwise. + +### GetMetadataOk + +`func (o *ResourceBundle) GetMetadataOk() (*map[string]interface{}, bool)` + +GetMetadataOk returns a tuple with the Metadata field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetMetadata + +`func (o *ResourceBundle) SetMetadata(v map[string]interface{})` + +SetMetadata sets Metadata field to given value. + +### HasMetadata + +`func (o *ResourceBundle) HasMetadata() bool` + +HasMetadata returns a boolean if a field has been set. + ### GetManifests `func (o *ResourceBundle) GetManifests() []map[string]interface{}` diff --git a/pkg/api/openapi/docs/ResourceBundleAllOf.md b/pkg/api/openapi/docs/ResourceBundleAllOf.md index 254a2c12..cde4a5b8 100644 --- a/pkg/api/openapi/docs/ResourceBundleAllOf.md +++ b/pkg/api/openapi/docs/ResourceBundleAllOf.md @@ -10,6 +10,7 @@ Name | Type | Description | Notes **CreatedAt** | Pointer to **time.Time** | | [optional] **UpdatedAt** | Pointer to **time.Time** | | [optional] **DeletedAt** | Pointer to **time.Time** | | [optional] +**Metadata** | Pointer to **map[string]interface{}** | | [optional] **Manifests** | Pointer to **[]map[string]interface{}** | | [optional] **DeleteOption** | Pointer to **map[string]interface{}** | | [optional] **ManifestConfigs** | Pointer to **[]map[string]interface{}** | | [optional] @@ -184,6 +185,31 @@ SetDeletedAt sets DeletedAt field to given value. HasDeletedAt returns a boolean if a field has been set. +### GetMetadata + +`func (o *ResourceBundleAllOf) GetMetadata() map[string]interface{}` + +GetMetadata returns the Metadata field if non-nil, zero value otherwise. + +### GetMetadataOk + +`func (o *ResourceBundleAllOf) GetMetadataOk() (*map[string]interface{}, bool)` + +GetMetadataOk returns a tuple with the Metadata field if it's non-nil, zero value otherwise +and a boolean to check if the value has been set. + +### SetMetadata + +`func (o *ResourceBundleAllOf) SetMetadata(v map[string]interface{})` + +SetMetadata sets Metadata field to given value. + +### HasMetadata + +`func (o *ResourceBundleAllOf) HasMetadata() bool` + +HasMetadata returns a boolean if a field has been set. + ### GetManifests `func (o *ResourceBundleAllOf) GetManifests() []map[string]interface{}` diff --git a/pkg/api/openapi/model_resource_bundle.go b/pkg/api/openapi/model_resource_bundle.go index 15e11398..686b5920 100644 --- a/pkg/api/openapi/model_resource_bundle.go +++ b/pkg/api/openapi/model_resource_bundle.go @@ -29,6 +29,7 @@ type ResourceBundle struct { CreatedAt *time.Time `json:"created_at,omitempty"` UpdatedAt *time.Time `json:"updated_at,omitempty"` DeletedAt *time.Time `json:"deleted_at,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` Manifests []map[string]interface{} `json:"manifests,omitempty"` DeleteOption map[string]interface{} `json:"delete_option,omitempty"` ManifestConfigs []map[string]interface{} `json:"manifest_configs,omitempty"` @@ -340,6 +341,38 @@ func (o *ResourceBundle) SetDeletedAt(v time.Time) { o.DeletedAt = &v } +// GetMetadata returns the Metadata field value if set, zero value otherwise. +func (o *ResourceBundle) GetMetadata() map[string]interface{} { + if o == nil || IsNil(o.Metadata) { + var ret map[string]interface{} + return ret + } + return o.Metadata +} + +// GetMetadataOk returns a tuple with the Metadata field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *ResourceBundle) GetMetadataOk() (map[string]interface{}, bool) { + if o == nil || IsNil(o.Metadata) { + return map[string]interface{}{}, false + } + return o.Metadata, true +} + +// HasMetadata returns a boolean if a field has been set. +func (o *ResourceBundle) HasMetadata() bool { + if o != nil && !IsNil(o.Metadata) { + return true + } + + return false +} + +// SetMetadata gets a reference to the given map[string]interface{} and assigns it to the Metadata field. +func (o *ResourceBundle) SetMetadata(v map[string]interface{}) { + o.Metadata = v +} + // GetManifests returns the Manifests field value if set, zero value otherwise. func (o *ResourceBundle) GetManifests() []map[string]interface{} { if o == nil || IsNil(o.Manifests) { @@ -505,6 +538,9 @@ func (o ResourceBundle) ToMap() (map[string]interface{}, error) { if !IsNil(o.DeletedAt) { toSerialize["deleted_at"] = o.DeletedAt } + if !IsNil(o.Metadata) { + toSerialize["metadata"] = o.Metadata + } if !IsNil(o.Manifests) { toSerialize["manifests"] = o.Manifests } diff --git a/pkg/api/openapi/model_resource_bundle_all_of.go b/pkg/api/openapi/model_resource_bundle_all_of.go index 0eee6e65..f6d84ef5 100644 --- a/pkg/api/openapi/model_resource_bundle_all_of.go +++ b/pkg/api/openapi/model_resource_bundle_all_of.go @@ -26,6 +26,7 @@ type ResourceBundleAllOf struct { CreatedAt *time.Time `json:"created_at,omitempty"` UpdatedAt *time.Time `json:"updated_at,omitempty"` DeletedAt *time.Time `json:"deleted_at,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` Manifests []map[string]interface{} `json:"manifests,omitempty"` DeleteOption map[string]interface{} `json:"delete_option,omitempty"` ManifestConfigs []map[string]interface{} `json:"manifest_configs,omitempty"` @@ -241,6 +242,38 @@ func (o *ResourceBundleAllOf) SetDeletedAt(v time.Time) { o.DeletedAt = &v } +// GetMetadata returns the Metadata field value if set, zero value otherwise. +func (o *ResourceBundleAllOf) GetMetadata() map[string]interface{} { + if o == nil || IsNil(o.Metadata) { + var ret map[string]interface{} + return ret + } + return o.Metadata +} + +// GetMetadataOk returns a tuple with the Metadata field value if set, nil otherwise +// and a boolean to check if the value has been set. +func (o *ResourceBundleAllOf) GetMetadataOk() (map[string]interface{}, bool) { + if o == nil || IsNil(o.Metadata) { + return map[string]interface{}{}, false + } + return o.Metadata, true +} + +// HasMetadata returns a boolean if a field has been set. +func (o *ResourceBundleAllOf) HasMetadata() bool { + if o != nil && !IsNil(o.Metadata) { + return true + } + + return false +} + +// SetMetadata gets a reference to the given map[string]interface{} and assigns it to the Metadata field. +func (o *ResourceBundleAllOf) SetMetadata(v map[string]interface{}) { + o.Metadata = v +} + // GetManifests returns the Manifests field value if set, zero value otherwise. func (o *ResourceBundleAllOf) GetManifests() []map[string]interface{} { if o == nil || IsNil(o.Manifests) { @@ -397,6 +430,9 @@ func (o ResourceBundleAllOf) ToMap() (map[string]interface{}, error) { if !IsNil(o.DeletedAt) { toSerialize["deleted_at"] = o.DeletedAt } + if !IsNil(o.Metadata) { + toSerialize["metadata"] = o.Metadata + } if !IsNil(o.Manifests) { toSerialize["manifests"] = o.Manifests } diff --git a/pkg/api/presenters/resource.go b/pkg/api/presenters/resource.go index 904cbd8c..c582bc09 100755 --- a/pkg/api/presenters/resource.go +++ b/pkg/api/presenters/resource.go @@ -73,7 +73,7 @@ func PresentResource(resource *api.Resource) (*openapi.Resource, error) { // PresentResourceBundle converts a resource from the API to the openapi representation. func PresentResourceBundle(resource *api.Resource) (*openapi.ResourceBundle, error) { - manifestBundle, err := api.DecodeManifestBundle(resource.Payload) + metadata, manifestBundle, err := api.DecodeManifestBundle(resource.Payload) if err != nil { return nil, err } @@ -122,6 +122,7 @@ func PresentResourceBundle(resource *api.Resource) (*openapi.ResourceBundle, err } manifestConfigs = append(manifestConfigs, m) } + res := &openapi.ResourceBundle{ Id: reference.Id, Kind: reference.Kind, @@ -131,6 +132,7 @@ func PresentResourceBundle(resource *api.Resource) (*openapi.ResourceBundle, err Version: openapi.PtrInt32(resource.Version), CreatedAt: openapi.PtrTime(resource.CreatedAt), UpdatedAt: openapi.PtrTime(resource.UpdatedAt), + Metadata: metadata, Manifests: manifests, DeleteOption: deleteOption, ManifestConfigs: manifestConfigs, diff --git a/pkg/api/resource_bundle_types.go b/pkg/api/resource_bundle_types.go index a27745c8..8ad881fe 100755 --- a/pkg/api/resource_bundle_types.go +++ b/pkg/api/resource_bundle_types.go @@ -19,22 +19,35 @@ type ResourceBundleStatus struct { // DecodeManifestBundle converts a CloudEvent JSONMap representation of a list of resource manifest // into manifest bundle payload. -func DecodeManifestBundle(manifest datatypes.JSONMap) (*workpayload.ManifestBundle, error) { +func DecodeManifestBundle(manifest datatypes.JSONMap) (map[string]any, *workpayload.ManifestBundle, error) { if len(manifest) == 0 { - return nil, nil + return nil, nil, nil } evt, err := JSONMAPToCloudEvent(manifest) if err != nil { - return nil, fmt.Errorf("failed to convert resource manifest to cloudevent: %v", err) + return nil, nil, fmt.Errorf("failed to convert resource manifest to cloudevent: %v", err) + } + + metaData := map[string]any{} + extensions := evt.Extensions() + if meta, ok := extensions["metadata"]; ok { + metaJson, err := cloudeventstypes.ToString(meta) + if err != nil { + return nil, nil, err + } + + if err := json.Unmarshal([]byte(metaJson), &metaData); err != nil { + return nil, nil, err + } } eventPayload := &workpayload.ManifestBundle{} if err := evt.DataAs(eventPayload); err != nil { - return nil, fmt.Errorf("failed to decode cloudevent payload as resource manifest bundle: %v", err) + return nil, nil, fmt.Errorf("failed to decode cloudevent payload as resource manifest bundle: %v", err) } - return eventPayload, nil + return metaData, eventPayload, nil } // DecodeManifestBundleToObjects converts a CloudEvent JSONMap representation of a list of resource manifest @@ -44,7 +57,7 @@ func DecodeManifestBundleToObjects(manifest datatypes.JSONMap) ([]map[string]int return nil, nil } - eventPayload, err := DecodeManifestBundle(manifest) + _, eventPayload, err := DecodeManifestBundle(manifest) if err != nil { return nil, err } diff --git a/pkg/api/resource_bundle_types_test.go b/pkg/api/resource_bundle_types_test.go index a021ca99..f2592b53 100644 --- a/pkg/api/resource_bundle_types_test.go +++ b/pkg/api/resource_bundle_types_test.go @@ -62,7 +62,7 @@ func TestDecodeManifestBundle(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - got, err := DecodeManifestBundle(c.input) + _, got, err := DecodeManifestBundle(c.input) if err != nil { if err.Error() != c.expectedErrorMsg { t.Errorf("expected %#v but got: %#v", c.expectedErrorMsg, err) diff --git a/pkg/auth/authz_middleware_mock.go b/pkg/auth/authz_middleware_mock.go index de672870..5e88ecba 100755 --- a/pkg/auth/authz_middleware_mock.go +++ b/pkg/auth/authz_middleware_mock.go @@ -1,8 +1,9 @@ package auth import ( - "github.com/golang/glog" "net/http" + + "github.com/golang/glog" ) type authzMiddlewareMock struct{} @@ -15,7 +16,7 @@ func NewAuthzMiddlewareMock() AuthorizationMiddleware { func (a authzMiddlewareMock) AuthorizeApi(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - glog.Infof("Mock authz allows / for %q/%q", r.Method, r.URL) + glog.V(10).Infof("Mock authz allows / for %q/%q", r.Method, r.URL) next.ServeHTTP(w, r) }) } diff --git a/pkg/client/cloudevents/grpcsource/util.go b/pkg/client/cloudevents/grpcsource/util.go new file mode 100644 index 00000000..966e701c --- /dev/null +++ b/pkg/client/cloudevents/grpcsource/util.go @@ -0,0 +1,97 @@ +package grpcsource + +import ( + "encoding/json" + + "github.com/openshift-online/maestro/pkg/api/openapi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" +) + +// ToManifestWork converts an openapi.ResourceBundle object to workv1.ManifestWork object +func ToManifestWork(rb *openapi.ResourceBundle) (*workv1.ManifestWork, error) { + work := &workv1.ManifestWork{} + + // get meta from resource + metaJson, err := marshal(rb.Metadata) + if err != nil { + return nil, err + } + objectMeta := metav1.ObjectMeta{} + if err := json.Unmarshal(metaJson, &objectMeta); err != nil { + return nil, err + } + work.ObjectMeta = objectMeta + + // get spec from resource + manifests := []workv1.Manifest{} + for _, manifest := range rb.Manifests { + raw, err := marshal(manifest) + if err != nil { + return nil, err + } + manifests = append(manifests, workv1.Manifest{RawExtension: runtime.RawExtension{Raw: raw}}) + } + work.Spec.Workload.Manifests = manifests + + if len(rb.DeleteOption) != 0 { + optionJson, err := marshal(rb.DeleteOption) + if err != nil { + return nil, err + } + option := &workv1.DeleteOption{} + if err := json.Unmarshal(optionJson, option); err != nil { + return nil, err + } + work.Spec.DeleteOption = option + } + + configs := []workv1.ManifestConfigOption{} + for _, manifestConfig := range rb.ManifestConfigs { + configJson, err := marshal(manifestConfig) + if err != nil { + return nil, err + } + config := workv1.ManifestConfigOption{} + if err := json.Unmarshal(configJson, &config); err != nil { + return nil, err + } + configs = append(configs, config) + + } + work.Spec.ManifestConfigs = configs + + // get status from resource + if len(rb.Status) != 0 { + status, err := json.Marshal(rb.Status) + if err != nil { + return nil, err + } + manifestStatus := &payload.ManifestBundleStatus{} + if err := json.Unmarshal(status, manifestStatus); err != nil { + return nil, err + } + + work.Status = workv1.ManifestWorkStatus{ + Conditions: manifestStatus.Conditions, + ResourceStatus: workv1.ManifestResourceStatus{ + Manifests: manifestStatus.ResourceStatus, + }, + } + } + + return work, nil +} + +func marshal(obj map[string]any) ([]byte, error) { + unstructuredObj := unstructured.Unstructured{Object: obj} + data, err := unstructuredObj.MarshalJSON() + if err != nil { + return nil, err + } + + return data, nil +} diff --git a/pkg/client/cloudevents/grpcsource/util_test.go b/pkg/client/cloudevents/grpcsource/util_test.go new file mode 100644 index 00000000..a4fd4f7b --- /dev/null +++ b/pkg/client/cloudevents/grpcsource/util_test.go @@ -0,0 +1,132 @@ +package grpcsource + +import ( + "testing" + + "github.com/openshift-online/maestro/pkg/api/openapi" + "k8s.io/apimachinery/pkg/api/equality" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + workv1 "open-cluster-management.io/api/work/v1" +) + +func TestToManifestWork(t *testing.T) { + workload, err := marshal(map[string]interface{}{"a": "b"}) + if err != nil { + t.Fatal(err) + } + + cases := []struct { + name string + input *openapi.ResourceBundle + expected *workv1.ManifestWork + }{ + { + name: "covert a resource bundle - has empty fields", + input: &openapi.ResourceBundle{ + Metadata: map[string]interface{}{ + "name": "test", + "namespace": "testns", + }, + Manifests: []map[string]interface{}{ + {"a": "b"}, + }, + DeleteOption: map[string]any{}, + ManifestConfigs: []map[string]interface{}{}, + Status: nil, + }, + expected: &workv1.ManifestWork{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: workv1.ManifestWorkSpec{ + Workload: workv1.ManifestsTemplate{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: workload, + }, + }, + }, + }, + }, + }, + }, + { + name: "covert a resource bundle", + input: &openapi.ResourceBundle{ + Metadata: map[string]interface{}{ + "name": "test", + "namespace": "testns", + }, + Manifests: []map[string]interface{}{ + {"a": "b"}, + }, + DeleteOption: map[string]any{ + "propagationPolicy": "Foreground", + }, + ManifestConfigs: []map[string]interface{}{ + { + "resourceIdentifier": map[string]interface{}{ + "name": "test", + }, + }, + }, + Status: map[string]interface{}{ + "conditions": []map[string]interface{}{ + { + "type": "Test", + }, + }, + }, + }, + expected: &workv1.ManifestWork{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: workv1.ManifestWorkSpec{ + Workload: workv1.ManifestsTemplate{ + Manifests: []workv1.Manifest{ + { + RawExtension: runtime.RawExtension{ + Raw: workload, + }, + }, + }, + }, + DeleteOption: &workv1.DeleteOption{ + PropagationPolicy: workv1.DeletePropagationPolicyTypeForeground, + }, + ManifestConfigs: []workv1.ManifestConfigOption{ + { + ResourceIdentifier: workv1.ResourceIdentifier{Name: "test"}, + }, + }, + }, + Status: workv1.ManifestWorkStatus{ + Conditions: []v1.Condition{ + { + Type: "Test", + }, + }, + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + work, err := ToManifestWork(c.input) + if err != nil { + t.Errorf("unexpected error %v", err) + } + + if !equality.Semantic.DeepEqual(c.expected, work) { + t.Errorf("expected %v, but got %v", c.expected, work) + } + }) + } +} diff --git a/pkg/client/cloudevents/grpcsource/watcherstore.go b/pkg/client/cloudevents/grpcsource/watcherstore.go new file mode 100644 index 00000000..4ae5e020 --- /dev/null +++ b/pkg/client/cloudevents/grpcsource/watcherstore.go @@ -0,0 +1,160 @@ +package grpcsource + +import ( + "context" + "fmt" + "net/http" + "sync" + + "github.com/openshift-online/maestro/pkg/api/openapi" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" +) + +// RESTFulAPIWatcherStore implements the WorkClientWatcherStore interface, it is +// used to build a source work client. The work client uses this store to +// - get/list works from Maestro server via RESTfull APIs +// - receive the work status update and send the updated work to the watch channel +type RESTFulAPIWatcherStore struct { + sync.RWMutex + + result chan watch.Event + done chan struct{} + watcherStopped bool + + sourceID string + apiClient *openapi.APIClient +} + +var _ store.WorkClientWatcherStore = &RESTFulAPIWatcherStore{} + +func NewRESTFullAPIWatcherStore(apiClient *openapi.APIClient, sourceID string) *RESTFulAPIWatcherStore { + return &RESTFulAPIWatcherStore{ + result: make(chan watch.Event), + done: make(chan struct{}), + watcherStopped: false, + + sourceID: sourceID, + apiClient: apiClient, + } +} + +// ResultChan implements watch interface. +func (m *RESTFulAPIWatcherStore) ResultChan() <-chan watch.Event { + return m.result +} + +// Stop implements watch interface. +func (m *RESTFulAPIWatcherStore) Stop() { + // Call Close() exactly once by locking and setting a flag. + m.Lock() + defer m.Unlock() + // closing a closed channel always panics, therefore check before closing + select { + case <-m.done: + close(m.result) + default: + m.watcherStopped = true + close(m.done) + } +} + +// HandleReceivedWork sends the received works to the watch channel +func (m *RESTFulAPIWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { + if m.isWatcherStopped() { + // watcher is stopped, do nothing. + return nil + } + + switch action { + case types.StatusModified: + watchType := watch.Modified + if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) { + watchType = watch.Deleted + } + + m.result <- watch.Event{Type: watchType, Object: work} + return nil + default: + return fmt.Errorf("unknown resource action %s", action) + } +} + +func (m *RESTFulAPIWatcherStore) Get(namespace, name string) (*workv1.ManifestWork, error) { + id := utils.UID(m.sourceID, namespace, name) + rb, resp, err := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesIdGet(context.Background(), id).Execute() + if err != nil { + if resp != nil && resp.StatusCode == http.StatusNotFound { + return nil, errors.NewNotFound(common.ManifestWorkGR, id) + } + + return nil, err + } + + return ToManifestWork(rb) +} + +func (m *RESTFulAPIWatcherStore) List(opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { + works := []*workv1.ManifestWork{} + + apiRequest := m.apiClient.DefaultApi.ApiMaestroV1ResourceBundlesGet(context.Background()). + Search(fmt.Sprintf("source = '%s'", m.sourceID)) + + // TODO filter works by labels + + rbs, _, err := apiRequest.Execute() + if err != nil { + return nil, err + } + + for _, rb := range rbs.Items { + work, err := ToManifestWork(&rb) + if err != nil { + return nil, err + } + + works = append(works, work) + } + + return works, nil +} + +func (m *RESTFulAPIWatcherStore) ListAll() ([]*workv1.ManifestWork, error) { + return m.List(metav1.ListOptions{}) +} + +func (m *RESTFulAPIWatcherStore) Add(work *workv1.ManifestWork) error { + // do nothing + return nil +} + +func (m *RESTFulAPIWatcherStore) Update(work *workv1.ManifestWork) error { + // do nothing + return nil +} + +func (m *RESTFulAPIWatcherStore) Delete(work *workv1.ManifestWork) error { + // do nothing + return nil +} + +func (m *RESTFulAPIWatcherStore) HasInitiated() bool { + return true +} + +func (m *RESTFulAPIWatcherStore) isWatcherStopped() bool { + m.RLock() + defer m.RUnlock() + + return m.watcherStopped +} diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go index 52563fb2..85588b9f 100644 --- a/pkg/client/cloudevents/source_client.go +++ b/pkg/client/cloudevents/source_client.go @@ -56,7 +56,7 @@ func (s *SourceClientImpl) OnCreate(ctx context.Context, id string) error { return err } - logger.Infof("Publishing resource %s for db row insert", resource.ID) + logger.V(4).Infof("Publishing resource %s for db row insert", resource.ID) eventType := cetypes.CloudEventsType{ CloudEventsDataType: s.Codec.EventDataType(), SubResource: cetypes.SubResourceSpec, @@ -81,7 +81,7 @@ func (s *SourceClientImpl) OnUpdate(ctx context.Context, id string) error { return err } - logger.Infof("Publishing resource %s for db row update", resource.ID) + logger.V(4).Infof("Publishing resource %s for db row update", resource.ID) eventType := cetypes.CloudEventsType{ CloudEventsDataType: s.Codec.EventDataType(), SubResource: cetypes.SubResourceSpec, @@ -110,7 +110,7 @@ func (s *SourceClientImpl) OnDelete(ctx context.Context, id string) error { if resource.Meta.DeletedAt.Time.IsZero() { return fmt.Errorf("resource %s has not been marked as deleting", resource.ID) } - logger.Infof("Publishing resource %s for db row delete", resource.ID) + logger.V(4).Infof("Publishing resource %s for db row delete", resource.ID) eventType := cetypes.CloudEventsType{ CloudEventsDataType: s.Codec.EventDataType(), SubResource: cetypes.SubResourceSpec, @@ -134,7 +134,7 @@ func (s *SourceClientImpl) Subscribe(ctx context.Context, handlers ...cegeneric. func (s *SourceClientImpl) Resync(ctx context.Context, consumers []string) error { logger := logger.NewOCMLogger(ctx) - logger.Infof("Resyncing resource status from consumers") + logger.V(4).Infof("Resyncing resource status from consumers %v", consumers) for _, consumer := range consumers { if err := s.CloudEventSourceClient.Resync(ctx, consumer); err != nil { diff --git a/pkg/db/advisory_locks.go b/pkg/db/advisory_locks.go index 712e0f9c..6872640c 100755 --- a/pkg/db/advisory_locks.go +++ b/pkg/db/advisory_locks.go @@ -106,7 +106,7 @@ func (f *AdvisoryLockFactory) NewAdvisoryLock(ctx context.Context, id string, lo return *lock.uuid, fmt.Errorf(errMsg) } - log.V(4).Info(fmt.Sprintf("Locked advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) + log.V(10).Info(fmt.Sprintf("Locked advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) f.lockStore.add(*lock.uuid, lock) return *lock.uuid, nil } @@ -130,7 +130,7 @@ func (f *AdvisoryLockFactory) NewNonBlockingLock(ctx context.Context, id string, return *lock.uuid, false, fmt.Errorf(errMsg) } - log.V(4).Info(fmt.Sprintf("Locked non blocking advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) + log.V(10).Info(fmt.Sprintf("Locked non blocking advisory lock id=%s type=%s - owner=%s", id, lockType, *lock.uuid)) f.lockStore.add(*lock.uuid, lock) return *lock.uuid, acquired, nil } @@ -165,7 +165,7 @@ func (f *AdvisoryLockFactory) Unlock(ctx context.Context, uuid string) { // the resolving UUID belongs to a service call that did *not* initiate the lock. // we can safely ignore this, knowing the top-most func in the call stack // will provide the correct UUID. - log.V(4).Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid)) + log.V(10).Info(fmt.Sprintf("Caller not lock owner. Owner %s", uuid)) return } @@ -183,7 +183,7 @@ func (f *AdvisoryLockFactory) Unlock(ctx context.Context, uuid string) { UpdateAdvisoryLockCountMetric(lockType, "OK") UpdateAdvisoryLockDurationMetric(lockType, "OK", lock.startTime) - log.V(4).Info(fmt.Sprintf("Unlocked lock id=%s type=%s - owner=%s", lockID, lockType, uuid)) + log.V(10).Info(fmt.Sprintf("Unlocked lock id=%s type=%s - owner=%s", lockID, lockType, uuid)) f.lockStore.delete(uuid) } diff --git a/pkg/db/db_session/default.go b/pkg/db/db_session/default.go index e31a3412..6d39fde4 100755 --- a/pkg/db/db_session/default.go +++ b/pkg/db/db_session/default.go @@ -106,7 +106,7 @@ func waitForNotification(ctx context.Context, l *pq.Listener, callback func(id s return case n := <-l.Notify: if n != nil { - logger.Infof("Received data from channel [%s] : %s", n.Channel, n.Extra) + logger.V(4).Infof("Received event from channel [%s] : %s", n.Channel, n.Extra) callback(n.Extra) } case <-time.After(10 * time.Second): diff --git a/pkg/event/event.go b/pkg/event/event.go index 507985c3..5ac7a56d 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/golang/glog" "github.com/google/uuid" "github.com/openshift-online/maestro/pkg/api" ) @@ -50,6 +51,8 @@ func (h *EventBroadcaster) Register(source string, handler resourceHandler) (str errChan: errChan, } + glog.V(4).Infof("register a broadcaster client %s (source=%s)", id, source) + return id, errChan } @@ -69,6 +72,8 @@ func (h *EventBroadcaster) Broadcast(res *api.Resource) { // Start starts the event broadcaster and waits for events to broadcast. func (h *EventBroadcaster) Start(ctx context.Context) { + glog.Infof("Starting event broadcaster") + for { select { case <-ctx.Done(): diff --git a/templates/service-template.yml b/templates/service-template.yml index 8daef2f6..5f7ca150 100755 --- a/templates/service-template.yml +++ b/templates/service-template.yml @@ -99,6 +99,11 @@ parameters: description: HTTP server bind port value: "8000" +- name: ENABLE_GRPC + displayName: Enable gRPC + description: Enable gRPC server + value: "false" + - name: GRPC_SERVER_BINDPORT displayName: gRPC Server Bindport description: gRPC server bind port @@ -294,6 +299,7 @@ objects: - --ocm-debug=${OCM_DEBUG} - --https-cert-file=/secrets/tls/tls.crt - --https-key-file=/secrets/tls/tls.key + - --enable-grpc-server=${ENABLE_GRPC} - --grpc-tls-cert-file=/secrets/tls/tls.crt - --grpc-tls-key-file=/secrets/tls/tls.key - --acl-file=/configs/authentication/acl.yml diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go new file mode 100644 index 00000000..b3e91291 --- /dev/null +++ b/test/e2e/pkg/sourceclient_test.go @@ -0,0 +1,208 @@ +package e2e_test + +import ( + "context" + "encoding/json" + "fmt" + "time" + + jsonpatch "github.com/evanphx/json-patch" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/openshift-online/maestro/pkg/client/cloudevents/grpcsource" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/watch" + + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" +) + +var _ = Describe("gRPC Source ManifestWork Client Test", func() { + Context("Watch work status with gRPC source ManifestWork client", func() { + var ctx context.Context + var cancel context.CancelFunc + + var sourceID string + var grpcOptions *grpc.GRPCOptions + + var watchedWorks []*workv1.ManifestWork + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + + sourceID = "sourceclient-test" + rand.String(5) + + watchedWorks = []*workv1.ManifestWork{} + + grpcOptions = grpc.NewGRPCOptions() + grpcOptions.URL = grpcServerAddress + + workClient, err := work.NewClientHolderBuilder(grpcOptions). + WithClientID(fmt.Sprintf("%s-watcher", sourceID)). + WithSourceID(sourceID). + WithCodecs(codec.NewManifestBundleCodec()). + WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(apiClient, sourceID)). + WithResyncEnabled(false). + NewSourceClientHolder(ctx) + Expect(err).ShouldNot(HaveOccurred()) + + watcher, err := workClient.ManifestWorks(consumer_name).Watch(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + go func() { + ch := watcher.ResultChan() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-ch: + if !ok { + return + } + switch event.Type { + case watch.Modified: + if work, ok := event.Object.(*workv1.ManifestWork); ok { + watchedWorks = append(watchedWorks, work) + } + case watch.Deleted: + if work, ok := event.Object.(*workv1.ManifestWork); ok { + watchedWorks = append(watchedWorks, work) + } + } + } + } + }() + }) + + AfterEach(func() { + cancel() + }) + + It("The work status should be watched", func() { + workClient, err := work.NewClientHolderBuilder(grpcOptions). + WithClientID(fmt.Sprintf("%s-client", sourceID)). + WithSourceID(sourceID). + WithCodecs(codec.NewManifestBundleCodec()). + WithWorkClientWatcherStore(grpcsource.NewRESTFullAPIWatcherStore(apiClient, sourceID)). + WithResyncEnabled(false). + NewSourceClientHolder(ctx) + Expect(err).ShouldNot(HaveOccurred()) + + By("create a work") + workName := "work-" + rand.String(5) + _, err = workClient.ManifestWorks(consumer_name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + By("list the works") + works, err := workClient.ManifestWorks(consumer_name).List(ctx, metav1.ListOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(len(works.Items) == 1).To(BeTrue()) + + // wait for few seconds to ensure the work status is updated by agent + <-time.After(5 * time.Second) + + By("update a work") + work, err := workClient.ManifestWorks(consumer_name).Get(ctx, workName, metav1.GetOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + newWork := work.DeepCopy() + newWork.Spec.Workload.Manifests = []workv1.Manifest{NewManifest(workName)} + patchData, err := ToWorkPatch(work, newWork) + Expect(err).ShouldNot(HaveOccurred()) + _, err = workClient.ManifestWorks(consumer_name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + // wait for few seconds to ensure the work status is updated by agent + <-time.After(5 * time.Second) + + By("delete a work") + err = workClient.ManifestWorks(consumer_name).Delete(ctx, workName, metav1.DeleteOptions{}) + Expect(err).ShouldNot(HaveOccurred()) + + Eventually(func() error { + if len(watchedWorks) < 2 { + return fmt.Errorf("unexpected watched works %v", watchedWorks) + } + + hasDeletedWork := false + for _, watchedWork := range watchedWorks { + if meta.IsStatusConditionTrue(watchedWork.Status.Conditions, common.ManifestsDeleted) { + hasDeletedWork = true + break + } + } + + if !hasDeletedWork { + return fmt.Errorf("expected the deleted works is watched, but failed") + } + + return nil + }, 30*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) + + }) + }) +}) + +func NewManifestWork(name string) *workv1.ManifestWork { + return &workv1.ManifestWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: workv1.ManifestWorkSpec{ + Workload: workv1.ManifestsTemplate{ + Manifests: []workv1.Manifest{ + NewManifest(name), + }, + }, + }, + } +} + +func ToWorkPatch(old, new *workv1.ManifestWork) ([]byte, error) { + oldData, err := json.Marshal(old) + if err != nil { + return nil, err + } + + newData, err := json.Marshal(new) + if err != nil { + return nil, err + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return nil, err + } + + return patchBytes, nil +} + +func NewManifest(name string) workv1.Manifest { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "namespace": "default", + "name": name, + }, + "data": map[string]string{ + "test": rand.String(5), + }, + }, + } + objectStr, _ := obj.MarshalJSON() + manifest := workv1.Manifest{} + manifest.Raw = objectStr + return manifest +} diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index 3872974a..9978e543 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -19,13 +19,14 @@ import ( ) var ( - apiServerAddress string - kubeconfig string - consumer_name string - kubeClient *kubernetes.Clientset - apiClient *openapi.APIClient - helper *test.Helper - T *testing.T + apiServerAddress string + grpcServerAddress string + kubeconfig string + consumer_name string + kubeClient *kubernetes.Clientset + apiClient *openapi.APIClient + helper *test.Helper + T *testing.T ) func TestE2E(t *testing.T) { @@ -37,6 +38,7 @@ func TestE2E(t *testing.T) { func init() { klog.SetOutput(GinkgoWriter) flag.StringVar(&apiServerAddress, "api-server", "", "Maestro API server address") + flag.StringVar(&grpcServerAddress, "grpc-server", "", "Maestro gRPC server address") flag.StringVar(&consumer_name, "consumer_name", "", "Consumer name is used to identify the consumer") flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig file") } diff --git a/test/e2e/setup/e2e_setup.sh b/test/e2e/setup/e2e_setup.sh index 1858386e..189ee557 100755 --- a/test/e2e/setup/e2e_setup.sh +++ b/test/e2e/setup/e2e_setup.sh @@ -30,6 +30,8 @@ nodes: extraPortMappings: - containerPort: 30080 hostPort: 30080 + - containerPort: 30090 + hostPort: 30090 EOF export KUBECONFIG=${PWD}/test/e2e/.kubeconfig @@ -60,6 +62,7 @@ kubectl $1 apply -f ./test/e2e/setup/service-ca/ # 4. deploy maestro into maestro namespace export ENABLE_JWT=false export ENABLE_OCM_MOCK=true +export ENABLE_GRPC=true kubectl create namespace $namespace || true make template \ deploy-secrets \ @@ -70,6 +73,9 @@ make template \ # expose the maestro server via nodeport kubectl patch service maestro -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30080, "port": 8000, "targetPort": 8000}]}}' --type merge +# expose the maestro grpc server via nodeport +kubectl patch service maestro-grpc -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30090, "port": 8090, "targetPort": 8090}]}}' --type merge + # 5. create a consumer export external_host_ip="127.0.0.1" echo $external_host_ip > ./test/e2e/.external_host_ip @@ -78,7 +84,7 @@ kubectl wait deployment maestro -n $namespace --for condition=Available=True --t sleep 5 # wait 5 seconds for the service ready # the consumer name is not specified, the consumer id will be used as the consumer name -export consumer_name=$(curl -k -X POST -H "Content-Type: application/json" https://${external_host_ip}:30080/api/maestro/v1/consumers -d '{}' | jq '.id') +export consumer_name=$(curl -k -X POST -H "Content-Type: application/json" https://${external_host_ip}:30080/api/maestro/v1/consumers -d '{}' | jq -r '.id') echo $consumer_name > ./test/e2e/.consumer_name # 6. deploy maestro agent into maestro-agent namespace