Skip to content

Commit

Permalink
add status controller and status events table.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 25, 2024
1 parent 52299c9 commit 44a199f
Show file tree
Hide file tree
Showing 23 changed files with 626 additions and 274 deletions.
1 change: 1 addition & 0 deletions cmd/maestro/environments/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func (e *Env) LoadServices() {
e.Services.Generic = NewGenericServiceLocator(e)
e.Services.Resources = NewResourceServiceLocator(e)
e.Services.Events = NewEventServiceLocator(e)
e.Services.StatusEvents = NewStatusEventServiceLocator(e)
e.Services.Consumers = NewConsumerServiceLocator(e)
}

Expand Down
8 changes: 8 additions & 0 deletions cmd/maestro/environments/service_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func NewEventServiceLocator(env *Env) EventServiceLocator {
}
}

type StatusEventServiceLocator func() services.StatusEventService

func NewStatusEventServiceLocator(env *Env) StatusEventServiceLocator {
return func() services.StatusEventService {
return services.NewStatusEventService(dao.NewStatusEventDao(&env.Database.SessionFactory))
}
}

type ConsumerServiceLocator func() services.ConsumerService

func NewConsumerServiceLocator(env *Env) ConsumerServiceLocator {
Expand Down
9 changes: 5 additions & 4 deletions cmd/maestro/environments/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type Handlers struct {
}

type Services struct {
Resources ResourceServiceLocator
Generic GenericServiceLocator
Events EventServiceLocator
Consumers ConsumerServiceLocator
Resources ResourceServiceLocator
Generic GenericServiceLocator
Events EventServiceLocator
StatusEvents StatusEventServiceLocator
Consumers ConsumerServiceLocator
}

type Clients struct {
Expand Down
29 changes: 22 additions & 7 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,34 @@ func NewControllersServer(pulseServer *PulseServer) *ControllersServer {
db.NewAdvisoryLockFactory(env().Database.SessionFactory),
env().Services.Events(),
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
),
}

sourceClient := env().Clients.CloudEventsSource
s.KindControllerManager.Add(&controllers.ControllerConfig{
Source: "Resources",
Handlers: map[api.EventType][]controllers.ControllerHandlerFunc{
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
api.CreateEventType: {sourceClient.OnCreate},
api.UpdateEventType: {sourceClient.OnUpdate},
api.DeleteEventType: {sourceClient.OnDelete},
},
})

s.StatusController.Add(map[api.StatusEventType][]controllers.StatusHandlerFunc{
api.StatusUpdateEventType: {pulseServer.OnStatusUpdate},
api.StatusDeleteEventType: {pulseServer.OnStatusUpdate},
})

return s
}

type ControllersServer struct {
KindControllerManager *controllers.KindControllerManager
DB db.SessionFactory
StatusController *controllers.StatusController

DB db.SessionFactory
}

// Start is a blocking call that starts this controller server
Expand All @@ -44,8 +53,14 @@ func (s ControllersServer) Start(ctx context.Context) {

log.Infof("Kind controller handling events")
go s.KindControllerManager.Run(ctx.Done())
log.Infof("Status controller handling events")
go s.StatusController.Run(ctx.Done())

log.Infof("Kind controller listening for events")
// blocking call
env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent)
go env().Database.SessionFactory.NewListener(ctx, "events", s.KindControllerManager.AddEvent)
log.Infof("Status controller listening for status events")
go env().Database.SessionFactory.NewListener(ctx, "status_events", s.StatusController.AddStatusEvent)

// block until the context is done
<-ctx.Done()
}
33 changes: 20 additions & 13 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,6 @@ func encode(resource *api.Resource) (*ce.Event, error) {
return api.JSONMAPToCloudEvent(resource.Status)
}

specEvt, err := api.JSONMAPToCloudEvent(resource.Payload)
if err != nil {
return nil, err
}

statusEvt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return nil, err
Expand All @@ -276,23 +271,35 @@ func encode(resource *api.Resource) (*ce.Event, error) {
evt.SetExtension(key, val)
}

