From c608d256c85465df73c614530316107151266d79 Mon Sep 17 00:00:00 2001 From: Alec Thomas Date: Wed, 14 Aug 2024 15:55:58 +1000 Subject: [PATCH] fix: rename some timeline related names Renamed the "events" table to "timeline" to more clearly reflect its purpose and usage. Renamed the encryption key from "logs" to "events" as the events table contains more than just logs. This just makes things a bit clearer. --- backend/controller/console/console.go | 10 +- backend/controller/cronjobs/sql/models.go | 28 +-- backend/controller/dal/dal.go | 18 +- backend/controller/dal/dal_test.go | 26 +-- backend/controller/dal/events.go | 44 ++-- backend/controller/sql/models.go | 28 +-- backend/controller/sql/querier.go | 12 +- backend/controller/sql/queries.sql | 36 ++-- backend/controller/sql/queries.sql.go | 192 +++++++++--------- ...240814060154_rename_events_to_timeline.sql | 6 + common/configuration/sql/models.go | 28 +-- internal/encryption/encryption.go | 4 +- internal/encryption/encryption_test.go | 8 +- internal/encryption/integration_test.go | 8 +- 14 files changed, 227 insertions(+), 221 deletions(-) create mode 100644 backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index 23cbd9d3d2..67d9ea296b 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -195,7 +195,7 @@ func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbc // Get 1 more than the requested limit to determine if there are more results. limitPlusOne := limit + 1 - results, err := c.dal.QueryEvents(ctx, limitPlusOne, query...) + results, err := c.dal.QueryTimeline(ctx, limitPlusOne, query...) if err != nil { return nil, err } @@ -241,7 +241,7 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime)) } - events, err := c.dal.QueryEvents(ctx, int(req.Msg.Query.Limit), newQuery...) + events, err := c.dal.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...) if err != nil { return err } @@ -264,8 +264,8 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ } } -func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) { - var query []dal.EventFilter +func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, error) { + var query []dal.TimelineFilter if pb.Order == pbconsole.EventsQuery_DESC { query = append(query, dal.FilterDescending()) @@ -357,7 +357,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) return query, nil } -func eventDALToProto(event dal.Event) *pbconsole.Event { +func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { switch event := event.(type) { case *dal.CallEvent: var requestKey *string diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 9e1f7b4ac1..7172fe71dc 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -709,14 +709,14 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey return dalerrs.TranslatePGError(err) } } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]interface{}{ "prev_min_replicas": deployment.MinReplicas, "min_replicas": minReplicas, }) if err != nil { return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err) } - err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{ + err = tx.InsertTimelineDeploymentUpdatedEvent(ctx, sql.InsertTimelineDeploymentUpdatedEventParams{ DeploymentKey: key, Payload: payload, }) @@ -782,7 +782,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl } } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{ "min_replicas": int32(minReplicas), "replaced": replacedDeploymentKey, }) @@ -790,7 +790,7 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl return fmt.Errorf("replace deployment failed to encrypt payload: %w", err) } - err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{ + err = tx.InsertTimelineDeploymentCreatedEvent(ctx, sql.InsertTimelineDeploymentCreatedEventParams{ DeploymentKey: newDeploymentKey, Language: newDeployment.Language, ModuleName: newDeployment.ModuleName, @@ -1057,11 +1057,11 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { "error": log.Error, "stack": log.Stack, } - encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload) + encryptedPayload, err := d.encryptJSON(encryption.TimelineSubKey, payload) if err != nil { return fmt.Errorf("failed to encrypt log payload: %w", err) } - return dalerrs.TranslatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{ + return dalerrs.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ DeploymentKey: log.DeploymentKey, RequestKey: requestKey, TimeStamp: log.Time, @@ -1137,7 +1137,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if pr, ok := call.ParentRequestKey.Get(); ok { parentRequestKey = optional.Some(pr.String()) } - payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{ + payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{ "duration_ms": call.Duration.Milliseconds(), "request": call.Request, "response": call.Response, @@ -1147,7 +1147,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if err != nil { return fmt.Errorf("failed to encrypt call payload: %w", err) } - return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{ + return dalerrs.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ DeploymentKey: call.DeploymentKey, RequestKey: requestKey, ParentRequestKey: parentRequestKey, @@ -1161,7 +1161,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { } func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := d.db.DeleteOldEvents(ctx, sqltypes.Duration(age), eventType) + count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) return count, dalerrs.TranslatePGError(err) } diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index ee3618c442..43ed5e9c4c 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -263,39 +263,39 @@ func TestDAL(t *testing.T) { t.Run("QueryEvents", func(t *testing.T) { t.Run("Limit", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1) + events, err := dal.QueryTimeline(ctx, 1) assert.NoError(t, err) assert.Equal(t, 1, len(events)) }) t.Run("NoFilters", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000) + events, err := dal.QueryTimeline(ctx, 1000) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByDeployment", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterDeployments(deploymentKey)) + events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByCall", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent}, events) + assertEventsEqual(t, []TimelineEvent{callEvent}, events) }) t.Run("ByLogLevel", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) assert.NoError(t, err) - assertEventsEqual(t, []Event{logEvent}, events) + assertEventsEqual(t, []TimelineEvent{logEvent}, events) }) t.Run("ByRequests", func(t *testing.T) { - events, err := dal.QueryEvents(ctx, 1000, FilterRequests(requestKey)) + events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) assert.NoError(t, err) - assertEventsEqual(t, []Event{callEvent, logEvent}, events) + assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events) }) }) @@ -386,7 +386,7 @@ func TestRunnerStateFromProto(t *testing.T) { assert.Equal(t, RunnerStateIdle, RunnerStateFromProto(state)) } -func normaliseEvents(events []Event) []Event { +func normaliseEvents(events []TimelineEvent) []TimelineEvent { for i := range len(events) { event := events[i] re := reflect.Indirect(reflect.ValueOf(event)) @@ -400,7 +400,7 @@ func normaliseEvents(events []Event) []Event { return events } -func assertEventsEqual(t *testing.T, expected, actual []Event) { +func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { t.Helper() assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) } diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index 75babd7fe4..1ae4282b11 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -28,10 +28,10 @@ const ( EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated ) -// Event types. +// TimelineEvent types. // //sumtype:decl -type Event interface { +type TimelineEvent interface { GetID() int64 event() } @@ -112,9 +112,9 @@ type eventFilter struct { descending bool } -type EventFilter func(query *eventFilter) +type TimelineFilter func(query *eventFilter) -func FilterLogLevel(level log.Level) EventFilter { +func FilterLogLevel(level log.Level) TimelineFilter { return func(query *eventFilter) { query.level = &level } @@ -123,19 +123,19 @@ func FilterLogLevel(level log.Level) EventFilter { // FilterCall filters call events between the given modules. // // May be called multiple times. -func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter { +func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) TimelineFilter { return func(query *eventFilter) { query.calls = append(query.calls, &eventFilterCall{sourceModule: sourceModule, destModule: destModule, destVerb: destVerb}) } } -func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter { +func FilterDeployments(deploymentKeys ...model.DeploymentKey) TimelineFilter { return func(query *eventFilter) { query.deployments = append(query.deployments, deploymentKeys...) } } -func FilterRequests(requestKeys ...model.RequestKey) EventFilter { +func FilterRequests(requestKeys ...model.RequestKey) TimelineFilter { return func(query *eventFilter) { for _, request := range requestKeys { query.requests = append(query.requests, request.String()) @@ -143,7 +143,7 @@ func FilterRequests(requestKeys ...model.RequestKey) EventFilter { } } -func FilterTypes(types ...sql.EventType) EventFilter { +func FilterTypes(types ...sql.EventType) TimelineFilter { return func(query *eventFilter) { query.types = append(query.types, types...) } @@ -152,7 +152,7 @@ func FilterTypes(types ...sql.EventType) EventFilter { // FilterTimeRange filters events between the given times, inclusive. // // Either maybe be zero to indicate no upper or lower bound. -func FilterTimeRange(olderThan, newerThan time.Time) EventFilter { +func FilterTimeRange(olderThan, newerThan time.Time) TimelineFilter { return func(query *eventFilter) { query.newerThan = newerThan query.olderThan = olderThan @@ -160,7 +160,7 @@ func FilterTimeRange(olderThan, newerThan time.Time) EventFilter { } // FilterIDRange filters events between the given IDs, inclusive. -func FilterIDRange(higherThan, lowerThan int64) EventFilter { +func FilterIDRange(higherThan, lowerThan int64) TimelineFilter { return func(query *eventFilter) { query.idHigherThan = higherThan query.idLowerThan = lowerThan @@ -168,7 +168,7 @@ func FilterIDRange(higherThan, lowerThan int64) EventFilter { } // FilterDescending returns events in descending order. -func FilterDescending() EventFilter { +func FilterDescending() TimelineFilter { return func(query *eventFilter) { query.descending = true } @@ -201,12 +201,12 @@ type eventDeploymentUpdatedJSON struct { } type eventRow struct { - sql.Event + sql.Timeline DeploymentKey model.DeploymentKey RequestKey optional.Option[model.RequestKey] } -func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error) { +func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) { if limit < 1 { return nil, fmt.Errorf("limit must be >= 1, got %d", limit) } @@ -222,7 +222,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter e.custom_key_4, e.type, e.payload - FROM events e + FROM timeline e LEFT JOIN requests ir on e.request_id = ir.id WHERE true -- The "true" is to simplify the ANDs below. ` @@ -256,7 +256,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter deploymentArgs := []any{} if len(filter.deployments) != 0 { // Unfortunately, if we use a join here, PG will do a sequential scan on - // events and deployments, making a 7ms query into a 700ms query. + // timeline and deployments, making a 7ms query into a 700ms query. // https://www.pgexplain.dev/plan/ecd44488-6060-4ad1-a9b4-49d092c3de81 deploymentQuery += ` WHERE key = ANY($1::TEXT[])` deploymentArgs = append(deploymentArgs, filter.deployments) @@ -323,15 +323,15 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter } defer rows.Close() - events, err := d.transformRowsToEvents(deploymentKeys, rows) + events, err := d.transformRowsToTimelineEvents(deploymentKeys, rows) if err != nil { return nil, err } return events, nil } -func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) { - var out []Event +func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) { + var out []TimelineEvent for rows.Next() { row := eventRow{} var deploymentID int64 @@ -349,7 +349,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey switch row.Type { case sql.EventTypeLog: var jsonPayload eventLogJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt log event: %w", err) } @@ -371,7 +371,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeCall: var jsonPayload eventCallJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } var sourceVerb optional.Option[schema.Ref] @@ -396,7 +396,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeDeploymentCreated: var jsonPayload eventDeploymentCreatedJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentCreatedEvent{ @@ -411,7 +411,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey case sql.EventTypeDeploymentUpdated: var jsonPayload eventDeploymentUpdatedJSON - if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil { + if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } out = append(out, &DeploymentUpdatedEvent{ diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index b1d5c20460..0f2602688a 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -34,7 +34,7 @@ type Querier interface { CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateOnlyEncryptionKey(ctx context.Context, key []byte) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) + DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) @@ -90,12 +90,12 @@ type Querier interface { GetSubscriptionsNeedingUpdate(ctx context.Context) ([]GetSubscriptionsNeedingUpdateRow, error) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) - InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error - InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error - InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error - InsertEvent(ctx context.Context, arg InsertEventParams) error - InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error + InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error + InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error + InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error + InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error + InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index cd98b31a89..386d180fba 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -270,8 +270,8 @@ WITH rows AS ( SELECT COUNT(*) FROM rows; --- name: InsertLogEvent :exec -INSERT INTO events ( +-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( deployment_id, request_id, time_stamp, @@ -293,8 +293,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events ( +-- name: InsertTimelineDeploymentCreatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -302,7 +302,7 @@ INSERT INTO events ( payload ) VALUES ( - ( + ( SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key @@ -313,8 +313,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events ( +-- name: InsertTimelineDeploymentUpdatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -333,8 +333,8 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertCallEvent :exec -INSERT INTO events ( +-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( deployment_id, request_id, parent_request_id, @@ -365,9 +365,9 @@ VALUES ( sqlc.arg('payload') ); --- name: DeleteOldEvents :one +-- name: DeleteOldTimelineEvents :one WITH deleted AS ( - DELETE FROM events + DELETE FROM timeline WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL AND type = sqlc.arg('type') RETURNING 1 @@ -423,8 +423,8 @@ FROM ingress_routes ir WHERE d.min_replicas > 0; --- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, parent_request_id, type, +-- name: InsertTimelineEvent :exec +INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) @@ -667,8 +667,8 @@ VALUES ( sqlc.arg('name')::TEXT, sqlc.arg('event_type')::TEXT ) -ON CONFLICT (name, module_id) DO -UPDATE SET +ON CONFLICT (name, module_id) DO +UPDATE SET type = sqlc.arg('event_type')::TEXT RETURNING id; @@ -693,12 +693,12 @@ VALUES ( sqlc.arg('name')::TEXT ) ON CONFLICT (name, module_id) DO -UPDATE SET +UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment')::deployment_key) -RETURNING +RETURNING id, - CASE + CASE WHEN xmax = 0 THEN true ELSE false END AS inserted; diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index fb7f4f10bd..79b2a680dd 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -331,9 +331,9 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } -const deleteOldEvents = `-- name: DeleteOldEvents :one +const deleteOldTimelineEvents = `-- name: DeleteOldTimelineEvents :one WITH deleted AS ( - DELETE FROM events + DELETE FROM timeline WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL AND type = $2 RETURNING 1 @@ -342,8 +342,8 @@ SELECT COUNT(*) FROM deleted ` -func (q *Queries) DeleteOldEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { - row := q.db.QueryRowContext(ctx, deleteOldEvents, timeout, type_) +func (q *Queries) DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { + row := q.db.QueryRowContext(ctx, deleteOldTimelineEvents, timeout, type_) var count int64 err := row.Scan(&count) return count, err @@ -1945,8 +1945,64 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent return i, err } -const insertCallEvent = `-- name: InsertCallEvent :exec -INSERT INTO events ( +const insertSubscriber = `-- name: InsertSubscriber :exec +INSERT INTO topic_subscribers ( + key, + topic_subscriptions_id, + deployment_id, + sink, + retry_attempts, + backoff, + max_backoff, + catch_verb +) +VALUES ( + $1::subscriber_key, + ( + SELECT topic_subscriptions.id as id + FROM topic_subscriptions + INNER JOIN modules ON topic_subscriptions.module_id = modules.id + WHERE modules.name = $2::TEXT + AND topic_subscriptions.name = $3::TEXT + ), + (SELECT id FROM deployments WHERE key = $4::deployment_key), + $5, + $6, + $7::interval, + $8::interval, + $9 +) +` + +type InsertSubscriberParams struct { + Key model.SubscriberKey + Module string + SubscriptionName string + Deployment model.DeploymentKey + Sink schema.RefKey + RetryAttempts int32 + Backoff sqltypes.Duration + MaxBackoff sqltypes.Duration + CatchVerb optional.Option[schema.RefKey] +} + +func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { + _, err := q.db.ExecContext(ctx, insertSubscriber, + arg.Key, + arg.Module, + arg.SubscriptionName, + arg.Deployment, + arg.Sink, + arg.RetryAttempts, + arg.Backoff, + arg.MaxBackoff, + arg.CatchVerb, + ) + return err +} + +const insertTimelineCallEvent = `-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( deployment_id, request_id, parent_request_id, @@ -1978,7 +2034,7 @@ VALUES ( ) ` -type InsertCallEventParams struct { +type InsertTimelineCallEventParams struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] ParentRequestKey optional.Option[string] @@ -1987,11 +2043,11 @@ type InsertCallEventParams struct { SourceVerb optional.Option[string] DestModule string DestVerb string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error { - _, err := q.db.ExecContext(ctx, insertCallEvent, +func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineCallEvent, arg.DeploymentKey, arg.RequestKey, arg.ParentRequestKey, @@ -2005,8 +2061,8 @@ func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams return err } -const insertDeploymentCreatedEvent = `-- name: InsertDeploymentCreatedEvent :exec -INSERT INTO events ( +const insertTimelineDeploymentCreatedEvent = `-- name: InsertTimelineDeploymentCreatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -2014,7 +2070,7 @@ INSERT INTO events ( payload ) VALUES ( - ( + ( SELECT id FROM deployments WHERE deployments.key = $1::deployment_key @@ -2026,15 +2082,15 @@ VALUES ( ) ` -type InsertDeploymentCreatedEventParams struct { +type InsertTimelineDeploymentCreatedEventParams struct { DeploymentKey model.DeploymentKey Language string ModuleName string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDeploymentCreatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertDeploymentCreatedEvent, +func (q *Queries) InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineDeploymentCreatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -2043,8 +2099,8 @@ func (q *Queries) InsertDeploymentCreatedEvent(ctx context.Context, arg InsertDe return err } -const insertDeploymentUpdatedEvent = `-- name: InsertDeploymentUpdatedEvent :exec -INSERT INTO events ( +const insertTimelineDeploymentUpdatedEvent = `-- name: InsertTimelineDeploymentUpdatedEvent :exec +INSERT INTO timeline ( deployment_id, type, custom_key_1, @@ -2064,15 +2120,15 @@ VALUES ( ) ` -type InsertDeploymentUpdatedEventParams struct { +type InsertTimelineDeploymentUpdatedEventParams struct { DeploymentKey model.DeploymentKey Language string ModuleName string - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDeploymentUpdatedEventParams) error { - _, err := q.db.ExecContext(ctx, insertDeploymentUpdatedEvent, +func (q *Queries) InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineDeploymentUpdatedEvent, arg.DeploymentKey, arg.Language, arg.ModuleName, @@ -2081,15 +2137,15 @@ func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDe return err } -const insertEvent = `-- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, parent_request_id, type, +const insertTimelineEvent = `-- name: InsertTimelineEvent :exec +INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id ` -type InsertEventParams struct { +type InsertTimelineEventParams struct { DeploymentID int64 RequestID optional.Option[int64] ParentRequestID optional.Option[string] @@ -2098,11 +2154,11 @@ type InsertEventParams struct { CustomKey2 optional.Option[string] CustomKey3 optional.Option[string] CustomKey4 optional.Option[string] - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error { - _, err := q.db.ExecContext(ctx, insertEvent, +func (q *Queries) InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineEvent, arg.DeploymentID, arg.RequestID, arg.ParentRequestID, @@ -2116,8 +2172,8 @@ func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error return err } -const insertLogEvent = `-- name: InsertLogEvent :exec -INSERT INTO events ( +const insertTimelineLogEvent = `-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( deployment_id, request_id, time_stamp, @@ -2140,16 +2196,16 @@ VALUES ( ) ` -type InsertLogEventParams struct { +type InsertTimelineLogEventParams struct { DeploymentKey model.DeploymentKey RequestKey optional.Option[string] TimeStamp time.Time Level int32 - Payload json.RawMessage + Payload []byte } -func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) error { - _, err := q.db.ExecContext(ctx, insertLogEvent, +func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineLogEvent, arg.DeploymentKey, arg.RequestKey, arg.TimeStamp, @@ -2159,62 +2215,6 @@ func (q *Queries) InsertLogEvent(ctx context.Context, arg InsertLogEventParams) return err } -const insertSubscriber = `-- name: InsertSubscriber :exec -INSERT INTO topic_subscribers ( - key, - topic_subscriptions_id, - deployment_id, - sink, - retry_attempts, - backoff, - max_backoff, - catch_verb -) -VALUES ( - $1::subscriber_key, - ( - SELECT topic_subscriptions.id as id - FROM topic_subscriptions - INNER JOIN modules ON topic_subscriptions.module_id = modules.id - WHERE modules.name = $2::TEXT - AND topic_subscriptions.name = $3::TEXT - ), - (SELECT id FROM deployments WHERE key = $4::deployment_key), - $5, - $6, - $7::interval, - $8::interval, - $9 -) -` - -type InsertSubscriberParams struct { - Key model.SubscriberKey - Module string - SubscriptionName string - Deployment model.DeploymentKey - Sink schema.RefKey - RetryAttempts int32 - Backoff sqltypes.Duration - MaxBackoff sqltypes.Duration - CatchVerb optional.Option[schema.RefKey] -} - -func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error { - _, err := q.db.ExecContext(ctx, insertSubscriber, - arg.Key, - arg.Module, - arg.SubscriptionName, - arg.Deployment, - arg.Sink, - arg.RetryAttempts, - arg.Backoff, - arg.MaxBackoff, - arg.CatchVerb, - ) - return err -} - const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller @@ -2708,12 +2708,12 @@ VALUES ( $6::TEXT ) ON CONFLICT (name, module_id) DO -UPDATE SET +UPDATE SET topic_id = excluded.topic_id, deployment_id = (SELECT id FROM deployments WHERE key = $5::deployment_key) -RETURNING +RETURNING id, - CASE + CASE WHEN xmax = 0 THEN true ELSE false END AS inserted @@ -2755,8 +2755,8 @@ VALUES ( $3::TEXT, $4::TEXT ) -ON CONFLICT (name, module_id) DO -UPDATE SET +ON CONFLICT (name, module_id) DO +UPDATE SET type = $4::TEXT RETURNING id ` diff --git a/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql b/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql new file mode 100644 index 0000000000..579ed8faa9 --- /dev/null +++ b/backend/controller/sql/schema/20240814060154_rename_events_to_timeline.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TABLE events RENAME TO timeline; + +-- migrate:down + diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index 731679ea0f..5c2780936a 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -435,20 +435,6 @@ type EncryptionKey struct { CreatedAt time.Time } -type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage - ParentRequestID optional.Option[string] -} - type FsmInstance struct { ID int64 CreatedAt time.Time @@ -520,6 +506,20 @@ type Runner struct { Labels json.RawMessage } +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload []byte + ParentRequestID optional.Option[string] +} + type Topic struct { ID int64 Key model.TopicKey diff --git a/internal/encryption/encryption.go b/internal/encryption/encryption.go index b6816e5986..9815b846bc 100644 --- a/internal/encryption/encryption.go +++ b/internal/encryption/encryption.go @@ -19,8 +19,8 @@ import ( type SubKey string const ( - LogsSubKey SubKey = "logs" - AsyncSubKey SubKey = "async" + TimelineSubKey SubKey = "timeline" + AsyncSubKey SubKey = "async" ) type DataEncryptor interface { diff --git a/internal/encryption/encryption_test.go b/internal/encryption/encryption_test.go index f13e4358d5..b30719ff17 100644 --- a/internal/encryption/encryption_test.go +++ b/internal/encryption/encryption_test.go @@ -9,10 +9,10 @@ import ( func TestNoOpEncryptor(t *testing.T) { encryptor := NoOpEncryptorNext{} - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted)) @@ -25,10 +25,10 @@ func TestKMSEncryptorFakeKMS(t *testing.T) { encryptor, err := NewKMSEncryptorGenerateKey(uri, nil) assert.NoError(t, err) - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted)) diff --git a/internal/encryption/integration_test.go b/internal/encryption/integration_test.go index 92cca64ad0..4b0c35d50e 100644 --- a/internal/encryption/integration_test.go +++ b/internal/encryption/integration_test.go @@ -50,9 +50,9 @@ func TestEncryptionForLogs(t *testing.T) { }, // confirm that we can't find that raw request string in the table - in.QueryRow("ftl", "SELECT COUNT(*) FROM events WHERE type = 'call'", int64(1)), + in.QueryRow("ftl", "SELECT COUNT(*) FROM timeline WHERE type = 'call'", int64(1)), func(t testing.TB, ic in.TestContext) { - values := in.GetRow(t, ic, "ftl", "SELECT payload FROM events WHERE type = 'call' LIMIT 1", 1) + values := in.GetRow(t, ic, "ftl", "SELECT payload FROM timeline WHERE type = 'call' LIMIT 1", 1) payload, ok := values[0].([]byte) assert.True(t, ok, "could not convert payload to string") assert.NotContains(t, string(payload), "Alice", "raw request string should not be stored in the table") @@ -130,10 +130,10 @@ func TestKMSEncryptorLocalstack(t *testing.T) { encryptor, err := NewKMSEncryptorGenerateKey(uri, v1client) assert.NoError(t, err) - encrypted, err := encryptor.Encrypt(LogsSubKey, []byte("hunter2")) + encrypted, err := encryptor.Encrypt(TimelineSubKey, []byte("hunter2")) assert.NoError(t, err) - decrypted, err := encryptor.Decrypt(LogsSubKey, encrypted) + decrypted, err := encryptor.Decrypt(TimelineSubKey, encrypted) assert.NoError(t, err) assert.Equal(t, "hunter2", string(decrypted))