Skip to content

Commit

Permalink
fix: rename some timeline related names (#2355)
Browse files Browse the repository at this point in the history
1. Renamed the "events" table to "timeline" to more clearly reflect its
purpose and usage.
2. Renamed the encryption key from "logs" to "timeline" as the events
table contains more than just logs. This just makes things a bit
clearer.
  • Loading branch information
alecthomas authored Aug 14, 2024
1 parent 2ab778b commit 306b548
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 221 deletions.
10 changes: 5 additions & 5 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbc
// Get 1 more than the requested limit to determine if there are more results.
limitPlusOne := limit + 1

results, err := c.dal.QueryEvents(ctx, limitPlusOne, query...)
results, err := c.dal.QueryTimeline(ctx, limitPlusOne, query...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime))
}

events, err := c.dal.QueryEvents(ctx, int(req.Msg.Query.Limit), newQuery...)
events, err := c.dal.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...)
if err != nil {
return err
}
Expand All @@ -264,8 +264,8 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
}
}

func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error) {
var query []dal.EventFilter
func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, error) {
var query []dal.TimelineFilter

if pb.Order == pbconsole.EventsQuery_DESC {
query = append(query, dal.FilterDescending())
Expand Down Expand Up @@ -357,7 +357,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.EventFilter, error)
return query, nil
}

func eventDALToProto(event dal.Event) *pbconsole.Event {
func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event {
switch event := event.(type) {
case *dal.CallEvent:
var requestKey *string
Expand Down
28 changes: 14 additions & 14 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,14 +709,14 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey
return dalerrs.TranslatePGError(err)
}
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]interface{}{
payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]interface{}{
"prev_min_replicas": deployment.MinReplicas,
"min_replicas": minReplicas,
})
if err != nil {
return fmt.Errorf("failed to encrypt payload for InsertDeploymentUpdatedEvent: %w", err)
}
err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
err = tx.InsertTimelineDeploymentUpdatedEvent(ctx, sql.InsertTimelineDeploymentUpdatedEventParams{
DeploymentKey: key,
Payload: payload,
})
Expand Down Expand Up @@ -782,15 +782,15 @@ func (d *DAL) ReplaceDeployment(ctx context.Context, newDeploymentKey model.Depl
}
}

payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{
"min_replicas": int32(minReplicas),
"replaced": replacedDeploymentKey,
})
if err != nil {
return fmt.Errorf("replace deployment failed to encrypt payload: %w", err)
}

err = tx.InsertDeploymentCreatedEvent(ctx, sql.InsertDeploymentCreatedEventParams{
err = tx.InsertTimelineDeploymentCreatedEvent(ctx, sql.InsertTimelineDeploymentCreatedEventParams{
DeploymentKey: newDeploymentKey,
Language: newDeployment.Language,
ModuleName: newDeployment.ModuleName,
Expand Down Expand Up @@ -1057,11 +1057,11 @@ func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error {
"error": log.Error,
"stack": log.Stack,
}
encryptedPayload, err := d.encryptJSON(encryption.LogsSubKey, payload)
encryptedPayload, err := d.encryptJSON(encryption.TimelineSubKey, payload)
if err != nil {
return fmt.Errorf("failed to encrypt log payload: %w", err)
}
return dalerrs.TranslatePGError(d.db.InsertLogEvent(ctx, sql.InsertLogEventParams{
return dalerrs.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{
DeploymentKey: log.DeploymentKey,
RequestKey: requestKey,
TimeStamp: log.Time,
Expand Down Expand Up @@ -1137,7 +1137,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
payload, err := d.encryptJSON(encryption.LogsSubKey, map[string]any{
payload, err := d.encryptJSON(encryption.TimelineSubKey, map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
Expand All @@ -1147,7 +1147,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
if err != nil {
return fmt.Errorf("failed to encrypt call payload: %w", err)
}
return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{
return dalerrs.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
Expand All @@ -1161,7 +1161,7 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error {
}

func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) {
count, err := d.db.DeleteOldEvents(ctx, sqltypes.Duration(age), eventType)
count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType)
return count, dalerrs.TranslatePGError(err)
}

Expand Down
26 changes: 13 additions & 13 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,39 +263,39 @@ func TestDAL(t *testing.T) {

t.Run("QueryEvents", func(t *testing.T) {
t.Run("Limit", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1)
events, err := dal.QueryTimeline(ctx, 1)
assert.NoError(t, err)
assert.Equal(t, 1, len(events))
})

t.Run("NoFilters", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000)
events, err := dal.QueryTimeline(ctx, 1000)
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
})

t.Run("ByDeployment", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000, FilterDeployments(deploymentKey))
events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey))
assert.NoError(t, err)
assertEventsEqual(t, []Event{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events)
})

t.Run("ByCall", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]()))
events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]()))
assert.NoError(t, err)
assertEventsEqual(t, []Event{callEvent}, events)
assertEventsEqual(t, []TimelineEvent{callEvent}, events)
})

