Skip to content

Commit

Permalink
purge status events
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
skeeey committed Dec 17, 2024
1 parent f859783 commit 8508b45
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 100 deletions.
3 changes: 3 additions & 0 deletions cmd/maestro/server/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"

"github.com/openshift-online/maestro/pkg/logger"
Expand All @@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer {
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewEventInstanceDao(&env().Database.SessionFactory),
),
}

Expand Down
90 changes: 54 additions & 36 deletions cmd/maestro/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,42 +133,16 @@ func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID strin
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
s.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: s.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
s.statusEventService,
s.resourceService,
s.eventInstanceDao,
s.eventBroadcaster,
s.instanceID,
eventID,
resourceID,
)
}

// handleStatusUpdate processes the resource status update from the agent.
Expand Down Expand Up @@ -263,3 +237,47 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer

return nil
}

func broadcastStatusEvent(ctx context.Context,
statusEventService services.StatusEventService,
resourceService services.ResourceService,
eventInstanceDao dao.EventInstanceDao,
eventBroadcaster *event.EventBroadcaster,
instanceID, eventID, resourceID string) error {
statusEvent, sErr := statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: instanceID,
})

return err
}
47 changes: 10 additions & 37 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,44 +429,17 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this
func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = bkr.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
bkr.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
bkr.statusEventService,
bkr.resourceService,
bkr.eventInstanceDao,
bkr.eventBroadcaster,
bkr.instanceID,
eventID,
resourceID,
)
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (km *KindControllerManager) processNextEvent() bool {
}

func (km *KindControllerManager) syncEvents() {
logger.Infof("purge all reconciled events")
// delete the reconciled events from the database firstly
if err := km.events.DeleteAllReconciledEvents(context.Background()); err != nil {
// this process is called periodically, so if the error happened, we will wait for the next cycle to handle
Expand All @@ -210,6 +211,7 @@ func (km *KindControllerManager) syncEvents() {
return
}

logger.Infof("sync all unreconciled events")
unreconciledEvents, err := km.events.FindAllUnreconciledEvents(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("Failed to list unreconciled events from db, %v", err))
Expand Down
73 changes: 56 additions & 17 deletions pkg/controllers/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/services"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand All @@ -16,16 +17,22 @@ const StatusEventID ControllerHandlerContextKey = "status_event"
type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error

type StatusController struct {
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
eventsQueue workqueue.RateLimitingInterface
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
eventsQueue workqueue.RateLimitingInterface
}

func NewStatusController(statusEvents services.StatusEventService) *StatusController {
func NewStatusController(statusEvents services.StatusEventService,
instanceDao dao.InstanceDao,
eventInstanceDao dao.EventInstanceDao) *StatusController {
return &StatusController{
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
instanceDao: instanceDao,
eventInstanceDao: eventInstanceDao,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
}
}

Expand All @@ -38,9 +45,8 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) {
logger.Infof("Starting status event controller")
defer sc.eventsQueue.ShutDown()

// TODO: start a goroutine to sync all status events periodically
// use a jitter to avoid multiple instances syncing the events at the same time
// go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)
go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)

// start a goroutine to handle the status event from the event queue
// the .Until will re-kick the runWorker one second after the runWorker completes
Expand All @@ -51,35 +57,35 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) {
logger.Infof("Shutting down status event controller")
}

func (sm *StatusController) runWorker() {
func (sc *StatusController) runWorker() {
// hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so
// we don't worry about secondary waits
for sm.processNextEvent() {
for sc.processNextEvent() {
}
}

// processNextEvent deals with one key off the queue.
func (sm *StatusController) processNextEvent() bool {
func (sc *StatusController) processNextEvent() bool {
// pull the next status event item from queue.
// events queue blocks until it can return an item to be processed
key, quit := sm.eventsQueue.Get()
key, quit := sc.eventsQueue.Get()
if quit {
// the current queue is shutdown and becomes empty, quit this process
return false
}
defer sm.eventsQueue.Done(key)
defer sc.eventsQueue.Done(key)

if err := sm.handleStatusEvent(key.(string)); err != nil {
if err := sc.handleStatusEvent(key.(string)); err != nil {
logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err))

// we failed to handle the status event, we should requeue the item to work on later
// this method will add a backoff to avoid hotlooping on particular items
sm.eventsQueue.AddRateLimited(key)
sc.eventsQueue.AddRateLimited(key)
return true
}

// we handle the status event successfully, tell the queue to stop tracking history for this status event
sm.eventsQueue.Forget(key)
sc.eventsQueue.Forget(key)
return true
}

Expand Down Expand Up @@ -131,3 +137,36 @@ func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc)

sc.controllers[ev] = append(sc.controllers[ev], fns...)
}

func (sc *StatusController) syncStatusEvents() {
ctx := context.Background()

readyInstanceIDs, err := sc.instanceDao.FindReadyIDs(ctx)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find ready instances from db, %v", err))
return
}
logger.Infof("purge status events on the ready instances: %s", readyInstanceIDs)

// find the status events that already were dispatched to all ready instances
statusEventIDs, err := sc.eventInstanceDao.GetEventsAssociatedWithInstances(ctx, readyInstanceIDs)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find handled status events from db, %v", err))
return
}

