From 09ddae17a3e5e4078e73dd5099c52d6e31dfda7c Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 6 Dec 2024 14:41:02 +1100 Subject: [PATCH] wip # Conflicts: # backend/controller/console/console.go # backend/controller/controller.go # backend/controller/timeline/events_async.go # backend/controller/timeline/events_call.go # backend/controller/timeline/events_cron.go # backend/controller/timeline/events_ingress.go # backend/controller/timeline/events_log.go # backend/controller/timeline/events_pubsub_consume.go # backend/controller/timeline/events_pubsub_publish.go # backend/controller/timeline/timeline.go # Conflicts: # backend/controller/controller.go # backend/controller/timeline/timeline.go # Conflicts: # backend/controller/console/console.go --- backend/controller/controller.go | 6 ++-- backend/timeline/events_cron.go | 58 ++++++++++++++++++++++++++++++++ backend/timeline/service_test.go | 3 +- 3 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 backend/timeline/events_cron.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 49eefc1662..a38eefb8b5 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -462,7 +462,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie requestKey = optional.Some(rkey) } - timeline.Publish(ctx, &timeline.Log{ + timeline.Publish(ctx, timeline.Log{ DeploymentKey: deploymentKey, RequestKey: requestKey, Time: msg.TimeStamp.AsTime(), @@ -899,7 +899,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl routes := s.routeTable.Current() route, ok := routes.GetDeployment(module).Get() if ok { - timeline.Publish(ctx, &timeline.PubSubPublish{ + timeline.Publish(ctx, timeline.PubSubPublish{ DeploymentKey: route, RequestKey: requestKey, Time: now, @@ -1257,7 +1257,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration if e, ok := err.Get(); ok { errStr = optional.Some(e.Error()) } - timeline.Publish(ctx, &timeline.AsyncExecute{ + timeline.Publish(ctx, timeline.AsyncExecute{ DeploymentKey: deployment, RequestKey: call.ParentRequestKey, EventType: eventType, diff --git a/backend/timeline/events_cron.go b/backend/timeline/events_cron.go new file mode 100644 index 0000000000..f5d5d89f12 --- /dev/null +++ b/backend/timeline/events_cron.go @@ -0,0 +1,58 @@ +package timeline + +// TODO: cron service needs to call this +// type CronScheduled struct { +// DeploymentKey model.DeploymentKey +// Verb schema.Ref + +// Time time.Time +// ScheduledAt time.Time +// Schedule string +// Error optional.Option[string] +// } + +// func (e *CronScheduled) toEvent() (Event, error) { //nolint:unparam +// return &CronScheduledEvent{ +// CronScheduled: *e, +// Duration: time.Since(e.Time), +// }, nil +// } + +// type eventCronScheduledJSON struct { +// DurationMS int64 `json:"duration_ms"` +// ScheduledAt time.Time `json:"scheduled_at"` +// Schedule string `json:"schedule"` +// Error optional.Option[string] `json:"error,omitempty"` +// } + +// func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduledEvent) error { +// cronJSON := eventCronScheduledJSON{ +// DurationMS: event.Duration.Milliseconds(), +// ScheduledAt: event.ScheduledAt, +// Schedule: event.Schedule, +// Error: event.Error, +// } + +// data, err := json.Marshal(cronJSON) +// if err != nil { +// return fmt.Errorf("failed to marshal cron JSON: %w", err) +// } + +// var payload ftlencryption.EncryptedTimelineColumn +// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) +// if err != nil { +// return fmt.Errorf("failed to encrypt cron JSON: %w", err) +// } + +// err = libdal.TranslatePGError(querier.InsertTimelineCronScheduledEvent(ctx, sql.InsertTimelineCronScheduledEventParams{ +// DeploymentKey: event.DeploymentKey, +// TimeStamp: event.Time, +// Module: event.Verb.Module, +// Verb: event.Verb.Name, +// Payload: payload, +// })) +// if err != nil { +// return fmt.Errorf("failed to insert cron event: %w", err) +// } +// return err +// } diff --git a/backend/timeline/service_test.go b/backend/timeline/service_test.go index b598c66fc2..5b48af22b6 100644 --- a/backend/timeline/service_test.go +++ b/backend/timeline/service_test.go @@ -108,10 +108,11 @@ func TestDeleteOldEvents(t *testing.T) { } // Delete half the events (everything older than 3 seconds) - _, err := service.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{ + resp, err := service.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{ AgeSeconds: 3, EventType: timelinepb.EventType_EVENT_TYPE_UNSPECIFIED, })) assert.NoError(t, err) assert.Equal(t, len(service.events), 150, "expected only half the events to be deleted") + assert.Equal(t, resp.Msg.DeletedCount, 150, "expected half the events to be in the deletion count") }