Skip to content

Commit

Permalink
chore(refactor): create timeline package in controller (#2674)
Browse files Browse the repository at this point in the history
Shouldn't change any functionality just creating a new package for the
timeline since it's has a bunch of code and we'll be adding many more
event types in the near future.

---------

Co-authored-by: Alec Thomas <[email protected]>
  • Loading branch information
wesbillman and alecthomas authored Sep 14, 2024
1 parent 7a9d9cf commit 88fb90c
Show file tree
Hide file tree
Showing 22 changed files with 1,119 additions and 814 deletions.
65 changes: 0 additions & 65 deletions backend/controller/call_events.go

This file was deleted.

56 changes: 30 additions & 26 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/timeline"
timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect"
Expand All @@ -25,14 +27,16 @@ import (
)

type ConsoleService struct {
dal *dal.DAL
dal *dal.DAL
timeline *timeline.Service
}

var _ pbconsoleconnect.ConsoleServiceHandler = (*ConsoleService)(nil)

func NewService(dal *dal.DAL) *ConsoleService {
func NewService(dal *dal.DAL, timeline *timeline.Service) *ConsoleService {
return &ConsoleService{
dal: dal,
dal: dal,
timeline: timeline,
}
}

Expand Down Expand Up @@ -195,7 +199,7 @@ func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbc
// Get 1 more than the requested limit to determine if there are more results.
limitPlusOne := limit + 1

results, err := c.dal.QueryTimeline(ctx, limitPlusOne, query...)
results, err := c.timeline.QueryTimeline(ctx, limitPlusOne, query...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -238,10 +242,10 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
newQuery := query

if !lastEventTime.IsZero() {
newQuery = append(newQuery, dal.FilterTimeRange(thisRequestTime, lastEventTime))
newQuery = append(newQuery, timelinedal.FilterTimeRange(thisRequestTime, lastEventTime))
}

events, err := c.dal.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...)
events, err := c.timeline.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...)
if err != nil {
return err
}
Expand All @@ -264,11 +268,11 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
}
}

func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, error) {
var query []dal.TimelineFilter
func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFilter, error) {
var query []timelinedal.TimelineFilter

if pb.Order == pbconsole.EventsQuery_DESC {
query = append(query, dal.FilterDescending())
query = append(query, timelinedal.FilterDescending())
}

for _, filter := range pb.Filters {
Expand All @@ -282,7 +286,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
}
deploymentKeys = append(deploymentKeys, deploymentKey)
}
query = append(query, dal.FilterDeployments(deploymentKeys...))
query = append(query, timelinedal.FilterDeployments(deploymentKeys...))

case *pbconsole.EventsQuery_Filter_Requests:
requestKeys := make([]model.RequestKey, 0, len(filter.Requests.Requests))
Expand All @@ -293,32 +297,32 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
}
requestKeys = append(requestKeys, requestKey)
}
query = append(query, dal.FilterRequests(requestKeys...))
query = append(query, timelinedal.FilterRequests(requestKeys...))

case *pbconsole.EventsQuery_Filter_EventTypes:
eventTypes := make([]dal.EventType, 0, len(filter.EventTypes.EventTypes))
eventTypes := make([]timelinedal.EventType, 0, len(filter.EventTypes.EventTypes))
for _, eventType := range filter.EventTypes.EventTypes {
switch eventType {
case pbconsole.EventType_EVENT_TYPE_CALL:
eventTypes = append(eventTypes, dal.EventTypeCall)
eventTypes = append(eventTypes, timelinedal.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, dal.EventTypeLog)
eventTypes = append(eventTypes, timelinedal.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED:
eventTypes = append(eventTypes, dal.EventTypeDeploymentCreated)
eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentCreated)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED:
eventTypes = append(eventTypes, dal.EventTypeDeploymentUpdated)
eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentUpdated)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
}
query = append(query, dal.FilterTypes(eventTypes...))
query = append(query, timelinedal.FilterTypes(eventTypes...))

case *pbconsole.EventsQuery_Filter_LogLevel:
level := log.Level(filter.LogLevel.LogLevel)
if level < log.Trace || level > log.Error {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", filter.LogLevel.LogLevel))
}
query = append(query, dal.FilterLogLevel(level))
query = append(query, timelinedal.FilterLogLevel(level))

case *pbconsole.EventsQuery_Filter_Time:
var newerThan, olderThan time.Time
Expand All @@ -328,7 +332,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
if filter.Time.OlderThan != nil {
olderThan = filter.Time.OlderThan.AsTime()
}
query = append(query, dal.FilterTimeRange(olderThan, newerThan))
query = append(query, timelinedal.FilterTimeRange(olderThan, newerThan))