// batch delete the handled status events
batchSize := 500
for i := 0; i < len(statusEventIDs); i += batchSize {
end := i + batchSize
if end > len(statusEventIDs) {
end = len(statusEventIDs)
}
batch := statusEventIDs[i:end]
logger.Infof("purge handled status events %s", statusEventIDs)
if err := sc.statusEvents.DeleteAllEvents(ctx, batch); err != nil {
logger.Error(fmt.Sprintf("Failed to delete handled status events from db, %v", err))
return
}
}
}
36 changes: 36 additions & 0 deletions pkg/dao/event_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
type EventInstanceDao interface {
Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error)
Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error)
All(ctx context.Context) (api.EventInstanceList, error)

GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error)
}

var _ EventInstanceDao = &sqlEventInstanceDao{}
Expand Down Expand Up @@ -43,3 +46,36 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve
}
return eventInstance, nil
}

func (d *sqlEventInstanceDao) All(ctx context.Context) (api.EventInstanceList, error) {
g2 := (*d.sessionFactory).New(ctx)
eventInstances := api.EventInstanceList{}
if err := g2.Find(&eventInstances).Error; err != nil {
return nil, err
}
return eventInstances, nil
}

func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) {
var eventIDs []string

instanceCount := len(instanceIDs)
if instanceCount == 0 {
return eventIDs, nil
}

g2 := (*d.sessionFactory).New(ctx)

// Currently, the instance table should be small, if the instance table become to large,
// consider using join to optimize
if err := g2.Table("event_instances").
Select("event_id").
Where("instance_id IN ?", instanceIDs).
Group("event_id").
Having("COUNT(DISTINCT instance_id) = ?", instanceCount).
Scan(&eventIDs).Error; err != nil {
return nil, err
}

return eventIDs, nil
}
14 changes: 14 additions & 0 deletions pkg/dao/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type InstanceDao interface {
DeleteByIDs(ctx context.Context, ids []string) error
FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error)
FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error)
FindReadyIDs(ctx context.Context) ([]string, error)
All(ctx context.Context) (api.ServerInstanceList, error)
}

Expand Down Expand Up @@ -127,3 +128,16 @@ func (d *sqlInstanceDao) All(ctx context.Context) (api.ServerInstanceList, error
}
return instances, nil
}

func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) {
g2 := (*d.sessionFactory).New(ctx)
instances := api.ServerInstanceList{}
if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil {
return nil, err
}
ids := make([]string, len(instances))
for i, instance := range instances {
ids[i] = instance.ID
}
return ids, nil
}
Loading

0 comments on commit 8508b45

Please sign in to comment.