diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index 23cbd9d3d2..67d9ea296b 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -195,7 +195,7 @@ func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbc // Get 1 more than the requested limit to determine if there are more results. limitPlusOne := limit + 1 - results, err := c.dal.QueryEvents(ctx, limitPlusOne, query...) + results, err := c.dal.QueryTimeline(ctx, limitPlusOne, query...) if err != nil { return nil, err } @@ -241,7 +241,7 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime)) } - events, err := c.dal.QueryEvents(ctx, int(req.Msg.Query.Limit), newQuery...) + events, err := c.dal.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...) if err != nil { return err } @@ -264,8 +264,8 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ } } -func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) { - var query []dal.EventFilter +func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, error) { + var query []dal.TimelineFilter if pb.Order == pbconsole.EventsQuery_DESC { query = append(query, dal.FilterDescending()) @@ -357,7 +357,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) return query, nil } -func eventDALToProto(event dal.Event) *pbconsole.Event { +func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { switch event := event.(type) { case *dal.CallEvent: var requestKey *string diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 9e1f7b4ac1..7172fe71dc 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -709,14 +709,14 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey return dalerrs.TranslatePGError(err) } } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]interface{}{ "prev_min_replicas": deployment.MinReplicas, "min_replicas": minReplicas, }) if err != nil { return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err) } - err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{ + err = tx.InsertTimelineDeploymentUpdatedEvent(ctx, sql.InsertTimelineDeploymentUpdatedEventParams{ DeploymentKey: key, Payload: payload, }) @@ -782,7 +782,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl } } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{ "min_replicas": int32(minReplicas), "replaced": replacedDeploymentKey, }) @@ -790,7 +790,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl return fmt.Errorf("replace deployment failed to encrypt payload: %w", err) } - err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{ + err = tx.InsertTimelineDeploymentCreatedEvent(ctx, sql.InsertTimelineDeploymentCreatedEventParams{ DeploymentKey: newDeploymentKey, Language: newDeployment.Language, ModuleName: newDeployment.ModuleName, @@ -1057,11 +1057,11 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { "error": log.Error, "stack": log.Stack, } - encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload) + encryptedPayload, err := d.encryptJSON(encryption.TimelineSubKey, payload) if err != nil { return fmt.Errorf("failed to encrypt log payload: %w", err) } - return dalerrs.TranslatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{ + return dalerrs.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ DeploymentKey: log.DeploymentKey, RequestKey: requestKey, TimeStamp: log.Time, @@ -1137,7 +1137,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if pr, ok := call.ParentRequestKey.Get(); ok { parentRequestKey = optional.Some(pr.String()) } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{ "duration_ms": call.Duration.Milliseconds(), "request": call.Request, "response": call.Response, @@ -1147,7 +1147,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if err != nil { return fmt.Errorf("failed to encrypt call payload: %w", err) } - return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{ + return dalerrs.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ DeploymentKey: call.DeploymentKey, RequestKey: requestKey, ParentRequestKey: parentRequestKey, @@ -1161,7 +1161,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { } func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := d.db.DeleteOldEvents(ctx, sqltypes.Duration(age), eventType) + count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) return count, dalerrs.TranslatePGError(err) } diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index ee3618c442..43ed5e9c4c 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -263,39 +263,39 @@ func TestDAL(t *testing.T) { t.Run("QueryEvents", func(t *testing.T) { t.Run("Limit", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1) + events, err := dal.QueryTimeline(ctx, 1) assert.NoError(t, err) assert.Equal(t, 1, len(events)) }) t.Run("NoFilters", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000) + events, err := dal.QueryTimeline(ctx, 1000) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByDeployment", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterDeployments(deploymentKey)) + events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByCall", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent}, events) + assertEventsEqual(t, []TimelineEvent{callEvent}, events) }) t.Run("ByLogLevel", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) assert.NoError(t, err) - assertEventsEqual(t, []Event{logEvent}, events) + assertEventsEqual(t, []TimelineEvent{logEvent}, events) }) t.Run("ByRequests", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterRequests(requestKey)) + events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events) }) }) @@ -386,7 +386,7 @@ func TestRunnerStateFromProto(t *testing.T) { assert.Equal(t, RunnerStateIdle, RunnerStateFromProto(state)) } -func normaliseEvents(events []Event) []Event { +func normaliseEvents(events []TimelineEvent) []TimelineEvent { for i := range len(events) { event := events[i] re := reflect.Indirect(reflect.ValueOf(event)) @@ -400,7 +400,7 @@ func normaliseEvents(events []Event) []Event { return events } -func assertEventsEqual(t *testing.T, expected, actual []Event) { +func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { t.Helper() assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) } diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index 75babd7fe4..1ae4282b11 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -28,10 +28,10 @@ const ( EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated ) -// Event types. +// TimelineEvent types. // //sumtype:decl -type Event interface { +type TimelineEvent interface { GetID() int64 event() } @@ -112,9 +112,9 @@ type eventFilter struct { descending bool } -type EventFilter func(query *eventFilter) +type TimelineFilter func(query *eventFilter) -func FilterLogLevel(level log.Level) EventFilter { +func FilterLogLevel(level log.Level) TimelineFilter { return func(query *eventFilter) { query.level = &level } @@ -123,19 +123,19 @@ func FilterLogLevel(level log.Level) EventFilter { // FilterCall filters call events between the given modules. // // May be called multiple times. -func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter { +func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) TimelineFilter { return func(query *eventFilter) { query.calls = append(query.calls, &eventFilterCall{sourceModule: sourceModule, destModule: destModule, destVerb: destVerb}) } } -func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter { +func FilterDeployments(deploymentKeys ...model.DeploymentKey) TimelineFilter { return func(query *eventFilter) { query.deployments = append(query.deployments, deploymentKeys...) } } -func FilterRequests(requestKeys ...model.RequestKey) EventFilter { +func FilterRequests(requestKeys ...model.RequestKey) TimelineFilter { return func(query *eventFilter) { for _, request := range requestKeys { query.requests = append(query.requests, request.String()) @@ -143,7 +143,7 @@ func FilterRequests(requestKeys ...model.RequestKey) EventFilter { } } -func FilterTypes(types ...sql.EventType) EventFilter { +func FilterTypes(types ...sql.EventType) TimelineFilter { return func(query *eventFilter) { query.types = append(query.types, types...) } @@ -152,7 +152,7 @@ func FilterTypes(types ...sql.EventType) EventFilter { // FilterTimeRange filters events between the given times, inclusive. // // Either maybe be zero to indicate no upper or lower bound. -func FilterTimeRange(olderThan, newerThan time.Time) EventFilter { +func FilterTimeRange(olderThan, newerThan time.Time) TimelineFilter { return func(query *eventFilter) { query.newerThan = newerThan query.olderThan = olderThan @@ -160,7 +160,7 @@ func FilterTimeRange(olderThan, newerThan time.Time) EventFilter { } // FilterIDRange filters events between the given IDs, inclusive. -func FilterIDRange(higherThan, lowerThan int64) EventFilter { +func FilterIDRange(higherThan, lowerThan int64) TimelineFilter { return func(query *eventFilter) { query.idHigherThan = higherThan query.idLowerThan = lowerThan @@ -168,7 +168,7 @@ func FilterIDRange(higherThan, lowerThan int64) EventFilter { } // FilterDescending returns events in descending order. -func FilterDescending() EventFilter { +func FilterDescending() TimelineFilter { return func(query *eventFilter) { query.descending = true } @@ -201,12 +201,12 @@ type eventDeploymentUpdatedJSON struct { } type eventRow struct { - sql.Event + sql.Timeline DeploymentKey model.DeploymentKey RequestKey optional.Option[model.RequestKey] } -func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error) { +func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) { if limit < 1 { return nil, fmt.Errorf("limit must be >= 1, got %d", limit) } @@ -222,7 +222,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter e.custom_key_4, e.type, e.payload - FROM events e + FROM timeline e LEFT JOIN requests ir on e.request_id = ir.id WHERE true -- The "true" is to simplify the ANDs below. ` @@ -256,7 +256,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter deploymentArgs := []any{} if len(filter.deployments) != 0 { // Unfortunately, if we use a join here, PG will do a sequential scan on - // events and deployments, making a 7ms query into a 700ms query. + // timeline and deployments, making a 7ms query into a 700ms query. // https://www.pgexplain.dev/plan/ecd44488-6060-4ad1-a9b4-49d092c3de81 deploymentQuery += ` WHERE key = ANY($1::TEXT[])` deploymentArgs = append(deploymentArgs, filter.deployments) @@ -323,15 +323,15 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter } defer rows.Close() - events, err := d.transformRowsToEvents(deploymentKeys, rows) + events, err := d.transformRowsToTimelineEvents(deploymentKeys, rows) if err != nil { return nil, err } return events, nil } -func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) { - var out []Event +func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) { + var out []TimelineEvent for rows.Next() { row := eventRow{} var deploymentID int64 @@ -349,7 +349,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey switch row.Type { case sql.EventTypeLog: var jsonPayload eventLogJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt log event: %w", err) } @@ -371,7 +371,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeCall: var jsonPayload eventCallJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } var sourceVerb optional.Option[schema.Ref] @@ -396,7 +396,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeDeploymentCreated: var jsonPayload eventDeploymentCreatedJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentCreatedEvent{ @@ -411,7 +411,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeDeploymentUpdated: var jsonPayload eventDeploymentUpdatedJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentUpdatedEvent{ diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index b1d5c20460..0f2602688a 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -34,7 +34,7 @@ type Querier interface { CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateOnlyEncryptionKey(ctx context.Context, key []byte) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) + DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) @@ -90,12 +90,12 @@ type Querier interface { GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) - InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error - InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error - InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error - InsertEvent(ctx context.Context, arg InsertEventParams) error - InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error + InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error + InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error + InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error + InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error + InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index cd98b31a89..386d180fba 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -270,8 +270,8 @@ WITH rows AS ( SELECT COUNT(*) FROM rows; --- name: InsertLogEvent :exec -INSERT INTO events ( +-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( deployment_id, request_id, time_stamp, @@ -293,8 +293,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events ( +-- name: InsertTimelineDeploymentCreatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -302,7 +302,7 @@ INSERT INTO events ( payload ) VALUES ( - ( + ( SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key @@ -313,8 +313,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events ( +-- name: InsertTimelineDeploymentUpdatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -333,8 +333,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertCallEvent :exec -INSERT INTO events ( +-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( deployment_id, request_id, parent_request_id, @@ -365,9 +365,9 @@ VALUES ( sqlc.arg('payload') ); --- name: DeleteOldEvents :one +-- name: DeleteOldTimelineEvents :one WITH deleted AS ( - DELETE FROM events + DELETE FROM timeline WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL AND type = sqlc.arg('type') RETURNING 1 @@ -423,8 +423,8 @@ FROM ingress_routes ir WHERE d.min_replicas > 0; --- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, parent_request_id, type, +-- name: InsertTimelineEvent :exec +INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -667,8 +667,8 @@ VALUES ( sqlc.arg('name')::TEXT, sqlc.arg('event_type')::TEXT ) -ON CONFLICT (name, module_id) DO -UPDATE SET +ON CONFLICT (name, module_id) DO +UPDATE SET type = sqlc.arg('event_type')::TEXT RETURNING id; @@ -693,12 +693,12 @@ VALUES ( sqlc.arg('name')::TEXT ) ON CONFLICT (name, module_id) DO -UPDATE SET +UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key) -RETURNING +RETURNING id, - CASE + CASE WHEN xmax = 0 THEN true ELSE false END AS inserted; diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index fb7f4f10bd..79b2a680dd 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -331,9 +331,9 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } -const deleteOldEvents = `-- name: DeleteOldEvents :one +const deleteOldTimelineEvents = `-- name: DeleteOldTimelineEvents :one WITH deleted AS ( - DELETE FROM events + DELETE FROM timeline WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL AND type = $2 RETURNING 1 @@ -342,8 +342,8 @@ SELECT COUNT(*) FROM deleted ` -func (q *Queries) DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { - row := q.db.QueryRowContext(ctx, deleteOldEvents, timeout, type_) +func (q *Queries) DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { + row := q.db.QueryRowContext(ctx, deleteOldTimelineEvents, timeout, type_) var count int64 err := row.Scan(&count) return count, err @@ -1945,8 +1945,64 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent return i, err } -const insertCallEvent = `-- name: InsertCallEvent :exec -INSERT INTO events ( +const insertSubscriber = `-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff, + catch_verb +) +VALUES ( + $1::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topic_subscriptions.name = $3::TEXT + ), + (SELECT id FROM deployments WHERE key = $4::deployment_key), + $5, + $6, + $7::interval, + $8::interval, + $9 +) +` + +type InsertSubscriberParams struct { + Key model.SubscriberKey + Module string + SubscriptionName string + Deployment model.DeploymentKey + Sink schema.RefKey + RetryAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] +} + +func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { + _, err := q.db.ExecContext(ctx, insertSubscriber, + arg.Key, + arg.Module, + arg.SubscriptionName, + arg.Deployment, + arg.Sink, + arg.RetryAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + ) + return err +} + +const insertTimelineCallEvent = `-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( deployment_id, request_id, parent_request_id, @@ -1978,7 +2034,7 @@ VALUES ( ) ` -type InsertCallEventParams struct { +type InsertTimelineCallEventParams struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] ParentRequestKey optional.Option[string] @@ -1987,11 +2043,11 @@ type InsertCallEventParams struct { SourceVerb optional.Option[string] DestModule string DestVerb string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error { - _, err := q.db.ExecContext(ctx, insertCallEvent, +func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineCallEvent, arg.DeploymentKey, arg.RequestKey, arg.ParentRequestKey, @@ -2005,8 +2061,8 @@ func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams return err } -const insertDeploymentCreatedEvent = `-- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events ( +const insertTimelineDeploymentCreatedEvent = `-- name: InsertTimelineDeploymentCreatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -2014,7 +2070,7 @@ INSERT INTO events ( payload ) VALUES ( - ( + ( SELECT id FROM deployments WHERE deployments.key = $1::deployment_key @@ -2026,15 +2082,15 @@ VALUES ( ) ` -type InsertDeploymentCreatedEventParams struct { +type InsertTimelineDeploymentCreatedEventParams struct { DeploymentKey model.DeploymentKey Language string ModuleName string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertDeploymentCreatedEvent, +func (q *Queries) InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineDeploymentCreatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -2043,8 +2099,8 @@ func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDe return err } -const insertDeploymentUpdatedEvent = `-- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events ( +const insertTimelineDeploymentUpdatedEvent = `-- name: InsertTimelineDeploymentUpdatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -2064,15 +2120,15 @@ VALUES ( ) ` -type InsertDeploymentUpdatedEventParams struct { +type InsertTimelineDeploymentUpdatedEventParams struct { DeploymentKey model.DeploymentKey Language string ModuleName string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertDeploymentUpdatedEvent, +func (q *Queries) InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineDeploymentUpdatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -2081,15 +2137,15 @@ func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDe return err } -const insertEvent = `-- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, parent_request_id, type, +const insertTimelineEvent = `-- name: InsertTimelineEvent :exec +INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id ` -type InsertEventParams struct { +type InsertTimelineEventParams struct { DeploymentID int64 RequestID optional.Option[int64] ParentRequestID optional.Option[string] @@ -2098,11 +2154,11 @@ type InsertEventParams struct { CustomKey2 optional.Option[string] CustomKey3 optional.Option[string] CustomKey4 optional.Option[string] - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error { - _, err := q.db.ExecContext(ctx, insertEvent, +func (q *Queries) InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineEvent, arg.DeploymentID, arg.RequestID, arg.ParentRequestID, @@ -2116,8 +2172,8 @@ func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error return err } -const insertLogEvent = `-- name: InsertLogEvent :exec -INSERT INTO events ( +const insertTimelineLogEvent = `-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( deployment_id, request_id, time_stamp, @@ -2140,16 +2196,16 @@ VALUES ( ) ` -type InsertLogEventParams struct { +type InsertTimelineLogEventParams struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] TimeStamp time.Time Level int32 - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error { - _, err := q.db.ExecContext(ctx, insertLogEvent, +func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineLogEvent, arg.DeploymentKey, arg.RequestKey, arg.TimeStamp, @@ -2159,62 +2215,6 @@ func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) return err } -const insertSubscriber = `-- name: InsertSubscriber :exec -INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_id, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb -) -VALUES ( - $1::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - INNER JOIN modules ON topic_subscriptions.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topic_subscriptions.name = $3::TEXT - ), - (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5, - $6, - $7::interval, - $8::interval, - $9 -) -` - -type InsertSubscriberParams struct { - Key model.SubscriberKey - Module string - SubscriptionName string - Deployment model.DeploymentKey - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] -} - -func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { - _, err := q.db.ExecContext(ctx, insertSubscriber, - arg.Key, - arg.Module, - arg.SubscriptionName, - arg.Deployment, - arg.Sink, - arg.RetryAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - ) - return err -} - const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller @@ -2708,12 +2708,12 @@ VALUES ( $6::TEXT ) ON CONFLICT (name, module_id) DO -UPDATE SET +UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) -RETURNING +RETURNING id, - CASE + CASE WHEN xmax = 0 THEN true ELSE false END AS inserted @@ -2755,8 +2755,8 @@ VALUES ( $3::TEXT, $4::TEXT ) -ON CONFLICT (name, module_id) DO -UPDATE SET +ON CONFLICT (name, module_id) DO +UPDATE SET type = $4::TEXT RETURNING id ` diff --git a/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql b/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql new file mode 100644 index 0000000000..579ed8faa9 --- /dev/null +++ b/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TABLE events RENAME TO timeline; + +-- migrate:down + diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/internal/encryption/encryption.go b/internal/encryption/encryption.go index b6816e5986..9815b846bc 100644 --- a/internal/encryption/encryption.go +++ b/internal/encryption/encryption.go @@ -19,8 +19,8 @@ import ( type SubKey string const ( - LogsSubKey SubKey = "logs" - AsyncSubKey SubKey = "async" + TimelineSubKey SubKey = "timeline" + AsyncSubKey SubKey = "async" ) type DataEncryptor interface { diff --git a/internal/encryption/encryption_test.go b/internal/encryption/encryption_test.go index f13e4358d5..b30719ff17 100644 --- a/internal/encryption/encryption_test.go +++ b/internal/encryption/encryption_test.go @@ -9,10 +9,10 @@ import ( func TestNoOpEncryptor(t *testing.T) { encryptor := NoOpEncryptorNext{} - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted)) @@ -25,10 +25,10 @@ func TestKMSEncryptorFakeKMS(t *testing.T) { encryptor, err := NewKMSEncryptorGenerateKey(uri, nil) assert.NoError(t, err) - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted)) diff --git a/internal/encryption/integration_test.go b/internal/encryption/integration_test.go index 92cca64ad0..03825982bb 100644 --- a/internal/encryption/integration_test.go +++ b/internal/encryption/integration_test.go @@ -130,10 +130,10 @@ func TestKMSEncryptorLocalstack(t *testing.T) { encryptor, err := NewKMSEncryptorGenerateKey(uri, v1client) assert.NoError(t, err) - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted))