diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index bb459a6be1..2de3026166 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -366,7 +366,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter return query, nil } -func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event { +func eventDALToProto(event timeline.Event) *pbconsole.Event { switch event := event.(type) { case *timeline.CallEvent: var requestKey *string diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 796f23e123..21fe766dac 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -478,7 +478,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie requestKey = optional.Some(rkey) } - err = s.timeline.InsertLogEvent(ctx, &timeline.Log{ + s.timeline.EnqueueEvent(ctx, &timeline.Log{ DeploymentKey: deploymentKey, RequestKey: requestKey, Time: msg.TimeStamp.AsTime(), @@ -487,10 +487,6 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie Message: msg.Message, Error: optional.Ptr(msg.Error), }) - - if err != nil { - return nil, err - } } if stream.Err() != nil { return nil, stream.Err() @@ -1080,7 +1076,7 @@ func (s *Service) callWithRequest( callResponse = either.RightOf[*ftlv1.CallResponse](err) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) } - s.timeline.InsertCallEvent(ctx, &timeline.Call{ + s.timeline.EnqueueEvent(ctx, &timeline.Call{ DeploymentKey: route.Deployment, RequestKey: requestKey, ParentRequestKey: parentKey, diff --git a/backend/controller/deployment_logs.go b/backend/controller/deployment_logs.go index 1336d881c2..48a90b1ce9 100644 --- a/backend/controller/deployment_logs.go +++ b/backend/controller/deployment_logs.go @@ -71,7 +71,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { errorStr = optional.Some(entry.Error.Error()) } - err = d.timeline.InsertLogEvent(ctx, &timeline.Log{ + d.timeline.EnqueueEvent(ctx, &timeline.Log{ RequestKey: request, DeploymentKey: deployment, Time: entry.Time, @@ -80,9 +80,6 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { Message: entry.Message, Error: errorStr, }) - if err != nil { - fmt.Printf("failed to insert log entry: %v :: error: %v\n", entry, err) - } case <-ctx.Done(): return case <-time.After(1 * time.Second): diff --git a/backend/controller/ingress/handler.go b/backend/controller/ingress/handler.go index bc884f42be..4c1b03830b 100644 --- a/backend/controller/ingress/handler.go +++ b/backend/controller/ingress/handler.go @@ -153,7 +153,7 @@ func Handle( if err == nil { observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]()) ingressEvent.Response.Body = io.NopCloser(strings.NewReader(string(rawBody))) - timelineService.InsertHTTPIngress(r.Context(), &ingressEvent) + timelineService.EnqueueEvent(r.Context(), &ingressEvent) } else { logger.Errorf(err, "could not write response body") observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body")) @@ -176,7 +176,7 @@ func recordIngressErrorEvent( ) { ingressEvent.Response.StatusCode = statusCode ingressEvent.Error = optional.Some(errorMsg) - timelineService.InsertHTTPIngress(ctx, ingressEvent) + timelineService.EnqueueEvent(ctx, ingressEvent) } // Copied from the Apache-licensed connect-go source. diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go index 2d5d6602ad..3c3d92e6e1 100644 --- a/backend/controller/observability/observability.go +++ b/backend/controller/observability/observability.go @@ -18,6 +18,7 @@ var ( PubSub *PubSubMetrics Cron *CronMetrics Controller *ControllerTracing + Timeline *TimelineMetrics ) func init() { @@ -39,8 +40,10 @@ func init() { Cron, err = initCronMetrics() errs = errors.Join(errs, err) Controller = initControllerTracing() + Timeline, err = initTimelineMetrics() + errs = errors.Join(errs, err) - if err != nil { + if errs != nil { panic(fmt.Errorf("could not initialize controller metrics: %w", errs)) } } diff --git a/backend/controller/observability/timeline.go b/backend/controller/observability/timeline.go new file mode 100644 index 0000000000..7bdb02cadf --- /dev/null +++ b/backend/controller/observability/timeline.go @@ -0,0 +1,63 @@ +package observability + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" +) + +const ( + timelineMeterName = "ftl.timeline" +) + +type TimelineMetrics struct { + inserted metric.Int64Counter + dropped metric.Int64Counter + failed metric.Int64Counter +} + +func initTimelineMetrics() (*TimelineMetrics, error) { + result := &TimelineMetrics{ + inserted: noop.Int64Counter{}, + dropped: noop.Int64Counter{}, + failed: noop.Int64Counter{}, + } + + var err error + meter := otel.Meter(timelineMeterName) + + signalName := fmt.Sprintf("%s.inserted", timelineMeterName) + if result.inserted, err = meter.Int64Counter(signalName, metric.WithUnit("1"), + metric.WithDescription("the number of times a timeline event was inserted")); err != nil { + return nil, wrapErr(signalName, err) + } + + signalName = fmt.Sprintf("%s.dropped", timelineMeterName) + if result.dropped, err = meter.Int64Counter(signalName, metric.WithUnit("1"), + metric.WithDescription("the number of times a timeline event was dropped due to the queue being full")); err != nil { + return nil, wrapErr(signalName, err) + } + + signalName = fmt.Sprintf("%s.failed", timelineMeterName) + if result.dropped, err = meter.Int64Counter(signalName, metric.WithUnit("1"), + metric.WithDescription("the number of times a timeline event failed to be inserted into the database")); err != nil { + return nil, wrapErr(signalName, err) + } + + return result, nil +} + +func (m *TimelineMetrics) Inserted(ctx context.Context, count int) { + m.inserted.Add(ctx, int64(count)) +} + +func (m *TimelineMetrics) Dropped(ctx context.Context) { + m.dropped.Add(ctx, 1) +} + +func (m *TimelineMetrics) Failed(ctx context.Context, count int) { + m.failed.Add(ctx, int64(count)) +} diff --git a/backend/controller/timeline/events_call.go b/backend/controller/timeline/events_call.go index 6f891296db..8b828e33f9 100644 --- a/backend/controller/timeline/events_call.go +++ b/backend/controller/timeline/events_call.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "time" "github.com/alecthomas/types/either" @@ -14,7 +15,6 @@ import ( "github.com/TBD54566975/ftl/backend/libdal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -55,8 +55,9 @@ type Call struct { Response either.Either[*ftlv1.CallResponse, error] } -func (s *Service) InsertCallEvent(ctx context.Context, call *Call) { - logger := log.FromContext(ctx) +func (c *Call) inEvent() {} + +func (s *Service) insertCallEvent(ctx context.Context, querier sql.Querier, call *Call) error { callEvent := callToCallEvent(call) var sourceModule, sourceVerb optional.Option[string] @@ -84,18 +85,16 @@ func (s *Service) InsertCallEvent(ctx context.Context, call *Call) { data, err := json.Marshal(callJSON) if err != nil { - logger.Errorf(err, "failed to marshal call event") - return + return fmt.Errorf("failed to marshal call event: %w", err) } var payload ftlencryption.EncryptedTimelineColumn err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) if err != nil { - logger.Errorf(err, "failed to encrypt call event") - return + return fmt.Errorf("failed to encrypt call event: %w", err) } - err = libdal.TranslatePGError(s.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ + err = libdal.TranslatePGError(querier.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ DeploymentKey: call.DeploymentKey, RequestKey: requestKey, ParentRequestKey: parentRequestKey, @@ -107,8 +106,9 @@ func (s *Service) InsertCallEvent(ctx context.Context, call *Call) { Payload: payload, })) if err != nil { - logger.Errorf(err, "failed to insert call event") + return fmt.Errorf("failed to insert call event: %w", err) } + return nil } func callToCallEvent(call *Call) *CallEvent { diff --git a/backend/controller/timeline/events_ingress.go b/backend/controller/timeline/events_ingress.go index 61d67be437..4f8d476504 100644 --- a/backend/controller/timeline/events_ingress.go +++ b/backend/controller/timeline/events_ingress.go @@ -3,6 +3,7 @@ package timeline import ( "context" "encoding/json" + "fmt" "io" "net/http" "time" @@ -13,7 +14,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -60,13 +60,12 @@ type Ingress struct { Error optional.Option[string] } -func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { - logger := log.FromContext(ctx) +func (*Ingress) inEvent() {} +func (s *Service) insertHTTPIngress(ctx context.Context, querier sql.Querier, ingress *Ingress) error { requestBody, err := io.ReadAll(ingress.Request.Body) if err != nil { - logger.Errorf(err, "failed to read request body") - return + return fmt.Errorf("failed to read request body: %w", err) } if len(requestBody) == 0 { requestBody = []byte("{}") @@ -76,8 +75,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { if ingress.Response.Body != nil { responseBody, err = io.ReadAll(ingress.Response.Body) if err != nil { - logger.Errorf(err, "failed to read response body") - return + return fmt.Errorf("failed to read response body: %w", err) } } @@ -87,8 +85,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { reqHeaderBytes, err := json.Marshal(ingress.Request.Header) if err != nil { - logger.Errorf(err, "failed to marshal request header") - return + return fmt.Errorf("failed to marshal request header: %w", err) } if len(reqHeaderBytes) == 0 { reqHeaderBytes = []byte("{}") @@ -96,8 +93,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { respHeaderBytes, err := json.Marshal(ingress.Response.Header) if err != nil { - logger.Errorf(err, "failed to marshal response header") - return + return fmt.Errorf("failed to marshal response header: %w", err) } if len(respHeaderBytes) == 0 { respHeaderBytes = []byte("{}") @@ -117,18 +113,16 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { data, err := json.Marshal(ingressJSON) if err != nil { - logger.Errorf(err, "failed to marshal ingress JSON") - return + return fmt.Errorf("failed to marshal ingress JSON: %w", err) } var payload ftlencryption.EncryptedTimelineColumn err = s.encryption.EncryptJSON(json.RawMessage(data), &payload) if err != nil { - logger.Errorf(err, "failed to encrypt ingress payload") - return + return fmt.Errorf("failed to encrypt ingress payload: %w", err) } - err = libdal.TranslatePGError(s.db.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{ + err = libdal.TranslatePGError(querier.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{ DeploymentKey: ingress.DeploymentKey, RequestKey: optional.Some(ingress.RequestKey.String()), TimeStamp: ingress.StartTime, @@ -138,6 +132,7 @@ func (s *Service) InsertHTTPIngress(ctx context.Context, ingress *Ingress) { Payload: payload, })) if err != nil { - logger.Errorf(err, "failed to insert ingress event") + return fmt.Errorf("failed to insert ingress event: %w", err) } + return nil } diff --git a/backend/controller/timeline/events_log.go b/backend/controller/timeline/events_log.go index 7c0317866d..dcfd6d0048 100644 --- a/backend/controller/timeline/events_log.go +++ b/backend/controller/timeline/events_log.go @@ -24,6 +24,8 @@ type Log struct { Error optional.Option[string] } +func (l *Log) inEvent() {} + type LogEvent struct { ID int64 Log @@ -38,7 +40,7 @@ type eventLogJSON struct { Error optional.Option[string] `json:"error,omitempty"` } -func (s *Service) InsertLogEvent(ctx context.Context, log *Log) error { +func (s *Service) insertLogEvent(ctx context.Context, querier sql.Querier, log *Log) error { var requestKey optional.Option[string] if name, ok := log.RequestKey.Get(); ok { requestKey = optional.Some(name.String()) @@ -61,7 +63,7 @@ func (s *Service) InsertLogEvent(ctx context.Context, log *Log) error { return fmt.Errorf("failed to encrypt log payload: %w", err) } - return libdal.TranslatePGError(s.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ + return libdal.TranslatePGError(querier.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ DeploymentKey: log.DeploymentKey, RequestKey: requestKey, TimeStamp: log.Time, diff --git a/backend/controller/timeline/query.go b/backend/controller/timeline/query.go index 02d3858fe2..5bb0317aa8 100644 --- a/backend/controller/timeline/query.go +++ b/backend/controller/timeline/query.go @@ -103,7 +103,7 @@ func FilterDescending() TimelineFilter { } } -func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) { +func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]Event, error) { if limit < 1 { return nil, fmt.Errorf("limit must be >= 1, got %d", limit) } @@ -158,7 +158,7 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel deploymentQuery += ` WHERE key = ANY($1::TEXT[])` deploymentArgs = append(deploymentArgs, filter.deployments) } - rows, err := s.Handle.Connection.QueryContext(ctx, deploymentQuery, deploymentArgs...) + rows, err := s.conn.QueryContext(ctx, deploymentQuery, deploymentArgs...) if err != nil { return nil, libdal.TranslatePGError(err) } @@ -214,7 +214,7 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel q += fmt.Sprintf(" LIMIT %d", limit) // Issue query. - rows, err = s.Handle.Connection.QueryContext(ctx, q, args...) + rows, err = s.conn.QueryContext(ctx, q, args...) if err != nil { return nil, fmt.Errorf("%s: %w", q, libdal.TranslatePGError(err)) } @@ -227,8 +227,8 @@ func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...Timel return events, nil } -func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) { - var out []TimelineEvent +func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) { + var out []Event for rows.Next() { row := eventRow{} var deploymentID int64 diff --git a/backend/controller/timeline/timeline.go b/backend/controller/timeline/timeline.go index a3cf3924e8..b4655eb374 100644 --- a/backend/controller/timeline/timeline.go +++ b/backend/controller/timeline/timeline.go @@ -2,12 +2,16 @@ package timeline import ( "context" + stdsql "database/sql" + "fmt" "time" "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/internal/log" ) type EventType = sql.EventType @@ -19,39 +23,118 @@ const ( EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated EventTypeIngress = sql.EventTypeIngress + + maxBatchSize = 16 + maxBatchDelay = 100 * time.Millisecond ) -// TimelineEvent types. +// Event types. // //sumtype:decl -type TimelineEvent interface { +type Event interface { GetID() int64 event() } +// InEvent is a marker interface for events that are inserted into the timeline. +type InEvent interface { + inEvent() +} + type Service struct { - *libdal.Handle[Service] - db sql.Querier + ctx context.Context + conn *stdsql.DB encryption *encryption.Service + events chan InEvent } -func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *Service { +func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service { var s *Service + events := make(chan InEvent, 1000) s = &Service{ - db: sql.New(conn), + ctx: ctx, + conn: conn, encryption: encryption, - Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service { - return &Service{ - Handle: h, - db: sql.New(h.Connection), - encryption: s.encryption, - } - }), + events: events, } + go s.processEvents() return s } func (s *Service) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := s.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) + count, err := sql.New(s.conn).DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) return count, libdal.TranslatePGError(err) } + +// EnqueueEvent asynchronously enqueues an event for insertion into the timeline. +func (s *Service) EnqueueEvent(ctx context.Context, event InEvent) { + select { + case s.events <- event: + default: + log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event) + } +} + +func (s *Service) processEvents() { + lastFlush := time.Now() + buffer := make([]InEvent, 0, maxBatchSize) + for { + select { + case event := <-s.events: + buffer = append(buffer, event) + + if len(buffer) < maxBatchSize || time.Since(lastFlush) < maxBatchDelay { + continue + } + s.flushEvents(buffer) + buffer = nil + + case <-time.After(maxBatchDelay): + if len(buffer) == 0 { + continue + } + s.flushEvents(buffer) + buffer = nil + } + } +} + +// Flush all events in the buffer to the database in a single transaction. +func (s *Service) flushEvents(events []InEvent) { + logger := log.FromContext(s.ctx).Scope("timeline") + tx, err := s.conn.Begin() + if err != nil { + logger.Errorf(err, "Failed to start transaction") + return + } + querier := sql.New(tx) + var lastError error + failures := 0 + for _, event := range events { + var err error + switch e := event.(type) { + case *Call: + err = s.insertCallEvent(s.ctx, querier, e) + case *Log: + err = s.insertLogEvent(s.ctx, querier, e) + case *Ingress: + err = s.insertHTTPIngress(s.ctx, querier, e) + default: + panic(fmt.Sprintf("unexpected event type: %T", e)) + } + if err != nil { + lastError = err + failures++ + } + } + err = tx.Commit() + if err != nil { + failures = len(events) + lastError = err + } + if lastError != nil { + logger.Errorf(lastError, "Failed to insert %d events, most recent error", failures) + observability.Timeline.Failed(s.ctx, failures) + } + observability.Timeline.Inserted(s.ctx, len(events)-failures) +} diff --git a/backend/controller/timeline/timeline_test.go b/backend/controller/timeline/timeline_test.go index 346264803b..e3bdef9b4e 100644 --- a/backend/controller/timeline/timeline_test.go +++ b/backend/controller/timeline/timeline_test.go @@ -85,8 +85,8 @@ func TestTimeline(t *testing.T) { t.Run("InsertCallEvent", func(t *testing.T) { call := callEventToCall(callEvent) - timeline.InsertCallEvent(ctx, call) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, call) + time.Sleep(200 * time.Millisecond) }) logEvent := &LogEvent{ @@ -100,8 +100,8 @@ func TestTimeline(t *testing.T) { }, } t.Run("InsertLogEntry", func(t *testing.T) { - err = timeline.InsertLogEvent(ctx, &logEvent.Log) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, &logEvent.Log) + time.Sleep(200 * time.Millisecond) }) ingressEvent := &IngressEvent{ @@ -119,7 +119,7 @@ func TestTimeline(t *testing.T) { } t.Run("InsertHTTPIngressEvent", func(t *testing.T) { - timeline.InsertHTTPIngress(ctx, &Ingress{ + timeline.EnqueueEvent(ctx, &Ingress{ DeploymentKey: ingressEvent.DeploymentKey, RequestKey: ingressEvent.RequestKey.MustGet(), StartTime: ingressEvent.Time, @@ -136,7 +136,7 @@ func TestTimeline(t *testing.T) { Header: http.Header(map[string][]string{"response": {"header"}}), }, }) - assert.NoError(t, err) + time.Sleep(200 * time.Millisecond) }) expectedDeploymentUpdatedEvent := &DeploymentUpdatedEvent{ @@ -154,36 +154,36 @@ func TestTimeline(t *testing.T) { t.Run("NoFilters", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000) assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent}, events) + assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent}, events) }) t.Run("ByDeployment", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent}, events) + assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent}, events) }) t.Run("ByCall", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{callEvent}, events) + assertEventsEqual(t, []Event{callEvent}, events) }) t.Run("ByLogLevel", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{logEvent}, events) + assertEventsEqual(t, []Event{logEvent}, events) }) t.Run("ByRequests", func(t *testing.T) { events, err := timeline.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{callEvent, logEvent, ingressEvent}, events) + assertEventsEqual(t, []Event{callEvent, logEvent, ingressEvent}, events) }) }) } -func normaliseEvents(events []TimelineEvent) []TimelineEvent { +func normaliseEvents(events []Event) []Event { for i := range events { event := events[i] re := reflect.Indirect(reflect.ValueOf(event)) @@ -197,9 +197,9 @@ func normaliseEvents(events []TimelineEvent) []TimelineEvent { return events } -func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { +func assertEventsEqual(t *testing.T, expected, actual []Event) { t.Helper() - assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) + assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual), assert.Exclude[time.Duration](), assert.Exclude[time.Time]()) } func TestDeleteOldEvents(t *testing.T) { @@ -245,8 +245,8 @@ func TestDeleteOldEvents(t *testing.T) { t.Run("InsertCallEvent", func(t *testing.T) { call := callEventToCall(callEvent) - timeline.InsertCallEvent(ctx, call) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, call) + time.Sleep(200 * time.Millisecond) }) // hour old event callEvent = &CallEvent{ @@ -259,8 +259,8 @@ func TestDeleteOldEvents(t *testing.T) { } t.Run("InsertCallEvent", func(t *testing.T) { call := callEventToCall(callEvent) - timeline.InsertCallEvent(ctx, call) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, call) + time.Sleep(200 * time.Millisecond) }) // week old event @@ -275,8 +275,8 @@ func TestDeleteOldEvents(t *testing.T) { }, } t.Run("InsertLogEntry", func(t *testing.T) { - err = timeline.InsertLogEvent(ctx, &logEvent.Log) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, &logEvent.Log) + time.Sleep(200 * time.Millisecond) }) // hour old event @@ -291,8 +291,8 @@ func TestDeleteOldEvents(t *testing.T) { }, } t.Run("InsertLogEntry", func(t *testing.T) { - err = timeline.InsertLogEvent(ctx, &logEvent.Log) - assert.NoError(t, err) + timeline.EnqueueEvent(ctx, &logEvent.Log) + time.Sleep(200 * time.Millisecond) }) t.Run("DeleteOldEvents", func(t *testing.T) {