From 8f850939c66db4374f0f5866dfc708afb89ce486 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 20 Dec 2024 14:05:04 +0800 Subject: [PATCH] purge status events (#232) Signed-off-by: Wei Liu --- cmd/maestro/server/controllers.go | 3 + cmd/maestro/server/event_server.go | 90 ++++++---- cmd/maestro/server/grpc_broker.go | 47 ++---- cmd/maestro/server/healthcheck_server.go | 1 + pkg/controllers/framework.go | 2 + pkg/controllers/status_controller.go | 79 +++++++-- pkg/controllers/status_controller_test.go | 64 +++++++ pkg/dao/event_instances.go | 36 ++++ pkg/dao/instance.go | 14 ++ pkg/dao/mocks/instance.go | 13 ++ pkg/dao/status_event.go | 14 ++ .../migrations/202311151856_add_consumers.go | 2 +- .../202412181141_alter_event_instances.go | 51 ++++++ pkg/db/migrations/migration_structs.go | 20 ++- pkg/services/status_event.go | 8 + test/helper.go | 21 ++- test/integration/controller_test.go | 157 ++++++++++++++++++ test/registration.go | 6 +- 18 files changed, 521 insertions(+), 107 deletions(-) create mode 100644 pkg/controllers/status_controller_test.go create mode 100644 pkg/db/migrations/202412181141_alter_event_instances.go diff --git a/cmd/maestro/server/controllers.go b/cmd/maestro/server/controllers.go index d7d9cb68..dcf713b4 100755 --- a/cmd/maestro/server/controllers.go +++ b/cmd/maestro/server/controllers.go @@ -5,6 +5,7 @@ import ( "github.com/openshift-online/maestro/pkg/api" "github.com/openshift-online/maestro/pkg/controllers" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/db" "github.com/openshift-online/maestro/pkg/logger" @@ -18,6 +19,8 @@ func NewControllersServer(eventServer EventServer) *ControllersServer { ), StatusController: controllers.NewStatusController( env().Services.StatusEvents(), + dao.NewInstanceDao(&env().Database.SessionFactory), + dao.NewEventInstanceDao(&env().Database.SessionFactory), ), } diff --git a/cmd/maestro/server/event_server.go b/cmd/maestro/server/event_server.go index cae4207c..5514a3a3 100644 --- a/cmd/maestro/server/event_server.go +++ b/cmd/maestro/server/event_server.go @@ -133,42 +133,16 @@ func (s *MessageQueueEventServer) OnDelete(ctx context.Context, resourceID strin // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance func (s *MessageQueueEventServer) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { - statusEvent, sErr := s.statusEventService.Get(ctx, eventID) - if sErr != nil { - return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) - } - - var resource *api.Resource - // check if the status event is delete event - if statusEvent.StatusEventType == api.StatusDeleteEventType { - // build resource with resource id and delete status - resource = &api.Resource{ - Meta: api.Meta{ - ID: resourceID, - }, - Source: statusEvent.ResourceSource, - Type: statusEvent.ResourceType, - Payload: statusEvent.Payload, - Status: statusEvent.Status, - } - } else { - resource, sErr = s.resourceService.Get(ctx, resourceID) - if sErr != nil { - return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) - } - } - - // broadcast the resource status to subscribers - log.V(4).Infof("Broadcast the resource status %s", resource.ID) - s.eventBroadcaster.Broadcast(resource) - - // add the event instance record - _, err := s.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: s.instanceID, - }) - - return err + return broadcastStatusEvent( + ctx, + s.statusEventService, + s.resourceService, + s.eventInstanceDao, + s.eventBroadcaster, + s.instanceID, + eventID, + resourceID, + ) } // handleStatusUpdate processes the resource status update from the agent. @@ -263,3 +237,47 @@ func handleStatusUpdate(ctx context.Context, resource *api.Resource, resourceSer return nil } + +func broadcastStatusEvent(ctx context.Context, + statusEventService services.StatusEventService, + resourceService services.ResourceService, + eventInstanceDao dao.EventInstanceDao, + eventBroadcaster *event.EventBroadcaster, + instanceID, eventID, resourceID string) error { + statusEvent, sErr := statusEventService.Get(ctx, eventID) + if sErr != nil { + return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) + } + + var resource *api.Resource + // check if the status event is delete event + if statusEvent.StatusEventType == api.StatusDeleteEventType { + // build resource with resource id and delete status + resource = &api.Resource{ + Meta: api.Meta{ + ID: resourceID, + }, + Source: statusEvent.ResourceSource, + Type: statusEvent.ResourceType, + Payload: statusEvent.Payload, + Status: statusEvent.Status, + } + } else { + resource, sErr = resourceService.Get(ctx, resourceID) + if sErr != nil { + return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) + } + } + + // broadcast the resource status to subscribers + log.V(4).Infof("Broadcast the resource status %s", resource.ID) + eventBroadcaster.Broadcast(resource) + + // add the event instance record + _, err := eventInstanceDao.Create(ctx, &api.EventInstance{ + EventID: eventID, + InstanceID: instanceID, + }) + + return err +} diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index 2dee39e4..cf301fd1 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -429,44 +429,17 @@ func (bkr *GRPCBroker) OnDelete(ctx context.Context, id string) error { // It does two things: // 1. build the resource status and broadcast it to subscribers // 2. add the event instance record to mark the event has been processed by the current instance -// TODO consider using a same way (MessageQueueEventServer.OnStatusUpdate) to handle this func (bkr *GRPCBroker) OnStatusUpdate(ctx context.Context, eventID, resourceID string) error { - statusEvent, sErr := bkr.statusEventService.Get(ctx, eventID) - if sErr != nil { - return fmt.Errorf("failed to get status event %s: %s", eventID, sErr.Error()) - } - - var resource *api.Resource - // check if the status event is delete event - if statusEvent.StatusEventType == api.StatusDeleteEventType { - // build resource with resource id and delete status - resource = &api.Resource{ - Meta: api.Meta{ - ID: resourceID, - }, - Source: statusEvent.ResourceSource, - Type: statusEvent.ResourceType, - Payload: statusEvent.Payload, - Status: statusEvent.Status, - } - } else { - resource, sErr = bkr.resourceService.Get(ctx, resourceID) - if sErr != nil { - return fmt.Errorf("failed to get resource %s: %s", resourceID, sErr.Error()) - } - } - - // broadcast the resource status to subscribers - log.V(4).Infof("Broadcast the resource status %s", resource.ID) - bkr.eventBroadcaster.Broadcast(resource) - - // add the event instance record - _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{ - EventID: eventID, - InstanceID: bkr.instanceID, - }) - - return err + return broadcastStatusEvent( + ctx, + bkr.statusEventService, + bkr.resourceService, + bkr.eventInstanceDao, + bkr.eventBroadcaster, + bkr.instanceID, + eventID, + resourceID, + ) } // IsConsumerSubscribed returns true if the consumer is subscribed to the broker for resource spec. diff --git a/cmd/maestro/server/healthcheck_server.go b/cmd/maestro/server/healthcheck_server.go index 07bed664..22f01875 100755 --- a/cmd/maestro/server/healthcheck_server.go +++ b/cmd/maestro/server/healthcheck_server.go @@ -117,6 +117,7 @@ func (s *HealthCheckServer) pulse(ctx context.Context) { return } klog.Errorf("Unable to get maestro instance: %s", err.Error()) + return } found.LastHeartbeat = time.Now() _, err = s.instanceDao.Replace(ctx, found) diff --git a/pkg/controllers/framework.go b/pkg/controllers/framework.go index 608c56be..f06f592a 100755 --- a/pkg/controllers/framework.go +++ b/pkg/controllers/framework.go @@ -202,6 +202,7 @@ func (km *KindControllerManager) processNextEvent() bool { } func (km *KindControllerManager) syncEvents() { + logger.Infof("purge all reconciled events") // delete the reconciled events from the database firstly if err := km.events.DeleteAllReconciledEvents(context.Background()); err != nil { // this process is called periodically, so if the error happened, we will wait for the next cycle to handle @@ -210,6 +211,7 @@ func (km *KindControllerManager) syncEvents() { return } + logger.Infof("sync all unreconciled events") unreconciledEvents, err := km.events.FindAllUnreconciledEvents(context.Background()) if err != nil { logger.Error(fmt.Sprintf("Failed to list unreconciled events from db, %v", err)) diff --git a/pkg/controllers/status_controller.go b/pkg/controllers/status_controller.go index fcb6cf45..7aea2b60 100644 --- a/pkg/controllers/status_controller.go +++ b/pkg/controllers/status_controller.go @@ -6,6 +6,7 @@ import ( "time" "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/services" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" @@ -16,16 +17,22 @@ const StatusEventID ControllerHandlerContextKey = "status_event" type StatusHandlerFunc func(ctx context.Context, eventID, sourceID string) error type StatusController struct { - controllers map[api.StatusEventType][]StatusHandlerFunc - statusEvents services.StatusEventService - eventsQueue workqueue.RateLimitingInterface + controllers map[api.StatusEventType][]StatusHandlerFunc + statusEvents services.StatusEventService + instanceDao dao.InstanceDao + eventInstanceDao dao.EventInstanceDao + eventsQueue workqueue.RateLimitingInterface } -func NewStatusController(statusEvents services.StatusEventService) *StatusController { +func NewStatusController(statusEvents services.StatusEventService, + instanceDao dao.InstanceDao, + eventInstanceDao dao.EventInstanceDao) *StatusController { return &StatusController{ - controllers: map[api.StatusEventType][]StatusHandlerFunc{}, - statusEvents: statusEvents, - eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"), + controllers: map[api.StatusEventType][]StatusHandlerFunc{}, + statusEvents: statusEvents, + instanceDao: instanceDao, + eventInstanceDao: eventInstanceDao, + eventsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "status-event-controller"), } } @@ -38,9 +45,8 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) { logger.Infof("Starting status event controller") defer sc.eventsQueue.ShutDown() - // TODO: start a goroutine to sync all status events periodically // use a jitter to avoid multiple instances syncing the events at the same time - // go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh) + go wait.JitterUntil(sc.syncStatusEvents, defaultEventsSyncPeriod, 0.25, true, stopCh) // start a goroutine to handle the status event from the event queue // the .Until will re-kick the runWorker one second after the runWorker completes @@ -51,35 +57,35 @@ func (sc *StatusController) Run(stopCh <-chan struct{}) { logger.Infof("Shutting down status event controller") } -func (sm *StatusController) runWorker() { +func (sc *StatusController) runWorker() { // hot loop until we're told to stop. processNextEvent will automatically wait until there's work available, so // we don't worry about secondary waits - for sm.processNextEvent() { + for sc.processNextEvent() { } } // processNextEvent deals with one key off the queue. -func (sm *StatusController) processNextEvent() bool { +func (sc *StatusController) processNextEvent() bool { // pull the next status event item from queue. // events queue blocks until it can return an item to be processed - key, quit := sm.eventsQueue.Get() + key, quit := sc.eventsQueue.Get() if quit { // the current queue is shutdown and becomes empty, quit this process return false } - defer sm.eventsQueue.Done(key) + defer sc.eventsQueue.Done(key) - if err := sm.handleStatusEvent(key.(string)); err != nil { + if err := sc.handleStatusEvent(key.(string)); err != nil { logger.Error(fmt.Sprintf("Failed to handle the event %v, %v ", key, err)) // we failed to handle the status event, we should requeue the item to work on later // this method will add a backoff to avoid hotlooping on particular items - sm.eventsQueue.AddRateLimited(key) + sc.eventsQueue.AddRateLimited(key) return true } // we handle the status event successfully, tell the queue to stop tracking history for this status event - sm.eventsQueue.Forget(key) + sc.eventsQueue.Forget(key) return true } @@ -131,3 +137,42 @@ func (sc *StatusController) add(ev api.StatusEventType, fns []StatusHandlerFunc) sc.controllers[ev] = append(sc.controllers[ev], fns...) } + +func (sc *StatusController) syncStatusEvents() { + ctx := context.Background() + + readyInstanceIDs, err := sc.instanceDao.FindReadyIDs(ctx) + if err != nil { + logger.Error(fmt.Sprintf("Failed to find ready instances from db, %v", err)) + return + } + logger.Infof("purge status events on the ready instances: %s", readyInstanceIDs) + + // find the status events that already were dispatched to all ready instances + statusEventIDs, err := sc.eventInstanceDao.GetEventsAssociatedWithInstances(ctx, readyInstanceIDs) + if err != nil { + logger.Error(fmt.Sprintf("Failed to find handled status events from db, %v", err)) + return + } + + // batch delete the handled status events + batches := batchStatusEventIDs(statusEventIDs, 500) + for _, batch := range batches { + if err := sc.statusEvents.DeleteAllEvents(ctx, batch); err != nil { + logger.Error(fmt.Sprintf("Failed to delete handled status events from db, %v", err)) + return + } + } +} + +func batchStatusEventIDs(statusEventIDs []string, batchSize int) [][]string { + batches := [][]string{} + for i := 0; i < len(statusEventIDs); i += batchSize { + end := i + batchSize + if end > len(statusEventIDs) { + end = len(statusEventIDs) + } + batches = append(batches, statusEventIDs[i:end]) + } + return batches +} diff --git a/pkg/controllers/status_controller_test.go b/pkg/controllers/status_controller_test.go new file mode 100644 index 00000000..c49736c4 --- /dev/null +++ b/pkg/controllers/status_controller_test.go @@ -0,0 +1,64 @@ +package controllers + +import ( + "testing" +) + +func TestBatchStatusEventIDs(t *testing.T) { + const batchSize = 500 + + cases := []struct { + name string + statusEventIDs []string + expected [][]string + }{ + { + name: "empty input", + statusEventIDs: []string{}, + expected: [][]string{}, + }, + { + name: "single batch less than batch size", + statusEventIDs: make([]string, 499), + expected: [][]string{make([]string, 499)}, + }, + { + name: "single batch equal to batch size", + statusEventIDs: make([]string, batchSize), + expected: [][]string{make([]string, batchSize)}, + }, + { + name: "multiple batches full", + statusEventIDs: make([]string, batchSize*2), + expected: [][]string{make([]string, batchSize), make([]string, batchSize)}, + }, + { + name: "multiple batches partial last", + statusEventIDs: make([]string, batchSize+100), + expected: [][]string{make([]string, batchSize), make([]string, 100)}, + }, + { + name: "multiple batches full partial last", + statusEventIDs: make([]string, batchSize*2+300), + expected: [][]string{make([]string, batchSize), make([]string, batchSize), make([]string, 300)}, + }, + } + + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + result := batchStatusEventIDs(tt.statusEventIDs, batchSize) + + // Ensure the number of batches is correct + if len(result) != len(tt.expected) { + t.Errorf("number of batches mismatch, got %d, want %d", len(result), len(tt.expected)) + } + + // Check the length of each batch + for i := range result { + if len(result[i]) != len(tt.expected[i]) { + t.Errorf("length of batch %d mismatch, got %d, want %d", i+1, len(result[i]), len(tt.expected[i])) + } + } + }) + } +} diff --git a/pkg/dao/event_instances.go b/pkg/dao/event_instances.go index c7722b16..7ec1eef5 100644 --- a/pkg/dao/event_instances.go +++ b/pkg/dao/event_instances.go @@ -12,6 +12,9 @@ import ( type EventInstanceDao interface { Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) + + FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) + GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) } var _ EventInstanceDao = &sqlEventInstanceDao{} @@ -43,3 +46,36 @@ func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.Eve } return eventInstance, nil } + +func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) { + g2 := (*d.sessionFactory).New(ctx) + eventInstances := api.EventInstanceList{} + if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil { + return nil, err + } + return eventInstances, nil +} + +func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error) { + var eventIDs []string + + instanceCount := len(instanceIDs) + if instanceCount == 0 { + return eventIDs, nil + } + + g2 := (*d.sessionFactory).New(ctx) + + // Currently, the instance table should be small, if the instance table become to large, + // consider using join to optimize + if err := g2.Table("event_instances"). + Select("event_id"). + Where("instance_id IN ?", instanceIDs). + Group("event_id"). + Having("COUNT(DISTINCT instance_id) = ?", instanceCount). + Scan(&eventIDs).Error; err != nil { + return nil, err + } + + return eventIDs, nil +} diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go index 21e732ca..8be26a3c 100644 --- a/pkg/dao/instance.go +++ b/pkg/dao/instance.go @@ -20,6 +20,7 @@ type InstanceDao interface { DeleteByIDs(ctx context.Context, ids []string) error FindByIDs(ctx context.Context, ids []string) (api.ServerInstanceList, error) FindByUpdatedTime(ctx context.Context, updatedTime time.Time) (api.ServerInstanceList, error) + FindReadyIDs(ctx context.Context) ([]string, error) All(ctx context.Context) (api.ServerInstanceList, error) } @@ -127,3 +128,16 @@ func (d *sqlInstanceDao) All(ctx context.Context) (api.ServerInstanceList, error } return instances, nil } + +func (d *sqlInstanceDao) FindReadyIDs(ctx context.Context) ([]string, error) { + g2 := (*d.sessionFactory).New(ctx) + instances := api.ServerInstanceList{} + if err := g2.Where("ready = ?", true).Find(&instances).Error; err != nil { + return nil, err + } + ids := make([]string, len(instances)) + for i, instance := range instances { + ids[i] = instance.ID + } + return ids, nil +} diff --git a/pkg/dao/mocks/instance.go b/pkg/dao/mocks/instance.go index b872b337..73ff4e21 100644 --- a/pkg/dao/mocks/instance.go +++ b/pkg/dao/mocks/instance.go @@ -136,6 +136,19 @@ func (d *instanceDaoMock) FindByUpdatedTime(ctx context.Context, updatedTime tim return instances, nil } +func (d *instanceDaoMock) FindReadyIDs(ctx context.Context) ([]string, error) { + d.mux.RLock() + defer d.mux.RUnlock() + + ids := []string{} + for _, instance := range d.instances { + if instance.Ready { + ids = append(ids, instance.ID) + } + } + return ids, nil +} + func (d *instanceDaoMock) All(ctx context.Context) (api.ServerInstanceList, error) { d.mux.RLock() defer d.mux.RUnlock() diff --git a/pkg/dao/status_event.go b/pkg/dao/status_event.go index 03916480..298f9dc4 100755 --- a/pkg/dao/status_event.go +++ b/pkg/dao/status_event.go @@ -19,6 +19,7 @@ type StatusEventDao interface { All(ctx context.Context) (api.StatusEventList, error) DeleteAllReconciledEvents(ctx context.Context) error + DeleteAllEvents(ctx context.Context, eventIDs []string) error FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) } @@ -94,6 +95,19 @@ func (d *sqlStatusEventDao) DeleteAllReconciledEvents(ctx context.Context) error return nil } +func (d *sqlStatusEventDao) DeleteAllEvents(ctx context.Context, eventIDs []string) error { + if len(eventIDs) == 0 { + return nil + } + + g2 := (*d.sessionFactory).New(ctx) + if err := g2.Unscoped().Omit(clause.Associations).Where("id IN ?", eventIDs).Delete(&api.StatusEvent{}).Error; err != nil { + db.MarkForRollback(ctx, err) + return err + } + return nil +} + func (d *sqlStatusEventDao) FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, error) { g2 := (*d.sessionFactory).New(ctx) statusEvents := api.StatusEventList{} diff --git a/pkg/db/migrations/202311151856_add_consumers.go b/pkg/db/migrations/202311151856_add_consumers.go index 0a764d25..f3dcf565 100755 --- a/pkg/db/migrations/202311151856_add_consumers.go +++ b/pkg/db/migrations/202311151856_add_consumers.go @@ -22,7 +22,7 @@ func addConsumers() *gormigrate.Migration { } if err := CreateFK(tx, fkMigration{ - "resources", "consumers", "consumer_name", "consumers(name)", + "resources", "consumers", "consumer_name", "consumers(name)", "ON DELETE RESTRICT ON UPDATE RESTRICT", }); err != nil { return err } diff --git a/pkg/db/migrations/202412181141_alter_event_instances.go b/pkg/db/migrations/202412181141_alter_event_instances.go new file mode 100644 index 00000000..d07c5cc0 --- /dev/null +++ b/pkg/db/migrations/202412181141_alter_event_instances.go @@ -0,0 +1,51 @@ +package migrations + +import ( + "gorm.io/gorm" + + "github.com/go-gormigrate/gormigrate/v2" +) + +func alterEventInstances() *gormigrate.Migration { + type EventInstance struct { + EventID string `gorm:"index:idx_status_event_instance"` // primary key of status_events table + InstanceID string `gorm:"index:idx_status_event_instance"` // primary key of server_instances table + SpecEventID string `gorm:"index"` // primary key of events table + } + + return &gormigrate.Migration{ + ID: "202412181141", + Migrate: func(tx *gorm.DB) error { + if err := tx.AutoMigrate(&EventInstance{}); err != nil { + return err + } + + return CreateFK(tx, fkMigration{ + "event_instances", "server_instances", "instance_id", "server_instances(id)", "ON DELETE CASCADE", + }, fkMigration{ + "event_instances", "status_events", "event_id", "status_events(id)", "ON DELETE CASCADE", + }, fkMigration{ + "event_instances", "events", "spec_event_id", "events(id)", "ON DELETE CASCADE", + }) + }, + Rollback: func(tx *gorm.DB) error { + if err := tx.Migrator().DropColumn(&EventInstance{}, "spec_event_id"); err != nil { + return err + } + + if err := tx.Migrator().DropIndex(&EventInstance{}, "idx_status_event_instance"); err != nil { + return err + } + + if err := tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "server_instances")); err != nil { + return err + } + + if err := tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "status_events")); err != nil { + return err + } + + return tx.Migrator().DropConstraint(&EventInstance{}, fkName("event_instances", "events")) + }, + } +} diff --git a/pkg/db/migrations/migration_structs.go b/pkg/db/migrations/migration_structs.go index 2707961e..79c79b70 100755 --- a/pkg/db/migrations/migration_structs.go +++ b/pkg/db/migrations/migration_structs.go @@ -35,6 +35,7 @@ var MigrationList = []*gormigrate.Migration{ addStatusEvents(), addEventInstances(), addLastHeartBeatAndReadyColumnInServerInstancesTable(), + alterEventInstances(), } // Model represents the base model struct. All entities will have this struct embedded. @@ -46,23 +47,28 @@ type Model struct { } type fkMigration struct { - Model string - Dest string - Field string - Reference string + Model string + Dest string + Field string + Reference string + Constraint string } func CreateFK(g2 *gorm.DB, fks ...fkMigration) error { - var query = `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s ON DELETE RESTRICT ON UPDATE RESTRICT;` var drop = `ALTER TABLE %s DROP CONSTRAINT IF EXISTS %s;` for _, fk := range fks { - name := fmt.Sprintf("fk_%s_%s", fk.Model, fk.Dest) + name := fkName(fk.Model, fk.Dest) g2.Exec(fmt.Sprintf(drop, fk.Model, name)) - if err := g2.Exec(fmt.Sprintf(query, fk.Model, name, fk.Field, fk.Reference)).Error; err != nil { + if err := g2.Exec(fmt.Sprintf(`ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s %s;`, + fk.Model, name, fk.Field, fk.Reference, fk.Constraint)).Error; err != nil { return err } } return nil } + +func fkName(model, dest string) string { + return fmt.Sprintf("fk_%s_%s", model, dest) +} diff --git a/pkg/services/status_event.go b/pkg/services/status_event.go index c39916c8..f69c80b2 100755 --- a/pkg/services/status_event.go +++ b/pkg/services/status_event.go @@ -18,6 +18,7 @@ type StatusEventService interface { FindAllUnreconciledEvents(ctx context.Context) (api.StatusEventList, *errors.ServiceError) DeleteAllReconciledEvents(ctx context.Context) *errors.ServiceError + DeleteAllEvents(ctx context.Context, eventIDs []string) *errors.ServiceError } func NewStatusEventService(statusEventDao dao.StatusEventDao) StatusEventService { @@ -93,3 +94,10 @@ func (s *sqlStatusEventService) DeleteAllReconciledEvents(ctx context.Context) * } return nil } + +func (s *sqlStatusEventService) DeleteAllEvents(ctx context.Context, eventIDs []string) *errors.ServiceError { + if err := s.statusEventDao.DeleteAllEvents(ctx, eventIDs); err != nil { + return handleDeleteError("StatusEvent", errors.GeneralError("Unable to delete events %s: %s", eventIDs, err)) + } + return nil +} diff --git a/test/helper.go b/test/helper.go index ef374d39..e9fcfe5a 100755 --- a/test/helper.go +++ b/test/helper.go @@ -121,8 +121,13 @@ func NewHelper(t *testing.T) *Helper { Ctx: ctx, ContextCancelFunc: cancel, EventBroadcaster: event.NewEventBroadcaster(), - StatusDispatcher: dispatcher.NewHashDispatcher(helper.Env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&helper.Env().Database.SessionFactory), - dao.NewConsumerDao(&helper.Env().Database.SessionFactory), helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig), + StatusDispatcher: dispatcher.NewHashDispatcher( + helper.Env().Config.MessageBroker.ClientID, + dao.NewInstanceDao(&helper.Env().Database.SessionFactory), + dao.NewConsumerDao(&helper.Env().Database.SessionFactory), + helper.Env().Clients.CloudEventsSource, + helper.Env().Config.EventServer.ConsistentHashConfig, + ), AppConfig: env.Config, DBFactory: env.Database.SessionFactory, JWTPrivateKey: jwtKey, @@ -138,6 +143,10 @@ func NewHelper(t *testing.T) *Helper { jwkMockTeardown, } + if err := helper.MigrateDB(); err != nil { + panic(err) + } + helper.startEventBroadcaster() helper.startAPIServer() helper.startMetricsServer() @@ -238,6 +247,8 @@ func (helper *Helper) StartControllerManager(ctx context.Context) { ), StatusController: controllers.NewStatusController( helper.Env().Services.StatusEvents(), + dao.NewInstanceDao(&helper.Env().Database.SessionFactory), + dao.NewEventInstanceDao(&helper.Env().Database.SessionFactory), ), } @@ -481,11 +492,9 @@ func (helper *Helper) CleanDB() error { for _, table := range []string{ "events", "status_events", - "event_instances", "resources", "consumers", "server_instances", - "migrations", } { if g2.Migrator().HasTable(table) { // remove table contents instead of dropping table @@ -504,10 +513,6 @@ func (helper *Helper) ResetDB() error { return err } - if err := helper.MigrateDB(); err != nil { - return err - } - return nil } diff --git a/test/integration/controller_test.go b/test/integration/controller_test.go index a6ce56cc..1285eb60 100755 --- a/test/integration/controller_test.go +++ b/test/integration/controller_test.go @@ -85,6 +85,8 @@ func TestControllerRacing(t *testing.T) { ), StatusController: controllers.NewStatusController( h.Env().Services.StatusEvents(), + dao.NewInstanceDao(&h.Env().Database.SessionFactory), + dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), ), } @@ -181,6 +183,8 @@ func TestControllerReconcile(t *testing.T) { ), StatusController: controllers.NewStatusController( h.Env().Services.StatusEvents(), + dao.NewInstanceDao(&h.Env().Database.SessionFactory), + dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), ), } @@ -312,6 +316,8 @@ func TestControllerSync(t *testing.T) { ), StatusController: controllers.NewStatusController( h.Env().Services.StatusEvents(), + dao.NewInstanceDao(&h.Env().Database.SessionFactory), + dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), ), } @@ -347,3 +353,154 @@ func TestControllerSync(t *testing.T) { // cancel the context to stop the controller manager cancel() } + +func TestStatusControllerSync(t *testing.T) { + h, _ := test.RegisterIntegration(t) + + account := h.NewRandAccount() + ctx, cancel := context.WithCancel(h.NewAuthenticatedContext(account)) + + instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory) + statusEventDao := dao.NewStatusEventDao(&h.Env().Database.SessionFactory) + eventInstanceDao := dao.NewEventInstanceDao(&h.Env().Database.SessionFactory) + + // prepare instances + if _, err := instanceDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "i1"}, Ready: true, LastHeartbeat: time.Now()}); err != nil { + t.Fatal(err) + } + if _, err := instanceDao.Create(ctx, &api.ServerInstance{Meta: api.Meta{ID: "i2"}}); err != nil { + t.Fatal(err) + } + if _, err := instanceDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ID: "i3"}, Ready: true, LastHeartbeat: time.Now()}); err != nil { + t.Fatal(err) + } + + // prepare events + evt1, err := statusEventDao.Create(ctx, &api.StatusEvent{}) + if err != nil { + t.Fatal(err) + } + evt2, err := statusEventDao.Create(ctx, &api.StatusEvent{}) + if err != nil { + t.Fatal(err) + } + evt3, err := statusEventDao.Create(ctx, &api.StatusEvent{}) + if err != nil { + t.Fatal(err) + } + evt4, err := statusEventDao.Create(ctx, &api.StatusEvent{}) + if err != nil { + t.Fatal(err) + } + evt5, err := statusEventDao.Create(ctx, &api.StatusEvent{}) + if err != nil { + t.Fatal(err) + } + + readyInstances, err := instanceDao.FindReadyIDs(ctx) + if err != nil { + t.Fatal(err) + } + + // prepare event-instances + for _, id := range readyInstances { + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: id, EventID: evt1.ID}); err != nil { + t.Fatal(err) + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: id, EventID: evt2.ID}); err != nil { + t.Fatal(err) + } + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i2", EventID: evt1.ID}); err != nil { + t.Fatal(err) + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i1", EventID: evt3.ID}); err != nil { + t.Fatal(err) + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i2", EventID: evt3.ID}); err != nil { + t.Fatal(err) + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i1", EventID: evt4.ID}); err != nil { + t.Fatal(err) + } + if _, err := eventInstanceDao.Create(ctx, &api.EventInstance{InstanceID: "i3", EventID: evt5.ID}); err != nil { + t.Fatal(err) + } + + // start the controller + go func() { + s := &server.ControllersServer{ + KindControllerManager: controllers.NewKindControllerManager( + db.NewAdvisoryLockFactory(h.Env().Database.SessionFactory), + h.Env().Services.Events(), + ), + StatusController: controllers.NewStatusController( + h.Env().Services.StatusEvents(), + dao.NewInstanceDao(&h.Env().Database.SessionFactory), + dao.NewEventInstanceDao(&h.Env().Database.SessionFactory), + ), + } + + s.Start(ctx) + }() + + purged := []string{evt1.ID, evt2.ID} + remained := []string{evt3.ID, evt4.ID, evt5.ID} + Eventually(func() error { + events, err := statusEventDao.FindByIDs(ctx, remained) + if err != nil { + return err + } + + if len(events) != 3 { + return fmt.Errorf("should have events %s remained, but got %v", remained, events) + } + + events, err = statusEventDao.FindByIDs(ctx, purged) + if err != nil { + return err + } + + if len(events) != 0 { + return fmt.Errorf("should purge the events %s, but got %+v", purged, events) + } + + eventInstances, err := eventInstanceDao.FindStatusEvents(ctx, purged) + if err != nil { + return err + } + if len(eventInstances) != 0 { + return fmt.Errorf("should purge the event-instances %s, but got %+v", purged, eventInstances) + } + + if _, err := eventInstanceDao.Get(ctx, evt3.ID, "i1"); err != nil { + return fmt.Errorf("%s-%s is not found", "e3", "i1") + } + if _, err := eventInstanceDao.Get(ctx, evt3.ID, "i2"); err != nil { + return fmt.Errorf("%s-%s is not found", "e3", "i2") + } + if _, err := eventInstanceDao.Get(ctx, evt4.ID, "i1"); err != nil { + return fmt.Errorf("%s-%s is not found", "e4", "i1") + } + if _, err := eventInstanceDao.Get(ctx, evt5.ID, "i3"); err != nil { + return fmt.Errorf("%s-%s is not found", "e5", "i3") + } + + return nil + }, 5*time.Second, 1*time.Second).Should(Succeed()) + + // cleanup + for _, evtID := range remained { + if err := statusEventDao.Delete(ctx, evtID); err != nil { + t.Fatal(err) + } + } + if err := instanceDao.DeleteByIDs(ctx, []string{"i1", "i2", "i3"}); err != nil { + t.Fatal(err) + } + + // cancel the context to stop the controller manager + cancel() +} diff --git a/test/registration.go b/test/registration.go index 1897a89a..f7e6804b 100755 --- a/test/registration.go +++ b/test/registration.go @@ -15,8 +15,12 @@ func RegisterIntegration(t *testing.T) (*Helper, *openapi.APIClient) { gm.RegisterTestingT(t) // Create a new helper helper := NewHelper(t) + // Reset the database to a seeded blank state - helper.ResetDB() + if err := helper.ResetDB(); err != nil { + panic(err) + } + // Create an api client client := helper.NewApiClient()