// set work meta back
if workMeta, ok := specEvt.Extensions()[codec.ExtensionWorkMeta]; ok {
// set work meta back from status event
if workMeta, ok := statusEvt.Extensions()[codec.ExtensionWorkMeta]; ok {
evt.SetExtension(codec.ExtensionWorkMeta, workMeta)
}

// set payloads
// manifest bundle status from the resource status
manifestBundleStatus := &workpayload.ManifestBundleStatus{}
if err := statusEvt.DataAs(manifestBundleStatus); err != nil {
return nil, err
}

// set work spec back
manifestBundle := &workpayload.ManifestBundle{}
if err := specEvt.DataAs(manifestBundle); err != nil {
return nil, err
if len(resource.Payload) > 0 {
specEvt, err := api.JSONMAPToCloudEvent(resource.Payload)
if err != nil {
return nil, err
}

// set work meta back from spec event
if workMeta, ok := specEvt.Extensions()[codec.ExtensionWorkMeta]; ok {
evt.SetExtension(codec.ExtensionWorkMeta, workMeta)
}

// set work spec back from spec event
manifestBundle := &workpayload.ManifestBundle{}
if err := specEvt.DataAs(manifestBundle); err != nil {
return nil, err
}
manifestBundleStatus.ManifestBundle = manifestBundle
}
manifestBundleStatus.ManifestBundle = manifestBundle

if err := evt.SetData(ce.ApplicationJSON, manifestBundleStatus); err != nil {
return nil, err
Expand Down
198 changes: 89 additions & 109 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ var log = logger.NewOCMLogger(context.Background())
// checking the liveness of Maestro instances, triggering status resync based on
// instances' status and other conditions.
type PulseServer struct {
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster
resourceService services.ResourceService
eventService services.EventService
sourceClient cloudevents.SourceClient
statusDispatcher dispatcher.Dispatcher
instanceID string
pulseInterval int64
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
lockFactory db.LockFactory
eventBroadcaster *event.EventBroadcaster
resourceService services.ResourceService
statusEventService services.StatusEventService
sourceClient cloudevents.SourceClient
statusDispatcher dispatcher.Dispatcher
}

func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer {
Expand All @@ -52,16 +52,16 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) *PulseServer {
}
sessionFactory := env().Database.SessionFactory
return &PulseServer{
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
resourceService: env().Services.Resources(),
eventService: env().Services.Events(),
sourceClient: env().Clients.CloudEventsSource,
statusDispatcher: statusDispatcher,
instanceID: env().Config.MessageBroker.ClientID,
pulseInterval: env().Config.PulseServer.PulseInterval,
instanceDao: dao.NewInstanceDao(&sessionFactory),
eventInstanceDao: dao.NewEventInstanceDao(&sessionFactory),
lockFactory: db.NewAdvisoryLockFactory(sessionFactory),
eventBroadcaster: eventBroadcaster,
resourceService: env().Services.Resources(),
statusEventService: env().Services.StatusEvents(),
sourceClient: env().Clients.CloudEventsSource,
statusDispatcher: statusDispatcher,
}
}

Expand Down Expand Up @@ -170,71 +170,59 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return fmt.Errorf("unmatched consumer name %s for resource %s", resource.ConsumerName, resource.ID)
}

// set the resource source back for broadcast
// set the resource source and type back for broadcast
resource.Source = found.Source
resource.Type = found.Type

if !s.statusDispatcher.Dispatch(resource.ConsumerName) {
// the resource is not owned by the current instance, skip
log.V(4).Infof("skipping resource status update %s as it is not owned by the current instance", resource.ID)
return nil
}

// // convert the resource status to cloudevent
// evt, err := api.JSONMAPToCloudEvent(resource.Status)
// if err != nil {
// return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
// }

// // decode the cloudevent data as manifest status
// statusPayload := &workpayload.ManifestStatus{}
// if err := evt.DataAs(statusPayload); err != nil {
// return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
// }

// // if the resource has been deleted from agent, delete it from maestro
// if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
// if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
// return svcErr
// }
// convert the resource status to cloudevent
evt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}

