diff --git a/backend/controller/call_events.go b/backend/controller/call_events.go deleted file mode 100644 index 609e9a3657..0000000000 --- a/backend/controller/call_events.go +++ /dev/null @@ -1,65 +0,0 @@ -package controller - -import ( - "context" - "time" - - "github.com/alecthomas/types/optional" - - "github.com/TBD54566975/ftl/backend/controller/dal" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/model" -) - -type Call struct { - deploymentKey model.DeploymentKey - requestKey model.RequestKey - parentRequestKey optional.Option[model.RequestKey] - startTime time.Time - destVerb *schema.Ref - callers []*schema.Ref - request *ftlv1.CallRequest - response optional.Option[*ftlv1.CallResponse] - callError optional.Option[error] -} - -func (s *Service) recordCall(ctx context.Context, call *Call) { - logger := log.FromContext(ctx) - var sourceVerb optional.Option[schema.Ref] - if len(call.callers) > 0 { - sourceVerb = optional.Some(*call.callers[0]) - } - - var errorStr optional.Option[string] - var stack optional.Option[string] - var responseBody []byte - - if callError, ok := call.callError.Get(); ok { - errorStr = optional.Some(callError.Error()) - } else if response, ok := call.response.Get(); ok { - responseBody = response.GetBody() - if callError := response.GetError(); callError != nil { - errorStr = optional.Some(callError.Message) - stack = optional.Ptr(callError.Stack) - } - } - - err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{ - Time: call.startTime, - DeploymentKey: call.deploymentKey, - RequestKey: optional.Some(call.requestKey), - ParentRequestKey: call.parentRequestKey, - Duration: time.Since(call.startTime), - SourceVerb: sourceVerb, - DestVerb: *call.destVerb, - Request: call.request.GetBody(), - Response: responseBody, - Error: errorStr, - Stack: stack, - }) - if err != nil { - logger.Errorf(err, "failed to record call") - } -} diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index e3c8f3cf2d..ce8d9748dc 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -13,6 +13,8 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/timeline" + timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -25,14 +27,16 @@ import ( ) type ConsoleService struct { - dal *dal.DAL + dal *dal.DAL + timeline *timeline.Service } var _ pbconsoleconnect.ConsoleServiceHandler = (*ConsoleService)(nil) -func NewService(dal *dal.DAL) *ConsoleService { +func NewService(dal *dal.DAL, timeline *timeline.Service) *ConsoleService { return &ConsoleService{ - dal: dal, + dal: dal, + timeline: timeline, } } @@ -195,7 +199,7 @@ func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbc // Get 1 more than the requested limit to determine if there are more results. limitPlusOne := limit + 1 - results, err := c.dal.QueryTimeline(ctx, limitPlusOne, query...) + results, err := c.timeline.QueryTimeline(ctx, limitPlusOne, query...) if err != nil { return nil, err } @@ -238,10 +242,10 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ newQuery := query if !lastEventTime.IsZero() { - newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime)) + newQuery = append(newQuery, timelinedal.FilterTimeRange(thisRequestTime, lastEventTime)) } - events, err := c.dal.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...) + events, err := c.timeline.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...) if err != nil { return err } @@ -264,11 +268,11 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ } } -func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, error) { - var query []dal.TimelineFilter +func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFilter, error) { + var query []timelinedal.TimelineFilter if pb.Order == pbconsole.EventsQuery_DESC { - query = append(query, dal.FilterDescending()) + query = append(query, timelinedal.FilterDescending()) } for _, filter := range pb.Filters { @@ -282,7 +286,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err } deploymentKeys = append(deploymentKeys, deploymentKey) } - query = append(query, dal.FilterDeployments(deploymentKeys...)) + query = append(query, timelinedal.FilterDeployments(deploymentKeys...)) case *pbconsole.EventsQuery_Filter_Requests: requestKeys := make([]model.RequestKey, 0, len(filter.Requests.Requests)) @@ -293,32 +297,32 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err } requestKeys = append(requestKeys, requestKey) } - query = append(query, dal.FilterRequests(requestKeys...)) + query = append(query, timelinedal.FilterRequests(requestKeys...)) case *pbconsole.EventsQuery_Filter_EventTypes: - eventTypes := make([]dal.EventType, 0, len(filter.EventTypes.EventTypes)) + eventTypes := make([]timelinedal.EventType, 0, len(filter.EventTypes.EventTypes)) for _, eventType := range filter.EventTypes.EventTypes { switch eventType { case pbconsole.EventType_EVENT_TYPE_CALL: - eventTypes = append(eventTypes, dal.EventTypeCall) + eventTypes = append(eventTypes, timelinedal.EventTypeCall) case pbconsole.EventType_EVENT_TYPE_LOG: - eventTypes = append(eventTypes, dal.EventTypeLog) + eventTypes = append(eventTypes, timelinedal.EventTypeLog) case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED: - eventTypes = append(eventTypes, dal.EventTypeDeploymentCreated) + eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentCreated) case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED: - eventTypes = append(eventTypes, dal.EventTypeDeploymentUpdated) + eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentUpdated) default: return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType)) } } - query = append(query, dal.FilterTypes(eventTypes...)) + query = append(query, timelinedal.FilterTypes(eventTypes...)) case *pbconsole.EventsQuery_Filter_LogLevel: level := log.Level(filter.LogLevel.LogLevel) if level < log.Trace || level > log.Error { return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", filter.LogLevel.LogLevel)) } - query = append(query, dal.FilterLogLevel(level)) + query = append(query, timelinedal.FilterLogLevel(level)) case *pbconsole.EventsQuery_Filter_Time: var newerThan, olderThan time.Time @@ -328,7 +332,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err if filter.Time.OlderThan != nil { olderThan = filter.Time.OlderThan.AsTime() } - query = append(query, dal.FilterTimeRange(olderThan, newerThan)) + query = append(query, timelinedal.FilterTimeRange(olderThan, newerThan)) case *pbconsole.EventsQuery_Filter_Id: var lowerThan, higherThan int64 @@ -338,7 +342,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err if filter.Id.HigherThan != nil { higherThan = *filter.Id.HigherThan } - query = append(query, dal.FilterIDRange(lowerThan, higherThan)) + query = append(query, timelinedal.FilterIDRange(lowerThan, higherThan)) case *pbconsole.EventsQuery_Filter_Call: var sourceModule optional.Option[string] if filter.Call.SourceModule != nil { @@ -348,7 +352,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err if filter.Call.DestVerb != nil { destVerb = optional.Some(*filter.Call.DestVerb) } - query = append(query, dal.FilterCall(sourceModule, filter.Call.DestModule, destVerb)) + query = append(query, timelinedal.FilterCall(sourceModule, filter.Call.DestModule, destVerb)) default: return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown filter %T", filter)) @@ -357,9 +361,9 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err return query, nil } -func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { +func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event { switch event := event.(type) { - case *dal.CallEvent: + case *timelinedal.CallEvent: var requestKey *string if r, ok := event.RequestKey.Get(); ok { rstr := r.String() @@ -391,7 +395,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { }, } - case *dal.LogEvent: + case *timelinedal.LogEvent: var requestKey *string if r, ok := event.RequestKey.Get(); ok { rstr := r.String() @@ -414,7 +418,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { }, } - case *dal.DeploymentCreatedEvent: + case *timelinedal.DeploymentCreatedEvent: var replaced *string if r, ok := event.ReplacedDeployment.Get(); ok { rstr := r.String() @@ -433,7 +437,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event { }, }, } - case *dal.DeploymentUpdatedEvent: + case *timelinedal.DeploymentUpdatedEvent: return &pbconsole.Event{ TimeStamp: timestamppb.New(event.Time), Id: event.ID, diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 2afd227d15..2cf4c337cc 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -47,6 +47,8 @@ import ( "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/scaling" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + "github.com/TBD54566975/ftl/backend/controller/timeline" + timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" "github.com/TBD54566975/ftl/backend/libdal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -145,7 +147,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali sm := cf.SecretsFromContext(ctx) admin := admin.NewAdminService(cm, sm, svc.dal) - console := console.NewService(svc.dal) + console := console.NewService(svc.dal, svc.timeline) ingressHandler := http.Handler(svc) if len(config.AllowOrigins) > 0 { @@ -204,6 +206,7 @@ type Service struct { tasks *scheduledtask.Scheduler cronJobs *cronjobs.Service pubSub *pubsub.Manager + timeline *timeline.Service controllerListListeners []ControllerListListener // Map from runnerKey.String() to client. @@ -232,12 +235,12 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service config.ControllerTimeout = time.Second * 5 } - encryptionSrv, err := encryption.New(ctx, conn, api.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI))) if err != nil { return nil, fmt.Errorf("failed to create encryption dal: %w", err) } - db := dal.New(ctx, conn, encryptionSrv) + db := dal.New(ctx, conn, encryption) ldb := leasesdal.New(conn) svc := &Service{ tasks: scheduledtask.New(ctx, key, ldb), @@ -245,7 +248,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service leasesdal: ldb, conn: conn, key: key, - deploymentLogsSink: newDeploymentLogsSink(ctx, db), clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)), config: config, increaseReplicaFailures: map[string]int{}, @@ -253,12 +255,17 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service svc.routes.Store(map[string][]dal.Route{}) svc.schema.Store(&schema.Schema{}) - cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryptionSrv, conn) + cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn) svc.cronJobs = cronSvc pubSub := pubsub.New(ctx, db, svc.tasks, svc) svc.pubSub = pubSub + timelineSvc := timeline.New(ctx, conn, encryption) + svc.timeline = timelineSvc + + svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc) + go svc.syncSchema(ctx) // Use min, max backoff if we are running in production, otherwise use @@ -456,15 +463,12 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie requestKey = optional.Some(rkey) } - err = s.dal.InsertLogEvent(ctx, &dal.LogEvent{ - RequestKey: requestKey, + err = s.timeline.RecordLog(ctx, &timeline.Log{ DeploymentKey: deploymentKey, - Time: msg.TimeStamp.AsTime(), - Level: msg.LogLevel, - Attributes: msg.Attributes, - Message: msg.Message, - Error: optional.Ptr(msg.Error), + RequestKey: requestKey, + Msg: msg, }) + if err != nil { return nil, err } @@ -1077,16 +1081,16 @@ func (s *Service) callWithRequest( } else { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) } - s.recordCall(ctx, &Call{ - deploymentKey: route.Deployment, - requestKey: requestKey, - parentRequestKey: parentKey, - startTime: start, - destVerb: verbRef, - callers: callers, - callError: optional.Nil(err), - request: req.Msg, - response: maybeResponse, + s.timeline.RecordCall(ctx, &timeline.Call{ + DeploymentKey: route.Deployment, + RequestKey: requestKey, + ParentRequestKey: parentKey, + StartTime: start, + DestVerb: verbRef, + Callers: callers, + CallError: optional.Nil(err), + Request: req.Msg, + Response: maybeResponse, }) return resp, err } @@ -1817,7 +1821,7 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { return time.Hour, nil } - removed, err := s.dal.DeleteOldEvents(ctx, dal.EventTypeCall, *s.config.EventLogRetention) + removed, err := s.timeline.DeleteOldEvents(ctx, timelinedal.EventTypeCall, *s.config.EventLogRetention) if err != nil { return 0, fmt.Errorf("failed to prune call events: %w", err) } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 90c088212f..ad2f7bac48 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -880,32 +880,6 @@ func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, return runnerFromDB(row), nil } -func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { - var requestKey optional.Option[string] - if name, ok := log.RequestKey.Get(); ok { - requestKey = optional.Some(name.String()) - } - - payload := map[string]any{ - "message": log.Message, - "attributes": log.Attributes, - "error": log.Error, - "stack": log.Stack, - } - var encryptedPayload api.EncryptedTimelineColumn - err := d.encryption.EncryptJSON(payload, &encryptedPayload) - if err != nil { - return fmt.Errorf("failed to encrypt log payload: %w", err) - } - return libdal.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, dalsql.InsertTimelineLogEventParams{ - DeploymentKey: log.DeploymentKey, - RequestKey: requestKey, - TimeStamp: log.Time, - Level: log.Level, - Payload: encryptedPayload, - })) -} - func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymentRow) (*model.Deployment, error) { out := &model.Deployment{ Module: deployment.ModuleName, @@ -960,48 +934,6 @@ func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, add return id, libdal.TranslatePGError(err) } -func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { - var sourceModule, sourceVerb optional.Option[string] - if sr, ok := call.SourceVerb.Get(); ok { - sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name) - } - var requestKey optional.Option[string] - if rn, ok := call.RequestKey.Get(); ok { - requestKey = optional.Some(rn.String()) - } - var parentRequestKey optional.Option[string] - if pr, ok := call.ParentRequestKey.Get(); ok { - parentRequestKey = optional.Some(pr.String()) - } - var payload api.EncryptedTimelineColumn - err := d.encryption.EncryptJSON(map[string]any{ - "duration_ms": call.Duration.Milliseconds(), - "request": call.Request, - "response": call.Response, - "error": call.Error, - "stack": call.Stack, - }, &payload) - if err != nil { - return fmt.Errorf("failed to encrypt call payload: %w", err) - } - return libdal.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, dalsql.InsertTimelineCallEventParams{ - DeploymentKey: call.DeploymentKey, - RequestKey: requestKey, - ParentRequestKey: parentRequestKey, - TimeStamp: call.Time, - SourceModule: sourceModule, - SourceVerb: sourceVerb, - DestModule: call.DestVerb.Module, - DestVerb: call.DestVerb.Name, - Payload: payload, - })) -} - -func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) - return count, libdal.TranslatePGError(err) -} - func (d *DAL) GetActiveRunners(ctx context.Context) ([]Runner, error) { rows, err := d.db.GetActiveRunners(ctx) if err != nil { diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index d554b4e683..76cf8ba111 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "io" - "reflect" "sync" "testing" "time" @@ -24,7 +23,6 @@ import ( "github.com/TBD54566975/ftl/internal/sha256" ) -//nolint:maintidx func TestDAL(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) @@ -170,75 +168,6 @@ func TestDAL(t *testing.T) { assert.NoError(t, err) }) - callEvent := &CallEvent{ - Time: time.Now().Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Request: []byte("{}"), - Response: []byte(`{"time":"now"}`), - DestVerb: schema.Ref{Module: "time", Name: "time"}, - } - t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) - assert.NoError(t, err) - }) - - logEvent := &LogEvent{ - Time: time.Now().Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Level: int32(log.Warn), - Attributes: map[string]string{"attr": "value"}, - Message: "A log entry", - } - t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) - assert.NoError(t, err) - }) - - expectedDeploymentUpdatedEvent := &DeploymentUpdatedEvent{ - DeploymentKey: deploymentKey, - MinReplicas: 1, - } - - t.Run("QueryEvents", func(t *testing.T) { - t.Run("Limit", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1) - assert.NoError(t, err) - assert.Equal(t, 1, len(events)) - }) - - t.Run("NoFilters", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000) - assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) - }) - - t.Run("ByDeployment", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) - assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) - }) - - t.Run("ByCall", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) - assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{callEvent}, events) - }) - - t.Run("ByLogLevel", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) - assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{logEvent}, events) - }) - - t.Run("ByRequests", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) - assert.NoError(t, err) - assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events) - }) - }) - t.Run("GetRoutingTable", func(t *testing.T) { routes, err := dal.GetRoutingTable(ctx, []string{deployment.Module}) assert.NoError(t, err) @@ -350,119 +279,3 @@ func TestRunnerStateFromProto(t *testing.T) { state := ftlv1.RunnerState_RUNNER_NEW assert.Equal(t, RunnerStateNew, RunnerStateFromProto(state)) } - -func normaliseEvents(events []TimelineEvent) []TimelineEvent { - for i := range len(events) { - event := events[i] - re := reflect.Indirect(reflect.ValueOf(event)) - f := re.FieldByName("Time") - f.Set(reflect.Zero(f.Type())) - f = re.FieldByName("ID") - f.Set(reflect.Zero(f.Type())) - events[i] = event - } - - return events -} - -func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { - t.Helper() - assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) -} - -func TestDeleteOldEvents(t *testing.T) { - ctx := log.ContextWithNewDefaultLogger(context.Background()) - conn := sqltest.OpenForTesting(ctx, t) - encryption, err := encryption.New(ctx, conn, api.NewBuilder()) - assert.NoError(t, err) - - dal := New(ctx, conn, encryption) - - var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) - var testSha sha256.SHA256 - - t.Run("CreateArtefact", func(t *testing.T) { - testSha, err = dal.CreateArtefact(ctx, testContent) - assert.NoError(t, err) - }) - - module := &schema.Module{Name: "test"} - var deploymentKey model.DeploymentKey - t.Run("CreateDeployment", func(t *testing.T) { - deploymentKey, err = dal.CreateDeployment(ctx, "go", module, []DeploymentArtefact{{ - Digest: testSha, - Executable: true, - Path: "dir/filename", - }}, nil, nil) - assert.NoError(t, err) - }) - - requestKey := model.NewRequestKey(model.OriginIngress, "GET /test") - // week old event - callEvent := &CallEvent{ - Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Request: []byte("{}"), - Response: []byte(`{"time": "now"}`), - DestVerb: schema.Ref{Module: "time", Name: "time"}, - } - t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) - assert.NoError(t, err) - }) - // hour old event - callEvent = &CallEvent{ - Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Request: []byte("{}"), - Response: []byte(`{"time": "now"}`), - DestVerb: schema.Ref{Module: "time", Name: "time"}, - } - t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) - assert.NoError(t, err) - }) - - // week old event - logEvent := &LogEvent{ - Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Level: int32(log.Warn), - Attributes: map[string]string{"attr": "value"}, - Message: "A log entry", - } - t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) - assert.NoError(t, err) - }) - // hour old event - logEvent = &LogEvent{ - Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), - DeploymentKey: deploymentKey, - RequestKey: optional.Some(requestKey), - Level: int32(log.Warn), - Attributes: map[string]string{"attr": "value"}, - Message: "A log entry", - } - t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) - assert.NoError(t, err) - }) - - t.Run("DeleteOldEvents", func(t *testing.T) { - count, err := dal.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) - assert.NoError(t, err) - assert.Equal(t, int64(1), count) - - count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) - assert.NoError(t, err) - assert.Equal(t, int64(2), count) - - count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) - assert.NoError(t, err) - assert.Equal(t, int64(0), count) - }) -} diff --git a/backend/controller/dal/internal/sql/models.go b/backend/controller/dal/internal/sql/models.go index a46fe7d077..7cf957c6da 100644 --- a/backend/controller/dal/internal/sql/models.go +++ b/backend/controller/dal/internal/sql/models.go @@ -104,50 +104,6 @@ func (ns NullControllerState) Value() (driver.Value, error) { return string(ns.ControllerState), nil } -type EventType string - -const ( - EventTypeCall EventType = "call" - EventTypeLog EventType = "log" - EventTypeDeploymentCreated EventType = "deployment_created" - EventTypeDeploymentUpdated EventType = "deployment_updated" -) - -func (e *EventType) Scan(src interface{}) error { - switch s := src.(type) { - case []byte: - *e = EventType(s) - case string: - *e = EventType(s) - default: - return fmt.Errorf("unsupported scan type for EventType: %T", src) - } - return nil -} - -type NullEventType struct { - EventType EventType - Valid bool // Valid is true if EventType is not NULL -} - -// Scan implements the Scanner interface. -func (ns *NullEventType) Scan(value interface{}) error { - if value == nil { - ns.EventType, ns.Valid = "", false - return nil - } - ns.Valid = true - return ns.EventType.Scan(value) -} - -// Value implements the driver Valuer interface. -func (ns NullEventType) Value() (driver.Value, error) { - if !ns.Valid { - return nil, nil - } - return string(ns.EventType), nil -} - type FsmStatus string const ( diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index 690159a59d..ff0dd4965f 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -30,12 +30,9 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error - DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) - // This is a dummy query to ensure that the Timeline model is generated. - DummyQueryTimeline(ctx context.Context, id int64) (Timeline, error) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error) FailFSMInstance(ctx context.Context, fsm schema.RefKey, key string) (bool, error) @@ -83,11 +80,8 @@ type Querier interface { GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error - InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error - InsertTimelineEvent(ctx context.Context, arg InsertTimelineEventParams) error - InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) // Mark any controller entries that haven't been updated recently as dead. KillStaleControllers(ctx context.Context, timeout sqltypes.Duration) (int64, error) diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index baacce5485..38c8f34e73 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -205,29 +205,6 @@ FROM runners r WHERE state = 'assigned' AND d.key = sqlc.arg('key')::deployment_key; --- name: InsertTimelineLogEvent :exec -INSERT INTO timeline ( - deployment_id, - request_id, - time_stamp, - custom_key_1, - type, - payload -) -VALUES ( - (SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), - ( - CASE - WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT LIMIT 1) - END - ), - sqlc.arg('time_stamp')::TIMESTAMPTZ, - sqlc.arg('level')::INT, - 'log', - sqlc.arg('payload') -); - -- name: InsertTimelineDeploymentCreatedEvent :exec INSERT INTO timeline ( deployment_id, @@ -268,48 +245,6 @@ VALUES ( sqlc.arg('payload') ); --- name: InsertTimelineCallEvent :exec -INSERT INTO timeline ( - deployment_id, - request_id, - parent_request_id, - time_stamp, - type, - custom_key_1, - custom_key_2, - custom_key_3, - custom_key_4, - payload -) -VALUES ( - (SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), - (CASE - WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) - END), - (CASE - WHEN sqlc.narg('parent_request_key')::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('parent_request_key')::TEXT) - END), - sqlc.arg('time_stamp')::TIMESTAMPTZ, - 'call', - sqlc.narg('source_module')::TEXT, - sqlc.narg('source_verb')::TEXT, - sqlc.arg('dest_module')::TEXT, - sqlc.arg('dest_verb')::TEXT, - sqlc.arg('payload') -); - --- name: DeleteOldTimelineEvents :one -WITH deleted AS ( - DELETE FROM timeline - WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL - AND type = sqlc.arg('type') - RETURNING 1 -) -SELECT COUNT(*) -FROM deleted; - -- name: CreateRequest :exec INSERT INTO requests (origin, "key", source_addr) VALUES ($1, $2, $3); @@ -357,14 +292,6 @@ FROM ingress_routes ir INNER JOIN deployments d ON ir.deployment_id = d.id WHERE d.min_replicas > 0; - --- name: InsertTimelineEvent :exec -INSERT INTO timeline (deployment_id, request_id, parent_request_id, type, - custom_key_1, custom_key_2, custom_key_3, custom_key_4, - payload) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) -RETURNING id; - -- name: SucceedAsyncCall :one UPDATE async_calls SET @@ -780,7 +707,3 @@ RETURNING parent_request_key, trace_context, catching; - --- name: DummyQueryTimeline :one --- This is a dummy query to ensure that the Timeline model is generated. -SELECT * FROM timeline WHERE id = @id; diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index f2a32178b8..9436ced395 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -256,24 +256,6 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } -const deleteOldTimelineEvents = `-- name: DeleteOldTimelineEvents :one -WITH deleted AS ( - DELETE FROM timeline - WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL - AND type = $2 - RETURNING 1 -) -SELECT COUNT(*) -FROM deleted -` - -func (q *Queries) DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { - row := q.db.QueryRowContext(ctx, deleteOldTimelineEvents, timeout, type_) - var count int64 - err := row.Scan(&count) - return count, err -} - const deleteSubscribers = `-- name: DeleteSubscribers :many DELETE FROM topic_subscribers WHERE deployment_id IN ( @@ -357,30 +339,6 @@ func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (in return count, err } -const dummyQueryTimeline = `-- name: DummyQueryTimeline :one -SELECT id, time_stamp, deployment_id, request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload, parent_request_id FROM timeline WHERE id = $1 -` - -// This is a dummy query to ensure that the Timeline model is generated. -func (q *Queries) DummyQueryTimeline(ctx context.Context, id int64) (Timeline, error) { - row := q.db.QueryRowContext(ctx, dummyQueryTimeline, id) - var i Timeline - err := row.Scan( - &i.ID, - &i.TimeStamp, - &i.DeploymentID, - &i.RequestID, - &i.Type, - &i.CustomKey1, - &i.CustomKey2, - &i.CustomKey3, - &i.CustomKey4, - &i.Payload, - &i.ParentRequestID, - ) - return i, err -} - const failAsyncCall = `-- name: FailAsyncCall :one UPDATE async_calls SET @@ -1799,66 +1757,6 @@ func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberPara return err } -const insertTimelineCallEvent = `-- name: InsertTimelineCallEvent :exec -INSERT INTO timeline ( - deployment_id, - request_id, - parent_request_id, - time_stamp, - type, - custom_key_1, - custom_key_2, - custom_key_3, - custom_key_4, - payload -) -VALUES ( - (SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), - (CASE - WHEN $2::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) - END), - (CASE - WHEN $3::TEXT IS NULL THEN NULL - ELSE (SELECT id FROM requests ir WHERE ir.key = $3::TEXT) - END), - $4::TIMESTAMPTZ, - 'call', - $5::TEXT, - $6::TEXT, - $7::TEXT, - $8::TEXT, - $9 -) -` - -type InsertTimelineCallEventParams struct { - DeploymentKey model.DeploymentKey - RequestKey optional.Option[string] - ParentRequestKey optional.Option[string] - TimeStamp time.Time - SourceModule optional.Option[string] - SourceVerb optional.Option[string] - DestModule string - DestVerb string - Payload api.EncryptedTimelineColumn -} - -func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error { - _, err := q.db.ExecContext(ctx, insertTimelineCallEvent, - arg.DeploymentKey, - arg.RequestKey, - arg.ParentRequestKey, - arg.TimeStamp, - arg.SourceModule, - arg.SourceVerb, - arg.DestModule, - arg.DestVerb, - arg.Payload, - ) - return err -} - const insertTimelineDeploymentCreatedEvent = `-- name: InsertTimelineDeploymentCreatedEvent :exec INSERT INTO timeline ( deployment_id, diff --git a/backend/controller/deployment_logs.go b/backend/controller/deployment_logs.go index 78aef8a64d..4f47bbf29e 100644 --- a/backend/controller/deployment_logs.go +++ b/backend/controller/deployment_logs.go @@ -7,17 +7,18 @@ import ( "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/timeline" + timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) var _ log.Sink = (*deploymentLogsSink)(nil) -func newDeploymentLogsSink(ctx context.Context, dal *dal.DAL) *deploymentLogsSink { +func newDeploymentLogsSink(ctx context.Context, timeline *timeline.Service) *deploymentLogsSink { sink := &deploymentLogsSink{ logQueue: make(chan log.Entry, 10000), - dal: dal, + timeline: timeline, } // Process logs in background @@ -28,7 +29,7 @@ func newDeploymentLogsSink(ctx context.Context, dal *dal.DAL) *deploymentLogsSin type deploymentLogsSink struct { logQueue chan log.Entry - dal *dal.DAL + timeline *timeline.Service } // Log implements Sink @@ -71,7 +72,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { } } - err = d.dal.InsertLogEvent(ctx, &dal.LogEvent{ + err = d.timeline.InsertLogEvent(ctx, &timelinedal.LogEvent{ RequestKey: request, DeploymentKey: deployment, Time: entry.Time, diff --git a/backend/controller/dal/events.go b/backend/controller/timeline/dal/dal.go similarity index 81% rename from backend/controller/dal/events.go rename to backend/controller/timeline/dal/dal.go index 931aa0e427..39d332c77b 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/timeline/dal/dal.go @@ -8,15 +8,107 @@ import ( "strconv" "time" - "github.com/alecthomas/types/optional" - - "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" + "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/controller/timeline/dal/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/alecthomas/types/optional" ) +type DAL struct { + *libdal.Handle[DAL] + db sql.Querier + encryption *encryption.Service +} + +func New(conn libdal.Connection, encryption *encryption.Service) *DAL { + var d *DAL + d = &DAL{ + db: sql.New(conn), + encryption: encryption, + Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { + return &DAL{ + Handle: h, + db: sql.New(h.Connection), + encryption: d.encryption, + } + }), + } + return d +} + +func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { + var requestKey optional.Option[string] + if name, ok := log.RequestKey.Get(); ok { + requestKey = optional.Some(name.String()) + } + + payload := map[string]any{ + "message": log.Message, + "attributes": log.Attributes, + "error": log.Error, + "stack": log.Stack, + } + var encryptedPayload ftlencryption.EncryptedTimelineColumn + err := d.encryption.EncryptJSON(payload, &encryptedPayload) + if err != nil { + return fmt.Errorf("failed to encrypt log payload: %w", err) + } + return libdal.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ + DeploymentKey: log.DeploymentKey, + RequestKey: requestKey, + TimeStamp: log.Time, + Level: log.Level, + Payload: encryptedPayload, + })) +} + +func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { + var sourceModule, sourceVerb optional.Option[string] + if sr, ok := call.SourceVerb.Get(); ok { + sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name) + } + var requestKey optional.Option[string] + if rn, ok := call.RequestKey.Get(); ok { + requestKey = optional.Some(rn.String()) + } + var parentRequestKey optional.Option[string] + if pr, ok := call.ParentRequestKey.Get(); ok { + parentRequestKey = optional.Some(pr.String()) + } + var payload ftlencryption.EncryptedTimelineColumn + err := d.encryption.EncryptJSON(map[string]any{ + "duration_ms": call.Duration.Milliseconds(), + "request": call.Request, + "response": call.Response, + "error": call.Error, + "stack": call.Stack, + }, &payload) + if err != nil { + return fmt.Errorf("failed to encrypt call payload: %w", err) + } + return libdal.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ + DeploymentKey: call.DeploymentKey, + RequestKey: requestKey, + ParentRequestKey: parentRequestKey, + TimeStamp: call.Time, + SourceModule: sourceModule, + SourceVerb: sourceVerb, + DestModule: call.DestVerb.Module, + DestVerb: call.DestVerb.Name, + Payload: payload, + })) +} + +func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { + count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) + return count, libdal.TranslatePGError(err) +} + type EventType = sql.EventType // Supported event types. diff --git a/backend/controller/timeline/dal/dal_test.go b/backend/controller/timeline/dal/dal_test.go new file mode 100644 index 0000000000..200a441f8a --- /dev/null +++ b/backend/controller/timeline/dal/dal_test.go @@ -0,0 +1,252 @@ +package dal + +import ( + "bytes" + "context" + "reflect" + "testing" + "time" + + controllerdal "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" + "github.com/TBD54566975/ftl/backend/schema" + ftlencryption "github.com/TBD54566975/ftl/internal/encryption" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/sha256" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" +) + +func TestTimelineDAL(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) + assert.NoError(t, err) + + dal := New(conn, encryption) + controllerDAL := controllerdal.New(ctx, conn, encryption) + + var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) + + t.Run("UpsertModule", func(t *testing.T) { + err = controllerDAL.UpsertModule(ctx, "go", "test") + assert.NoError(t, err) + }) + + var testSha sha256.SHA256 + + t.Run("CreateArtefact", func(t *testing.T) { + testSha, err = controllerDAL.CreateArtefact(ctx, testContent) + assert.NoError(t, err) + }) + + module := &schema.Module{Name: "test"} + var deploymentKey model.DeploymentKey + t.Run("CreateDeployment", func(t *testing.T) { + deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []controllerdal.DeploymentArtefact{{ + Digest: testSha, + Executable: true, + Path: "dir/filename", + }}, nil, nil) + assert.NoError(t, err) + }) + + t.Run("SetDeploymentReplicas", func(t *testing.T) { + err := controllerDAL.SetDeploymentReplicas(ctx, deploymentKey, 1) + assert.NoError(t, err) + }) + + requestKey := model.NewRequestKey(model.OriginIngress, "GET /test") + t.Run("CreateIngressRequest", func(t *testing.T) { + err = controllerDAL.CreateRequest(ctx, requestKey, "127.0.0.1:1234") + assert.NoError(t, err) + }) + + callEvent := &CallEvent{ + Time: time.Now().Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Request: []byte("{}"), + Response: []byte(`{"time":"now"}`), + DestVerb: schema.Ref{Module: "time", Name: "time"}, + } + t.Run("InsertCallEvent", func(t *testing.T) { + err = dal.InsertCallEvent(ctx, callEvent) + assert.NoError(t, err) + }) + + logEvent := &LogEvent{ + Time: time.Now().Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Level: int32(log.Warn), + Attributes: map[string]string{"attr": "value"}, + Message: "A log entry", + } + t.Run("InsertLogEntry", func(t *testing.T) { + err = dal.InsertLogEvent(ctx, logEvent) + assert.NoError(t, err) + }) + + expectedDeploymentUpdatedEvent := &DeploymentUpdatedEvent{ + DeploymentKey: deploymentKey, + MinReplicas: 1, + } + + t.Run("QueryEvents", func(t *testing.T) { + t.Run("Limit", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1) + assert.NoError(t, err) + assert.Equal(t, 1, len(events)) + }) + + t.Run("NoFilters", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1000) + assert.NoError(t, err) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + }) + + t.Run("ByDeployment", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) + assert.NoError(t, err) + assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) + }) + + t.Run("ByCall", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) + assert.NoError(t, err) + assertEventsEqual(t, []TimelineEvent{callEvent}, events) + }) + + t.Run("ByLogLevel", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) + assert.NoError(t, err) + assertEventsEqual(t, []TimelineEvent{logEvent}, events) + }) + + t.Run("ByRequests", func(t *testing.T) { + events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) + assert.NoError(t, err) + assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events) + }) + }) +} + +func normaliseEvents(events []TimelineEvent) []TimelineEvent { + for i := range len(events) { + event := events[i] + re := reflect.Indirect(reflect.ValueOf(event)) + f := re.FieldByName("Time") + f.Set(reflect.Zero(f.Type())) + f = re.FieldByName("ID") + f.Set(reflect.Zero(f.Type())) + events[i] = event + } + + return events +} + +func assertEventsEqual(t *testing.T, expected, actual []TimelineEvent) { + t.Helper() + assert.Equal(t, normaliseEvents(expected), normaliseEvents(actual)) +} + +func TestDeleteOldEvents(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + conn := sqltest.OpenForTesting(ctx, t) + encryption, err := encryption.New(ctx, conn, ftlencryption.NewBuilder()) + assert.NoError(t, err) + + dal := New(conn, encryption) + controllerDAL := controllerdal.New(ctx, conn, encryption) + + var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) + var testSha sha256.SHA256 + + t.Run("CreateArtefact", func(t *testing.T) { + testSha, err = controllerDAL.CreateArtefact(ctx, testContent) + assert.NoError(t, err) + }) + + module := &schema.Module{Name: "test"} + var deploymentKey model.DeploymentKey + t.Run("CreateDeployment", func(t *testing.T) { + deploymentKey, err = controllerDAL.CreateDeployment(ctx, "go", module, []controllerdal.DeploymentArtefact{{ + Digest: testSha, + Executable: true, + Path: "dir/filename", + }}, nil, nil) + assert.NoError(t, err) + }) + + requestKey := model.NewRequestKey(model.OriginIngress, "GET /test") + // week old event + callEvent := &CallEvent{ + Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Request: []byte("{}"), + Response: []byte(`{"time": "now"}`), + DestVerb: schema.Ref{Module: "time", Name: "time"}, + } + t.Run("InsertCallEvent", func(t *testing.T) { + err = dal.InsertCallEvent(ctx, callEvent) + assert.NoError(t, err) + }) + // hour old event + callEvent = &CallEvent{ + Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Request: []byte("{}"), + Response: []byte(`{"time": "now"}`), + DestVerb: schema.Ref{Module: "time", Name: "time"}, + } + t.Run("InsertCallEvent", func(t *testing.T) { + err = dal.InsertCallEvent(ctx, callEvent) + assert.NoError(t, err) + }) + + // week old event + logEvent := &LogEvent{ + Time: time.Now().Add(-24 * 7 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Level: int32(log.Warn), + Attributes: map[string]string{"attr": "value"}, + Message: "A log entry", + } + t.Run("InsertLogEntry", func(t *testing.T) { + err = dal.InsertLogEvent(ctx, logEvent) + assert.NoError(t, err) + }) + // hour old event + logEvent = &LogEvent{ + Time: time.Now().Add(-1 * time.Hour).Round(time.Millisecond), + DeploymentKey: deploymentKey, + RequestKey: optional.Some(requestKey), + Level: int32(log.Warn), + Attributes: map[string]string{"attr": "value"}, + Message: "A log entry", + } + t.Run("InsertLogEntry", func(t *testing.T) { + err = dal.InsertLogEvent(ctx, logEvent) + assert.NoError(t, err) + }) + + t.Run("DeleteOldEvents", func(t *testing.T) { + count, err := dal.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) + assert.NoError(t, err) + assert.Equal(t, int64(1), count) + + count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + assert.NoError(t, err) + assert.Equal(t, int64(2), count) + + count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + assert.NoError(t, err) + assert.Equal(t, int64(0), count) + }) +} diff --git a/backend/controller/timeline/dal/internal/sql/db.go b/backend/controller/timeline/dal/internal/sql/db.go new file mode 100644 index 0000000000..0e0973111c --- /dev/null +++ b/backend/controller/timeline/dal/internal/sql/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/backend/controller/timeline/dal/internal/sql/models.go b/backend/controller/timeline/dal/internal/sql/models.go new file mode 100644 index 0000000000..11ad26d656 --- /dev/null +++ b/backend/controller/timeline/dal/internal/sql/models.go @@ -0,0 +1,73 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "database/sql" + "database/sql/driver" + "fmt" + "time" + + "github.com/TBD54566975/ftl/internal/encryption" + "github.com/alecthomas/types/optional" +) + +type EventType string + +const ( + EventTypeCall EventType = "call" + EventTypeLog EventType = "log" + EventTypeDeploymentCreated EventType = "deployment_created" + EventTypeDeploymentUpdated EventType = "deployment_updated" +) + +func (e *EventType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = EventType(s) + case string: + *e = EventType(s) + default: + return fmt.Errorf("unsupported scan type for EventType: %T", src) + } + return nil +} + +type NullEventType struct { + EventType EventType + Valid bool // Valid is true if EventType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullEventType) Scan(value interface{}) error { + if value == nil { + ns.EventType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.EventType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullEventType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.EventType), nil +} + +type Timeline struct { + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID sql.NullInt64 + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload encryption.EncryptedTimelineColumn + ParentRequestID optional.Option[string] +} diff --git a/backend/controller/timeline/dal/internal/sql/querier.go b/backend/controller/timeline/dal/internal/sql/querier.go new file mode 100644 index 0000000000..afa6fa3142 --- /dev/null +++ b/backend/controller/timeline/dal/internal/sql/querier.go @@ -0,0 +1,21 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sql + +import ( + "context" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" +) + +type Querier interface { + DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) + // This is a dummy query to ensure that the Timeline model is generated. + DummyQueryTimeline(ctx context.Context, id int64) (Timeline, error) + InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error + InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error +} + +var _ Querier = (*Queries)(nil) diff --git a/backend/controller/timeline/dal/internal/sql/queries.sql b/backend/controller/timeline/dal/internal/sql/queries.sql new file mode 100644 index 0000000000..8e989ef779 --- /dev/null +++ b/backend/controller/timeline/dal/internal/sql/queries.sql @@ -0,0 +1,68 @@ +-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + time_stamp, + custom_key_1, + type, + payload +) +VALUES ( + (SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1), + ( + CASE + WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT LIMIT 1) + END + ), + sqlc.arg('time_stamp')::TIMESTAMPTZ, + sqlc.arg('level')::INT, + 'log', + sqlc.arg('payload') +); + +-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + parent_request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), + (CASE + WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) + END), + (CASE + WHEN sqlc.narg('parent_request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('parent_request_key')::TEXT) + END), + sqlc.arg('time_stamp')::TIMESTAMPTZ, + 'call', + sqlc.narg('source_module')::TEXT, + sqlc.narg('source_verb')::TEXT, + sqlc.arg('dest_module')::TEXT, + sqlc.arg('dest_verb')::TEXT, + sqlc.arg('payload') +); + +-- name: DeleteOldTimelineEvents :one +WITH deleted AS ( + DELETE FROM timeline + WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL + AND type = sqlc.arg('type') + RETURNING 1 +) +SELECT COUNT(*) +FROM deleted; + +-- name: DummyQueryTimeline :one +-- This is a dummy query to ensure that the Timeline model is generated. +SELECT * FROM timeline WHERE id = @id; diff --git a/backend/controller/timeline/dal/internal/sql/queries.sql.go b/backend/controller/timeline/dal/internal/sql/queries.sql.go new file mode 100644 index 0000000000..19914f65a5 --- /dev/null +++ b/backend/controller/timeline/dal/internal/sql/queries.sql.go @@ -0,0 +1,160 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package sql + +import ( + "context" + "time" + + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/internal/encryption" + "github.com/alecthomas/types/optional" +) + +const deleteOldTimelineEvents = `-- name: DeleteOldTimelineEvents :one +WITH deleted AS ( + DELETE FROM timeline + WHERE time_stamp < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL + AND type = $2 + RETURNING 1 +) +SELECT COUNT(*) +FROM deleted +` + +func (q *Queries) DeleteOldTimelineEvents(ctx context.Context, timeout sqltypes.Duration, type_ EventType) (int64, error) { + row := q.db.QueryRowContext(ctx, deleteOldTimelineEvents, timeout, type_) + var count int64 + err := row.Scan(&count) + return count, err +} + +const dummyQueryTimeline = `-- name: DummyQueryTimeline :one +SELECT id, time_stamp, deployment_id, request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload, parent_request_id FROM timeline WHERE id = $1 +` + +// This is a dummy query to ensure that the Timeline model is generated. +func (q *Queries) DummyQueryTimeline(ctx context.Context, id int64) (Timeline, error) { + row := q.db.QueryRowContext(ctx, dummyQueryTimeline, id) + var i Timeline + err := row.Scan( + &i.ID, + &i.TimeStamp, + &i.DeploymentID, + &i.RequestID, + &i.Type, + &i.CustomKey1, + &i.CustomKey2, + &i.CustomKey3, + &i.CustomKey4, + &i.Payload, + &i.ParentRequestID, + ) + return i, err +} + +const insertTimelineCallEvent = `-- name: InsertTimelineCallEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + parent_request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), + (CASE + WHEN $2::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) + END), + (CASE + WHEN $3::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $3::TEXT) + END), + $4::TIMESTAMPTZ, + 'call', + $5::TEXT, + $6::TEXT, + $7::TEXT, + $8::TEXT, + $9 +) +` + +type InsertTimelineCallEventParams struct { + DeploymentKey interface{} + RequestKey optional.Option[string] + ParentRequestKey optional.Option[string] + TimeStamp time.Time + SourceModule optional.Option[string] + SourceVerb optional.Option[string] + DestModule string + DestVerb string + Payload encryption.EncryptedTimelineColumn +} + +func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineCallEvent, + arg.DeploymentKey, + arg.RequestKey, + arg.ParentRequestKey, + arg.TimeStamp, + arg.SourceModule, + arg.SourceVerb, + arg.DestModule, + arg.DestVerb, + arg.Payload, + ) + return err +} + +const insertTimelineLogEvent = `-- name: InsertTimelineLogEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + time_stamp, + custom_key_1, + type, + payload +) +VALUES ( + (SELECT id FROM deployments d WHERE d.key = $1::deployment_key LIMIT 1), + ( + CASE + WHEN $2::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT LIMIT 1) + END + ), + $3::TIMESTAMPTZ, + $4::INT, + 'log', + $5 +) +` + +type InsertTimelineLogEventParams struct { + DeploymentKey interface{} + RequestKey optional.Option[string] + TimeStamp time.Time + Level int32 + Payload encryption.EncryptedTimelineColumn +} + +func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineLogEvent, + arg.DeploymentKey, + arg.RequestKey, + arg.TimeStamp, + arg.Level, + arg.Payload, + ) + return err +} diff --git a/backend/controller/timeline/timeline.go b/backend/controller/timeline/timeline.go new file mode 100644 index 0000000000..360399d2ca --- /dev/null +++ b/backend/controller/timeline/timeline.go @@ -0,0 +1,122 @@ +package timeline + +import ( + "context" + "fmt" + "time" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/backend/controller/encryption" + "github.com/TBD54566975/ftl/backend/controller/timeline/dal" + "github.com/TBD54566975/ftl/backend/libdal" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" +) + +type Service struct { + dal dal.DAL +} + +func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *Service { + return &Service{dal: *dal.New(conn, encryption)} +} + +type Log struct { + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] + Msg *ftlv1.StreamDeploymentLogsRequest +} + +func (s *Service) RecordLog(ctx context.Context, log *Log) error { + err := s.dal.InsertLogEvent(ctx, &dal.LogEvent{ + RequestKey: log.RequestKey, + DeploymentKey: log.DeploymentKey, + Time: log.Msg.TimeStamp.AsTime(), + Level: log.Msg.LogLevel, + Attributes: log.Msg.Attributes, + Message: log.Msg.Message, + Error: optional.Ptr(log.Msg.Error), + }) + if err != nil { + return fmt.Errorf("failed to insert log event: %w", err) + } + return nil +} + +type Call struct { + DeploymentKey model.DeploymentKey + RequestKey model.RequestKey + ParentRequestKey optional.Option[model.RequestKey] + StartTime time.Time + DestVerb *schema.Ref + Callers []*schema.Ref + Request *ftlv1.CallRequest + Response optional.Option[*ftlv1.CallResponse] + CallError optional.Option[error] +} + +func (s *Service) RecordCall(ctx context.Context, call *Call) { + logger := log.FromContext(ctx) + var sourceVerb optional.Option[schema.Ref] + if len(call.Callers) > 0 { + sourceVerb = optional.Some(*call.Callers[0]) + } + + var errorStr optional.Option[string] + var stack optional.Option[string] + var responseBody []byte + + if callError, ok := call.CallError.Get(); ok { + errorStr = optional.Some(callError.Error()) + } else if response, ok := call.Response.Get(); ok { + responseBody = response.GetBody() + if callError := response.GetError(); callError != nil { + errorStr = optional.Some(callError.Message) + stack = optional.Ptr(callError.Stack) + } + } + + err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{ + Time: call.StartTime, + DeploymentKey: call.DeploymentKey, + RequestKey: optional.Some(call.RequestKey), + ParentRequestKey: call.ParentRequestKey, + Duration: time.Since(call.StartTime), + SourceVerb: sourceVerb, + DestVerb: *call.DestVerb, + Request: call.Request.GetBody(), + Response: responseBody, + Error: errorStr, + Stack: stack, + }) + if err != nil { + logger.Errorf(err, "failed to record call") + } +} + +func (s *Service) InsertLogEvent(ctx context.Context, log *dal.LogEvent) error { + err := s.dal.InsertLogEvent(ctx, log) + if err != nil { + return fmt.Errorf("failed to insert log event: %w", err) + } + return nil +} + +func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...dal.TimelineFilter) ([]dal.TimelineEvent, error) { + events, err := s.dal.QueryTimeline(ctx, limit, filters...) + if err != nil { + return nil, fmt.Errorf("failed to query timeline: %w", err) + } + return events, nil +} + +func (s *Service) DeleteOldEvents(ctx context.Context, eventType dal.EventType, age time.Duration) (int64, error) { + deleted, err := s.dal.DeleteOldEvents(ctx, eventType, age) + if err != nil { + return 0, fmt.Errorf("failed to delete old events: %w", err) + } + return deleted, nil +} diff --git a/sqlc.yaml b/sqlc.yaml index 0c46f8f1d2..ef8601c182 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -178,6 +178,38 @@ sql: go: <<: *gengo out: "backend/controller/encryption/dal/internal/sql" + - <<: *daldir + queries: + - backend/controller/timeline/dal/internal/sql/queries.sql + gen: + go: + <<: *gengo + out: "backend/controller/timeline/dal/internal/sql" + overrides: + - db_type: "encrypted_timeline" + go_type: "github.com/TBD54566975/ftl/internal/encryption.EncryptedTimelineColumn" + - db_type: "encrypted_timeline" + nullable: true + go_type: "github.com/TBD54566975/ftl/internal/encryption.OptionalEncryptedTimelineColumn" + - db_type: "timestamptz" + go_type: "time.Time" + - db_type: "timestamptz" + nullable: true + go_type: + type: "optional.Option[time.Time]" + - db_type: "pg_catalog.interval" + go_type: "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes.Duration" + - db_type: "pg_catalog.interval" + nullable: true + go_type: + type: "optional.Option[sqltypes.Duration]" + - db_type: "text" + go_type: "string" + - db_type: "text" + nullable: true + go_type: + import: "github.com/alecthomas/types/optional" + type: "Option[string]" rules: - name: postgresql-query-too-costly message: "Query cost estimate is too high"