case *pbconsole.EventsQuery_Filter_Id:
var lowerThan, higherThan int64
Expand All @@ -338,7 +342,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
if filter.Id.HigherThan != nil {
higherThan = *filter.Id.HigherThan
}
query = append(query, dal.FilterIDRange(lowerThan, higherThan))
query = append(query, timelinedal.FilterIDRange(lowerThan, higherThan))
case *pbconsole.EventsQuery_Filter_Call:
var sourceModule optional.Option[string]
if filter.Call.SourceModule != nil {
Expand All @@ -348,7 +352,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
if filter.Call.DestVerb != nil {
destVerb = optional.Some(*filter.Call.DestVerb)
}
query = append(query, dal.FilterCall(sourceModule, filter.Call.DestModule, destVerb))
query = append(query, timelinedal.FilterCall(sourceModule, filter.Call.DestModule, destVerb))

default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown filter %T", filter))
Expand All @@ -357,9 +361,9 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]dal.TimelineFilter, err
return query, nil
}

func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event {
func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event {
switch event := event.(type) {
case *dal.CallEvent:
case *timelinedal.CallEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
Expand Down Expand Up @@ -391,7 +395,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event {
},
}

case *dal.LogEvent:
case *timelinedal.LogEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
Expand All @@ -414,7 +418,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event {
},
}

case *dal.DeploymentCreatedEvent:
case *timelinedal.DeploymentCreatedEvent:
var replaced *string
if r, ok := event.ReplacedDeployment.Get(); ok {
rstr := r.String()
Expand All @@ -433,7 +437,7 @@ func eventDALToProto(event dal.TimelineEvent) *pbconsole.Event {
},
},
}
case *dal.DeploymentUpdatedEvent:
case *timelinedal.DeploymentUpdatedEvent:
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Expand Down
54 changes: 29 additions & 25 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import (
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect"
Expand Down Expand Up @@ -145,7 +147,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
sm := cf.SecretsFromContext(ctx)

admin := admin.NewAdminService(cm, sm, svc.dal)
console := console.NewService(svc.dal)
console := console.NewService(svc.dal, svc.timeline)

ingressHandler := http.Handler(svc)
if len(config.AllowOrigins) > 0 {
Expand Down Expand Up @@ -204,6 +206,7 @@ type Service struct {
tasks *scheduledtask.Scheduler
cronJobs *cronjobs.Service
pubSub *pubsub.Manager
timeline *timeline.Service
controllerListListeners []ControllerListListener

// Map from runnerKey.String() to client.
Expand Down Expand Up @@ -232,33 +235,37 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool) (*Service
config.ControllerTimeout = time.Second * 5
}

encryptionSrv, err := encryption.New(ctx, conn, api.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI)))
encryption, err := encryption.New(ctx, conn, api.NewBuilder().WithKMSURI(optional.Ptr(config.KMSURI)))
if err != nil {
return nil, fmt.Errorf("failed to create encryption dal: %w", err)
}

db := dal.New(ctx, conn, encryptionSrv)
db := dal.New(ctx, conn, encryption)
ldb := leasesdal.New(conn)
svc := &Service{
tasks: scheduledtask.New(ctx, key, ldb),
dal: db,
leasesdal: ldb,
conn: conn,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
config: config,
increaseReplicaFailures: map[string]int{},
}
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryptionSrv, conn)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
svc.cronJobs = cronSvc

pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc

svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc)

go svc.syncSchema(ctx)

// Use min, max backoff if we are running in production, otherwise use
Expand Down Expand Up @@ -456,15 +463,12 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

err = s.dal.InsertLogEvent(ctx, &dal.LogEvent{
RequestKey: requestKey,
err = s.timeline.RecordLog(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
Time: msg.TimeStamp.AsTime(),
Level: msg.LogLevel,
Attributes: msg.Attributes,
Message: msg.Message,
Error: optional.Ptr(msg.Error),
RequestKey: requestKey,
Msg: msg,
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1069,24 +1073,24 @@ func (s *Service) callWithRequest(

response, err := client.verb.Call(ctx, req)
var resp *connect.Response[ftlv1.CallResponse]
var maybeResponse optional.Option[*ftlv1.CallResponse]
var callResponse either.Either[*ftlv1.CallResponse, error]
if err == nil {
resp = connect.NewResponse(response.Msg)
maybeResponse = optional.Some(resp.Msg)
callResponse = either.LeftOf[error](resp.Msg)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
} else {
callResponse = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.recordCall(ctx, &Call{
deploymentKey: route.Deployment,
requestKey: requestKey,
parentRequestKey: parentKey,
startTime: start,
destVerb: verbRef,
callers: callers,
callError: optional.Nil(err),
request: req.Msg,
response: maybeResponse,
s.timeline.RecordCall(ctx, &timeline.Call{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
ParentRequestKey: parentKey,
StartTime: start,
DestVerb: verbRef,
Callers: callers,
Request: req.Msg,
Response: callResponse,
})
return resp, err
}
Expand Down Expand Up @@ -1817,7 +1821,7 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return time.Hour, nil
}

removed, err := s.dal.DeleteOldEvents(ctx, dal.EventTypeCall, *s.config.EventLogRetention)
removed, err := s.timeline.DeleteOldEvents(ctx, timelinedal.EventTypeCall, *s.config.EventLogRetention)
if err != nil {
return 0, fmt.Errorf("failed to prune call events: %w", err)
}
Expand Down
Loading

0 comments on commit 88fb90c

Please sign in to comment.