// log.V(4).Infof("Broadcast:: the resource %s is deleted", resource.ID)
// resource.Payload = found.Payload
// s.eventBroadcaster.Broadcast(resource)
// return nil
// }
// update the resource status
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error())
// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := evt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}

// // broadcast the resource status updated only when the resource is updated
// if updated {
// log.V(4).Infof("Broadcast:: the resource %s is updated", resource.ID)
// s.eventBroadcaster.Broadcast(updatedResource)
// }
if updated {
evt, sErr := s.eventService.Create(ctx, &api.Event{
Source: "Resources",
SourceID: resource.ID,
EventType: api.StatusUpdateEventType,
// if the resource has been deleted from agent, create status event and delete it from maestro
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
_, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{
ResourceID: resource.ID,
ResourceSource: resource.Source,
ResourceType: resource.Type,
Status: resource.Status,
StatusEventType: api.StatusDeleteEventType,
})
if sErr != nil {
return fmt.Errorf("failed to create event for resource status update %s: %s", resource.ID, sErr.Error())
return fmt.Errorf("failed to create status event for resource status delete %s: %s", resource.ID, sErr.Error())
}
instances, err := s.instanceDao.All(ctx)
if err != nil {
return fmt.Errorf("failed to get all maestro instances: %s", err)
if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
return fmt.Errorf("failed to delete resource %s: %s", resource.ID, svcErr.Error())
}
eventInstanceList := make([]*api.EventInstance, len(instances))
for i, instance := range instances {
eventInstanceList[i] = &api.EventInstance{
EventID: evt.ID,
InstanceID: instance.ID,
}
} else {
// update the resource status
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return fmt.Errorf("failed to update resource status %s: %s", resource.ID, svcErr.Error())
}
if err := s.eventInstanceDao.CreateList(ctx, eventInstanceList); err != nil {
return fmt.Errorf("failed to create event instances for resource status update %s: %s", resource.ID, err.Error())

// create the status event only when the resource is updated
if updated {
_, sErr := s.statusEventService.Create(ctx, &api.StatusEvent{
ResourceID: resource.ID,
StatusEventType: api.StatusUpdateEventType,
})
if sErr != nil {
return fmt.Errorf("failed to create status event for resource status update %s: %s", resource.ID, sErr.Error())
}
}
}
default:
Expand All @@ -245,52 +233,44 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
})
}

// On StatusUpdate does three things:
// 1. Broadcast the resource status update to subscribers
// 2. Mark the event instance as done
// 3. If the resource has been deleted from agent, delete it from maestro if all event instances are done
// On StatusUpdate will be called on each new status event inserted into db.
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *PulseServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
resource, sErr := s.resourceService.Get(ctx, resourceID)
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}

// broadcast the resource status updated to subscribers
s.eventBroadcaster.Broadcast(resource)

// mark the event instance as done
if err := s.eventInstanceDao.MarkAsDone(ctx, eventID, s.instanceID); err != nil {
return fmt.Errorf("failed to mark event instance (%s, %s) as done for resource status update %s: %s", eventID, s.instanceID, resourceID, err.Error())
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

// convert the resource status to cloudevent
cloudevt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return fmt.Errorf("failed to convert resource status to cloudevent: %v", err)
}

// decode the cloudevent data as manifest status
statusPayload := &workpayload.ManifestStatus{}
if err := cloudevt.DataAs(statusPayload); err != nil {
return fmt.Errorf("failed to decode cloudevent data as resource status: %v", err)
}

// if the resource has been deleted from agent, delete it from maestro
deleted := false
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
deleted = true
}
if deleted {
count, err := s.eventInstanceDao.GetUnhandleEventInstances(ctx, eventID)
if err != nil {
return fmt.Errorf("failed to get unhandled event instances for event %s: %s", eventID, err.Error())
var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Status: statusEvent.Status,
}
if count == 0 {
if sErr := s.resourceService.Delete(ctx, resourceID); sErr != nil {
return fmt.Errorf("failed to delete resource %s: %s", resourceID, sErr.Error())
}
} else {
resource, sErr = s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

return nil
// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
s.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: s.instanceID,
})

return err
}
Loading

0 comments on commit 44a199f

Please sign in to comment.