Skip to content

Commit

Permalink
building grpc source work client with maestro restful api
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Jun 13, 2024
1 parent 8bbe6dd commit 4caece9
Show file tree
Hide file tree
Showing 30 changed files with 1,482 additions and 45 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ ENABLE_JWT ?= true
ENABLE_AUTHZ ?= true
ENABLE_OCM_MOCK ?= false

ENABLE_GRPC ?= false

# Enable set images
POSTGRES_IMAGE ?= docker.io/library/postgres:14.2
MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18
Expand Down Expand Up @@ -298,6 +300,7 @@ cmds:
--param="EXTERNAL_APPS_DOMAIN=${external_apps_domain}" \
--param="CONSUMER_NAME=$(consumer_name)" \
--param="ENABLE_OCM_MOCK=$(ENABLE_OCM_MOCK)" \
--param="ENABLE_GRPC=$(ENABLE_GRPC)" \
> "templates/$*-template.json"


Expand Down Expand Up @@ -409,5 +412,7 @@ e2e-test/teardown:
e2e-test: e2e-test/teardown e2e-test/setup
ginkgo --output-dir="${PWD}/test/e2e/report" --json-report=report.json --junit-report=report.xml \
${PWD}/test/e2e/pkg -- -consumer_name=$(shell cat ${PWD}/test/e2e/.consumer_name) \
-api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 -kubeconfig=${PWD}/test/e2e/.kubeconfig
-api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 \
-grpc-server=$(shell cat ${PWD}/test/e2e/.external_host_ip):30090 \
-kubeconfig=${PWD}/test/e2e/.kubeconfig
.PHONY: e2e-test
54 changes: 53 additions & 1 deletion cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"fmt"
"net"
"time"

ce "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cetypes "github.com/cloudevents/sdk-go/v2/types"
"github.com/golang/glog"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
Expand All @@ -20,6 +22,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
Expand Down Expand Up @@ -182,9 +185,11 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv

select {
case err := <-errChan:
glog.Errorf("unregister client %s, error= %v", clientID, err)
svr.eventBroadcaster.Unregister(clientID)
return err
case <-subServer.Context().Done():
glog.V(10).Infof("unregister client %s", clientID)
svr.eventBroadcaster.Unregister(clientID)
return nil
}
Expand Down Expand Up @@ -246,7 +251,54 @@ func decode(eventDataType types.CloudEventsDataType, evt *ce.Event) (*api.Resour

// encode translates a resource to a cloudevent
func encode(resource *api.Resource) (*ce.Event, error) {
return api.JSONMAPToCloudEvent(resource.Status)
if resource.Type == api.ResourceTypeSingle {
// single resource, return the status directly
return api.JSONMAPToCloudEvent(resource.Status)
}

specEvt, err := api.JSONMAPToCloudEvent(resource.Payload)
if err != nil {
return nil, err
}

statusEvt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return nil, err
}

// set basic fields
evt := ce.NewEvent()
evt.SetID(uuid.New().String())
evt.SetTime(time.Now())
evt.SetType(statusEvt.Type())
evt.SetSource(statusEvt.Source())
for key, val := range statusEvt.Extensions() {
evt.SetExtension(key, val)
}

// set work meta back
if workMeta, ok := specEvt.Extensions()[codec.ExtensionWorkMeta]; ok {
evt.SetExtension(codec.ExtensionWorkMeta, workMeta)
}

// set payloads
manifestBundleStatus := &workpayload.ManifestBundleStatus{}
if err := statusEvt.DataAs(manifestBundleStatus); err != nil {
return nil, err
}

// set work spec back
manifestBundle := &workpayload.ManifestBundle{}
if err := specEvt.DataAs(manifestBundle); err != nil {
return nil, err
}
manifestBundleStatus.ManifestBundle = manifestBundle

if err := evt.SetData(ce.ApplicationJSON, manifestBundleStatus); err != nil {
return nil, err
}

return &evt, nil
}

// respondResyncStatusRequest responds to the status resync request by comparing the status hash of the resources
Expand Down
14 changes: 9 additions & 5 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *PulseServer) Start(ctx context.Context) {
}

func (s *PulseServer) pulse(ctx context.Context) {
log.V(4).Infof("Updating heartbeat for maestro instance: %s", s.instanceID)
log.V(10).Infof("Updating heartbeat for maestro instance: %s", s.instanceID)
instance := &api.ServerInstance{
Meta: api.Meta{
ID: s.instanceID,
Expand All @@ -98,7 +98,7 @@ func (s *PulseServer) pulse(ctx context.Context) {
}

func (s *PulseServer) checkInstances(ctx context.Context) {
log.V(4).Infof("Checking liveness of maestro instances")
log.V(10).Infof("Checking liveness of maestro instances")
// lock the Instance with a fail-fast advisory lock context.
// this allows concurrent processing of many instances by one or more maestro instances exclusively.
lockOwnerID, acquired, err := s.lockFactory.NewNonBlockingLock(ctx, "maestro-instances-pulse-check", db.Instances)
Expand Down Expand Up @@ -148,7 +148,8 @@ func (s *PulseServer) checkInstances(ctx context.Context) {
// It runs asynchronously in the background until the provided context is canceled.
func (s *PulseServer) startSubscription(ctx context.Context) {
s.sourceClient.Subscribe(ctx, func(action types.ResourceAction, resource *api.Resource) error {
log.V(1).Infof("received action %s for resource %s", action, resource.ID)
log.V(4).Infof("received action %s for resource %s", action, resource.ID)

switch action {
case types.StatusModified:
found, svcErr := s.resourceService.Get(ctx, resource.ID)
Expand Down Expand Up @@ -192,18 +193,21 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return svcErr
}

log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID)
resource.Payload = found.Payload
s.eventBroadcaster.Broadcast(resource)
return nil
}
// update the resource status
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
updatedResource, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return svcErr
}

// broadcast the resource status updated only when the resource is updated
if updated {
s.eventBroadcaster.Broadcast(resource)
log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID)
s.eventBroadcaster.Broadcast(updatedResource)
}
default:
return fmt.Errorf("unsupported action %s", action)
Expand Down
4 changes: 2 additions & 2 deletions data/generated/openapi/openapi.go

Large diffs are not rendered by default.

Loading

0 comments on commit 4caece9

Please sign in to comment.