From c5f8097255bfc4da0bf6573618fc470e98bfb9b5 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 6 Dec 2024 12:12:57 +1100 Subject: [PATCH] =?UTF-8?q?fix:=20some=20events=20weren=E2=80=99t=20being?= =?UTF-8?q?=20published=20to=20new=20timeline=20service=20(#3650)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/controller/controller.go | 28 +++++++++++--------- backend/controller/timeline/timeline.go | 35 +++---------------------- 2 files changed, 19 insertions(+), 44 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 9b2494dc9d..79f9e46b68 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -22,6 +22,7 @@ import ( "github.com/alecthomas/kong" "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" "github.com/jackc/pgx/v5" "github.com/jellydator/ttlcache/v3" "github.com/jpillora/backoff" @@ -43,7 +44,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" - "github.com/TBD54566975/ftl/backend/controller/timeline" + oldtimeline "github.com/TBD54566975/ftl/backend/controller/timeline" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect" ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" @@ -55,6 +56,7 @@ import ( "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/backend/timeline" frontend "github.com/TBD54566975/ftl/frontend/console" "github.com/TBD54566975/ftl/internal/configuration" cf "github.com/TBD54566975/ftl/internal/configuration/manager" @@ -207,7 +209,7 @@ type Service struct { tasks *scheduledtask.Scheduler pubSub *pubsub.Service registry artefacts.Service - timeline *timeline.Service + timeline *oldtimeline.Service controllerListListeners []ControllerListListener // Map from runnerKey.String() to client. @@ -273,7 +275,7 @@ func New( } svc.registry = storage - timelineSvc := timeline.New(ctx, conn, encryption) + timelineSvc := oldtimeline.New(ctx, conn, encryption) svc.timeline = timelineSvc pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc)) svc.pubSub = pubSub @@ -463,7 +465,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie requestKey = optional.Some(rkey) } - s.timeline.EnqueueEvent(ctx, &timeline.Log{ + timeline.Publish(ctx, &timeline.Log{ DeploymentKey: deploymentKey, RequestKey: requestKey, Time: msg.TimeStamp.AsTime(), @@ -901,7 +903,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl routes := s.routeTable.Current() route, ok := routes.GetDeployment(module).Get() if ok { - s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{ + timeline.Publish(ctx, &timeline.PubSubPublish{ DeploymentKey: route, RequestKey: requestKey, Time: now, @@ -1018,16 +1020,16 @@ func (s *Service) callWithRequest( if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported")) err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef)) - callEvent.Response = either.RightOf[*ftlv1.CallResponse](err) - s.timeline.EnqueueEvent(ctx, callEvent) + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timeline.Publish(ctx, callEvent) return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef)) } err = validateCallBody(req.Msg.Body, verb, sch) if err != nil { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body")) - callEvent.Response = either.RightOf[*ftlv1.CallResponse](err) - s.timeline.EnqueueEvent(ctx, callEvent) + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timeline.Publish(ctx, callEvent) return nil, err } @@ -1044,15 +1046,15 @@ func (s *Service) callWithRequest( var resp *connect.Response[ftlv1.CallResponse] if err == nil { resp = connect.NewResponse(response.Msg) - callEvent.Response = either.LeftOf[error](resp.Msg) + callEvent.Response = result.Ok(resp.Msg) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) } else { - callEvent.Response = either.RightOf[*ftlv1.CallResponse](err) + callEvent.Response = result.Err[*ftlv1.CallResponse](err) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module) } - s.timeline.EnqueueEvent(ctx, callEvent) + timeline.Publish(ctx, callEvent) return resp, err } @@ -1259,7 +1261,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration if e, ok := err.Get(); ok { errStr = optional.Some(e.Error()) } - s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{ + timeline.Publish(ctx, &timeline.AsyncExecute{ DeploymentKey: deployment, RequestKey: call.ParentRequestKey, EventType: eventType, diff --git a/backend/controller/timeline/timeline.go b/backend/controller/timeline/timeline.go index 2bda7d5d6e..f437436e47 100644 --- a/backend/controller/timeline/timeline.go +++ b/backend/controller/timeline/timeline.go @@ -3,13 +3,9 @@ package timeline import ( "context" stdsql "database/sql" - "time" - - "github.com/alecthomas/atomic" "github.com/TBD54566975/ftl/backend/controller/encryption" "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" - "github.com/TBD54566975/ftl/internal/log" ) type EventType = sql.EventType @@ -41,39 +37,16 @@ type InEvent interface { } type Service struct { - ctx context.Context - conn *stdsql.DB - encryption *encryption.Service - events chan Event - lastDroppedError atomic.Value[time.Time] - lastFailedError atomic.Value[time.Time] + ctx context.Context + conn *stdsql.DB + encryption *encryption.Service } func New(ctx context.Context, conn *stdsql.DB, encryption *encryption.Service) *Service { - var s *Service - events := make(chan Event, 1000) - s = &Service{ + s := &Service{ ctx: ctx, conn: conn, encryption: encryption, - events: events, } return s } - -// EnqueueEvent asynchronously enqueues an event for insertion into the timeline. -func (s *Service) EnqueueEvent(ctx context.Context, inEvent InEvent) { - event, err := inEvent.toEvent() - if err != nil { - log.FromContext(ctx).Warnf("Failed to convert event to event: %v", err) - return - } - select { - case s.events <- event: - default: - if time.Since(s.lastDroppedError.Load()) > 10*time.Second { - log.FromContext(ctx).Warnf("Dropping event %T due to full queue", event) - s.lastDroppedError.Store(time.Now()) - } - } -}