t.Run("ByLogLevel", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace))
events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace))
assert.NoError(t, err)
assertEventsEqual(t, []Event{logEvent}, events)
assertEventsEqual(t, []TimelineEvent{logEvent}, events)
})

t.Run("ByRequests", func(t *testing.T) {
events, err := dal.QueryEvents(ctx, 1000, FilterRequests(requestKey))
events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey))
assert.NoError(t, err)
assertEventsEqual(t, []Event{callEvent, logEvent}, events)
assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events)
})
})

Expand Down Expand Up @@ -386,7 +386,7 @@ func TestRunnerStateFromProto(t *testing.T) {
assert.Equal(t, RunnerStateIdle, RunnerStateFromProto(state))
}

func normaliseEvents(events []Event) []Event {
func normaliseEvents(events []TimelineEvent) []TimelineEvent {
for i := range len(events) {
event := events[i]
re := reflect.Indirect(reflect.ValueOf(event))
Expand All @@ -400,7 +400,7 @@ func normaliseEvents(events []Event) []Event {
return events
}

func assertEventsEqual(t *testing.T, expected, actual []Event) {
func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) {
t.Helper()
assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual))
}
Expand Down
44 changes: 22 additions & 22 deletions backend/controller/dal/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated
)

// Event types.
// TimelineEvent types.
//
//sumtype:decl
type Event interface {
type TimelineEvent interface {
GetID() int64
event()
}
Expand Down Expand Up @@ -112,9 +112,9 @@ type eventFilter struct {
descending bool
}

type EventFilter func(query *eventFilter)
type TimelineFilter func(query *eventFilter)

func FilterLogLevel(level log.Level) EventFilter {
func FilterLogLevel(level log.Level) TimelineFilter {
return func(query *eventFilter) {
query.level = &level
}
Expand All @@ -123,27 +123,27 @@ func FilterLogLevel(level log.Level) EventFilter {
// FilterCall filters call events between the given modules.
//
// May be called multiple times.
func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) EventFilter {
func FilterCall(sourceModule optional.Option[string], destModule string, destVerb optional.Option[string]) TimelineFilter {
return func(query *eventFilter) {
query.calls = append(query.calls, &eventFilterCall{sourceModule: sourceModule, destModule: destModule, destVerb: destVerb})
}
}

func FilterDeployments(deploymentKeys ...model.DeploymentKey) EventFilter {
func FilterDeployments(deploymentKeys ...model.DeploymentKey) TimelineFilter {
return func(query *eventFilter) {
query.deployments = append(query.deployments, deploymentKeys...)
}
}

func FilterRequests(requestKeys ...model.RequestKey) EventFilter {
func FilterRequests(requestKeys ...model.RequestKey) TimelineFilter {
return func(query *eventFilter) {
for _, request := range requestKeys {
query.requests = append(query.requests, request.String())
}
}
}

func FilterTypes(types ...sql.EventType) EventFilter {
func FilterTypes(types ...sql.EventType) TimelineFilter {
return func(query *eventFilter) {
query.types = append(query.types, types...)
}
Expand All @@ -152,23 +152,23 @@ func FilterTypes(types ...sql.EventType) EventFilter {
// FilterTimeRange filters events between the given times, inclusive.
//
// Either maybe be zero to indicate no upper or lower bound.
func FilterTimeRange(olderThan, newerThan time.Time) EventFilter {
func FilterTimeRange(olderThan, newerThan time.Time) TimelineFilter {
return func(query *eventFilter) {
query.newerThan = newerThan
query.olderThan = olderThan
}
}

// FilterIDRange filters events between the given IDs, inclusive.
func FilterIDRange(higherThan, lowerThan int64) EventFilter {
func FilterIDRange(higherThan, lowerThan int64) TimelineFilter {
return func(query *eventFilter) {
query.idHigherThan = higherThan
query.idLowerThan = lowerThan
}
}

// FilterDescending returns events in descending order.
func FilterDescending() EventFilter {
func FilterDescending() TimelineFilter {
return func(query *eventFilter) {
query.descending = true
}
Expand Down Expand Up @@ -201,12 +201,12 @@ type eventDeploymentUpdatedJSON struct {
}

type eventRow struct {
sql.Event
sql.Timeline
DeploymentKey model.DeploymentKey
RequestKey optional.Option[model.RequestKey]
}

func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter) ([]Event, error) {
func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) {
if limit < 1 {
return nil, fmt.Errorf("limit must be >= 1, got %d", limit)
}
Expand All @@ -222,7 +222,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
e.custom_key_4,
e.type,
e.payload
FROM events e
FROM timeline e
LEFT JOIN requests ir on e.request_id = ir.id
WHERE true -- The "true" is to simplify the ANDs below.
`
Expand Down Expand Up @@ -256,7 +256,7 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
deploymentArgs := []any{}
if len(filter.deployments) != 0 {
// Unfortunately, if we use a join here, PG will do a sequential scan on
// events and deployments, making a 7ms query into a 700ms query.
// timeline and deployments, making a 7ms query into a 700ms query.
// https://www.pgexplain.dev/plan/ecd44488-6060-4ad1-a9b4-49d092c3de81
deploymentQuery += ` WHERE key = ANY($1::TEXT[])`
deploymentArgs = append(deploymentArgs, filter.deployments)
Expand Down Expand Up @@ -323,15 +323,15 @@ func (d *DAL) QueryEvents(ctx context.Context, limit int, filters ...EventFilter
}
defer rows.Close()

events, err := d.transformRowsToEvents(deploymentKeys, rows)
events, err := d.transformRowsToTimelineEvents(deploymentKeys, rows)
if err != nil {
return nil, err
}
return events, nil
}

func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]Event, error) {
var out []Event
func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) {
var out []TimelineEvent
for rows.Next() {
row := eventRow{}
var deploymentID int64
Expand All @@ -349,7 +349,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey
switch row.Type {
case sql.EventTypeLog:
var jsonPayload eventLogJSON
if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt log event: %w", err)
}

Expand All @@ -371,7 +371,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey

case sql.EventTypeCall:
var jsonPayload eventCallJSON
if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
var sourceVerb optional.Option[schema.Ref]
Expand All @@ -396,7 +396,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey

case sql.EventTypeDeploymentCreated:
var jsonPayload eventDeploymentCreatedJSON
if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
out = append(out, &DeploymentCreatedEvent{
Expand All @@ -411,7 +411,7 @@ func (d *DAL) transformRowsToEvents(deploymentKeys map[int64]model.DeploymentKey

case sql.EventTypeDeploymentUpdated:
var jsonPayload eventDeploymentUpdatedJSON
if err := d.decryptJSON(encryption.LogsSubKey, row.Payload, &jsonPayload); err != nil {
if err := d.decryptJSON(encryption.TimelineSubKey, row.Payload, &jsonPayload); err != nil {
return nil, fmt.Errorf("failed to decrypt call event: %w", err)
}
out = append(out, &DeploymentUpdatedEvent{
Expand Down
Loading

0 comments on commit 306b548

Please sign in to comment.