Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
# Conflicts:
#	backend/controller/console/console.go
#	backend/controller/controller.go
#	backend/controller/timeline/events_async.go
#	backend/controller/timeline/events_call.go
#	backend/controller/timeline/events_cron.go
#	backend/controller/timeline/events_ingress.go
#	backend/controller/timeline/events_log.go
#	backend/controller/timeline/events_pubsub_consume.go
#	backend/controller/timeline/events_pubsub_publish.go
#	backend/controller/timeline/timeline.go

# Conflicts:
#	backend/controller/controller.go
#	backend/controller/timeline/timeline.go

# Conflicts:
#	backend/controller/console/console.go
  • Loading branch information
matt2e committed Dec 6, 2024
1 parent 016d8ff commit 83246ed
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
6 changes: 3 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

timeline.Publish(ctx, &timeline.Log{
timeline.Publish(ctx, timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Time: msg.TimeStamp.AsTime(),
Expand Down Expand Up @@ -900,7 +900,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl
routes := s.routeTable.Current()
route, ok := routes.GetDeployment(module).Get()
if ok {
timeline.Publish(ctx, &timeline.PubSubPublish{
timeline.Publish(ctx, timeline.PubSubPublish{
DeploymentKey: route,
RequestKey: requestKey,
Time: now,
Expand Down Expand Up @@ -1258,7 +1258,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
timeline.Publish(ctx, &timeline.AsyncExecute{
timeline.Publish(ctx, timeline.AsyncExecute{
DeploymentKey: deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Expand Down
58 changes: 58 additions & 0 deletions backend/timeline/events_cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package timeline

// TODO: cron service needs to call this
// type CronScheduled struct {
// DeploymentKey model.DeploymentKey
// Verb schema.Ref

// Time time.Time
// ScheduledAt time.Time
// Schedule string
// Error optional.Option[string]
// }

// func (e *CronScheduled) toEvent() (Event, error) { //nolint:unparam
// return &CronScheduledEvent{
// CronScheduled: *e,
// Duration: time.Since(e.Time),
// }, nil
// }

// type eventCronScheduledJSON struct {
// DurationMS int64 `json:"duration_ms"`
// ScheduledAt time.Time `json:"scheduled_at"`
// Schedule string `json:"schedule"`
// Error optional.Option[string] `json:"error,omitempty"`
// }

// func (s *Service) insertCronScheduledEvent(ctx context.Context, querier sql.Querier, event *CronScheduledEvent) error {
// cronJSON := eventCronScheduledJSON{
// DurationMS: event.Duration.Milliseconds(),
// ScheduledAt: event.ScheduledAt,
// Schedule: event.Schedule,
// Error: event.Error,
// }

// data, err := json.Marshal(cronJSON)
// if err != nil {
// return fmt.Errorf("failed to marshal cron JSON: %w", err)
// }

// var payload ftlencryption.EncryptedTimelineColumn
// err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
// if err != nil {
// return fmt.Errorf("failed to encrypt cron JSON: %w", err)
// }

// err = libdal.TranslatePGError(querier.InsertTimelineCronScheduledEvent(ctx, sql.InsertTimelineCronScheduledEventParams{
// DeploymentKey: event.DeploymentKey,
// TimeStamp: event.Time,
// Module: event.Verb.Module,
// Verb: event.Verb.Name,
// Payload: payload,
// }))
// if err != nil {
// return fmt.Errorf("failed to insert cron event: %w", err)
// }
// return err
// }
3 changes: 2 additions & 1 deletion backend/timeline/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ func TestDeleteOldEvents(t *testing.T) {
}

// Delete half the events (everything older than 3 seconds)
_, err := service.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{
resp, err := service.DeleteOldEvents(ctx, connect.NewRequest(&timelinepb.DeleteOldEventsRequest{
AgeSeconds: 3,
EventType: timelinepb.EventType_EVENT_TYPE_UNSPECIFIED,
}))
assert.NoError(t, err)
assert.Equal(t, len(service.events), 150, "expected only half the events to be deleted")
assert.Equal(t, resp.Msg.DeletedCount, 150, "expected half the events to be in the deletion count")
}

0 comments on commit 83246ed

Please sign in to comment.