purge status events #232

Merged: 1 commit, Dec 20, 2024
3 changes: 3 additions & 0 deletions cmd/maestro/server/controllers.go
@@ -5,6 +5,7 @@ import (

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/controllers"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/db"

"github.com/openshift-online/maestro/pkg/logger"
@@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer {
),
StatusController: controllers.NewStatusController(
env().Services.StatusEvents(),
dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewEventInstanceDao(&env().Database.SessionFactory),
),
}

90 changes: 54 additions & 36 deletions cmd/maestro/server/event_server.go
@@ -133,42 +133,16 @@ func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID strin
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := s.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = s.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
s.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: s.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
s.statusEventService,
s.resourceService,
s.eventInstanceDao,
s.eventBroadcaster,
s.instanceID,
eventID,
resourceID,
)
}

// handleStatusUpdate processes the resource status update from the agent.
@@ -263,3 +237,47 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer

return nil
}

func broadcastStatusEvent(ctx context.Context,
statusEventService services.StatusEventService,
resourceService services.ResourceService,
eventInstanceDao dao.EventInstanceDao,
eventBroadcaster *event.EventBroadcaster,
instanceID, eventID, resourceID string) error {
statusEvent, sErr := statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: instanceID,
})

return err
}
47 changes: 10 additions & 37 deletions cmd/maestro/server/grpc_broker.go
@@ -429,44 +429,17 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error {
// It does two things:
// 1. build the resource status and broadcast it to subscribers
// 2. add the event instance record to mark the event has been processed by the current instance
// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this
func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error {
statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID)
if sErr != nil {
return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error())
}

var resource *api.Resource
// check if the status event is delete event
if statusEvent.StatusEventType == api.StatusDeleteEventType {
// build resource with resource id and delete status
resource = &api.Resource{
Meta: api.Meta{
ID: resourceID,
},
Source: statusEvent.ResourceSource,
Type: statusEvent.ResourceType,
Payload: statusEvent.Payload,
Status: statusEvent.Status,
}
} else {
resource, sErr = bkr.resourceService.Get(ctx, resourceID)
if sErr != nil {
return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error())
}
}

// broadcast the resource status to subscribers
log.V(4).Infof("Broadcast the resource status %s", resource.ID)
bkr.eventBroadcaster.Broadcast(resource)

// add the event instance record
_, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
})

return err
return broadcastStatusEvent(
ctx,
bkr.statusEventService,
bkr.resourceService,
bkr.eventInstanceDao,
bkr.eventBroadcaster,
bkr.instanceID,
eventID,
resourceID,
)
}

// IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec.
1 change: 1 addition & 0 deletions cmd/maestro/server/healthcheck_server.go
@@ -117,6 +117,7 @@ func (s *HealthCheckServer) pulse(ctx context.Context) {
return
}
klog.Errorf("Unable to get maestro instance: %s", err.Error())
return
}
found.LastHeartbeat = time.Now()
_, err = s.instanceDao.Replace(ctx, found)
2 changes: 2 additions & 0 deletions pkg/controllers/framework.go
@@ -202,6 +202,7 @@ func (km *KindControllerManager) processNextEvent() bool {
}

func (km *KindControllerManager) syncEvents() {
logger.Infof("purge all reconciled events")
// delete the reconciled events from the database firstly
if err := km.events.DeleteAllReconciledEvents(context.Background()); err != nil {
// this process is called periodically, so if the error happened, we will wait for the next cycle to handle
@@ -210,6 +211,7 @@ func (km *KindControllerManager) syncEvents() {
return
}

logger.Infof("sync all unreconciled events")
unreconciledEvents, err := km.events.FindAllUnreconciledEvents(context.Background())
if err != nil {
logger.Error(fmt.Sprintf("Failed to list unreconciled events from db, %v", err))
79 changes: 62 additions & 17 deletions pkg/controllers/status_controller.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/services"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
@@ -16,16 +17,22 @@ const StatusEventID ControllerHandlerContextKey = "status_event"
type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error

type StatusController struct {
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
eventsQueue workqueue.RateLimitingInterface
controllers map[api.StatusEventType][]StatusHandlerFunc
statusEvents services.StatusEventService
instanceDao dao.InstanceDao
eventInstanceDao dao.EventInstanceDao
eventsQueue workqueue.RateLimitingInterface
}

func NewStatusController(statusEvents services.StatusEventService) *StatusController {
func NewStatusController(statusEvents services.StatusEventService,
instanceDao dao.InstanceDao,
eventInstanceDao dao.EventInstanceDao) *StatusController {
return &StatusController{
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
controllers: map[api.StatusEventType][]StatusHandlerFunc{},
statusEvents: statusEvents,
instanceDao: instanceDao,
eventInstanceDao: eventInstanceDao,
eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"),
}
}

@@ -38,9 +45,8 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) {
logger.Infof("Starting status event controller")
defer sc.eventsQueue.ShutDown()

// TODO: start a goroutine to sync all status events periodically
// use a jitter to avoid multiple instances syncing the events at the same time
// go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)
go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh)

// start a goroutine to handle the status event from the event queue
// the .Until will re-kick the runWorker one second after the runWorker completes
@@ -51,35 +57,35 @@
logger.Infof("Shutting down status event controller")
}

func (sm *StatusController) runWorker() {
func (sc *StatusController) runWorker() {
// hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so
// we don't worry about secondary waits
for sm.processNextEvent() {
for sc.processNextEvent() {
}
}

// processNextEvent deals with one key off the queue.
func (sm *StatusController) processNextEvent() bool {
func (sc *StatusController) processNextEvent() bool {
// pull the next status event item from queue.
// events queue blocks until it can return an item to be processed
key, quit := sm.eventsQueue.Get()
key, quit := sc.eventsQueue.Get()
if quit {
// the current queue is shutdown and becomes empty, quit this process
return false
}
defer sm.eventsQueue.Done(key)
defer sc.eventsQueue.Done(key)

if err := sm.handleStatusEvent(key.(string)); err != nil {
if err := sc.handleStatusEvent(key.(string)); err != nil {
logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err))

// we failed to handle the status event, we should requeue the item to work on later
// this method will add a backoff to avoid hotlooping on particular items
sm.eventsQueue.AddRateLimited(key)
sc.eventsQueue.AddRateLimited(key)
return true
}

// we handle the status event successfully, tell the queue to stop tracking history for this status event
sm.eventsQueue.Forget(key)
sc.eventsQueue.Forget(key)
return true
}

@@ -131,3 +137,42 @@ func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc)

sc.controllers[ev] = append(sc.controllers[ev], fns...)
}

func (sc *StatusController) syncStatusEvents() {
ctx := context.Background()

readyInstanceIDs, err := sc.instanceDao.FindReadyIDs(ctx)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find ready instances from db, %v", err))
return
}
logger.Infof("purge status events on the ready instances: %s", readyInstanceIDs)

// find the status events that already were dispatched to all ready instances
statusEventIDs, err := sc.eventInstanceDao.GetEventsAssociatedWithInstances(ctx, readyInstanceIDs)
if err != nil {
logger.Error(fmt.Sprintf("Failed to find handled status events from db, %v", err))
return
}

// batch delete the handled status events
batches := batchStatusEventIDs(statusEventIDs, 500)
for _, batch := range batches {
if err := sc.statusEvents.DeleteAllEvents(ctx, batch); err != nil {
logger.Error(fmt.Sprintf("Failed to delete handled status events from db, %v", err))
return
}
}
}

func batchStatusEventIDs(statusEventIDs []string, batchSize int) [][]string {
batches := [][]string{}
for i := 0; i < len(statusEventIDs); i += batchSize {
Contributor: we may need to add a unit test for this method; it may have a problem if the length of statusEventIDs is 800.

Contributor Author: added

end := i + batchSize
if end > len(statusEventIDs) {
end = len(statusEventIDs)
}
batches = append(batches, statusEventIDs[i:end])
}
return batches
}
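
To illustrate the batching behavior discussed in the review thread above (including the 800-ID case the reviewer raised), here is a small standalone sketch; it copies the batchStatusEventIDs helper from this diff, and the package name and main function are illustrative only, not part of the PR:

package main

import "fmt"

// batchStatusEventIDs splits the ID list into chunks of at most batchSize,
// copied from the helper added in pkg/controllers/status_controller.go.
func batchStatusEventIDs(statusEventIDs []string, batchSize int) [][]string {
	batches := [][]string{}
	for i := 0; i < len(statusEventIDs); i += batchSize {
		end := i + batchSize
		if end > len(statusEventIDs) {
			end = len(statusEventIDs)
		}
		batches = append(batches, statusEventIDs[i:end])
	}
	return batches
}

func main() {
	// 800 IDs with a batch size of 500 yield two batches: 500 and 300.
	for i, b := range batchStatusEventIDs(make([]string, 800), 500) {
		fmt.Printf("batch %d: %d IDs\n", i+1, len(b))
	}
}

The end clamp keeps the final, partial batch from indexing past the slice, so an input of 800 is handled the same way as the partial-last cases covered by the new unit test below.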
64 changes: 64 additions & 0 deletions pkg/controllers/status_controller_test.go
@@ -0,0 +1,64 @@
package controllers

import (
"testing"
)

func TestBatchStatusEventIDs(t *testing.T) {
const batchSize = 500

cases := []struct {
name string
statusEventIDs []string
expected [][]string
}{
{
name: "empty input",
statusEventIDs: []string{},
expected: [][]string{},
},
{
name: "single batch less than batch size",
statusEventIDs: make([]string, 499),
expected: [][]string{make([]string, 499)},
},
{
name: "single batch equal to batch size",
statusEventIDs: make([]string, batchSize),
expected: [][]string{make([]string, batchSize)},
},
{
name: "multiple batches full",
statusEventIDs: make([]string, batchSize*2),
expected: [][]string{make([]string, batchSize), make([]string, batchSize)},
},
{
name: "multiple batches partial last",
statusEventIDs: make([]string, batchSize+100),
expected: [][]string{make([]string, batchSize), make([]string, 100)},
},
{
name: "multiple batches full partial last",
statusEventIDs: make([]string, batchSize*2+300),
expected: [][]string{make([]string, batchSize), make([]string, batchSize), make([]string, 300)},
},
}

for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
result := batchStatusEventIDs(tt.statusEventIDs, batchSize)

// Ensure the number of batches is correct
if len(result) != len(tt.expected) {
t.Errorf("number of batches mismatch, got %d, want %d", len(result), len(tt.expected))
}

// Check the length of each batch
for i := range result {
if len(result[i]) != len(tt.expected[i]) {
t.Errorf("length of batch %d mismatch, got %d, want %d", i+1, len(result[i]), len(tt.expected[i]))
}
}
})
}
}
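
For context on the periodic trigger added to StatusController.Run above: wait.JitterUntil from k8s.io/apimachinery/pkg/util/wait calls the given function once per period, stretching each period by a random amount of up to jitterFactor times the base period, until the stop channel is closed; that jitter is what keeps multiple Maestro instances from purging status events at exactly the same moment. A minimal sketch of the call pattern, with an illustrative period and log message rather than the PR's actual defaultEventsSyncPeriod:

package main

import (
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Invoke the sync roughly every 5 seconds, jittered by up to 25%,
	// mirroring the go wait.JitterUntil(...) call added in Run.
	go wait.JitterUntil(func() {
		log.Println("sync status events")
	}, 5*time.Second, 0.25, true, stopCh)

	// Let the loop run briefly for the purposes of this sketch, then stop it.
	time.Sleep(30 * time.Second)
	close(stopCh)
}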