From af54cb4c6ea2ac1fe9d516224aa68492b3cecb96 Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 11 Dec 2024 14:06:48 -0700 Subject: [PATCH] fix: inject timeline client instead of using ctx --- backend/controller/console/console.go | 6 ++-- backend/controller/controller.go | 30 +++++++++++-------- backend/controller/deployment_logs.go | 8 ++--- .../pubsub/internal/dal/async_calls_test.go | 11 +++++-- backend/controller/pubsub/internal/dal/dal.go | 10 ++++--- backend/controller/pubsub/service.go | 12 ++++---- backend/cron/service.go | 8 ++--- backend/cron/service_test.go | 5 ++-- backend/ingress/handler.go | 24 +++++++-------- backend/ingress/handler_test.go | 11 +++++-- backend/ingress/service.go | 15 ++++++---- backend/runner/proxy/proxy.go | 9 +++--- backend/runner/runner.go | 5 ++-- backend/timeline/client.go | 14 --------- cmd/ftl-console/main.go | 2 +- cmd/ftl-controller/main.go | 5 ++-- cmd/ftl-cron/main.go | 6 ++-- cmd/ftl-http-ingress/main.go | 7 ++--- frontend/cli/cmd_replay.go | 2 +- frontend/cli/cmd_serve.go | 9 +++--- frontend/cli/main.go | 1 - internal/routing/verb_routing.go | 21 ++++++------- 22 files changed, 113 insertions(+), 108 deletions(-) diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index 5300d8bed2..62629b086d 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -564,8 +564,7 @@ func buildGraph(sch *schema.Schema, module *schema.Module, out map[string][]stri } func (s *service) GetTimeline(ctx context.Context, req *connect.Request[timelinepb.GetTimelineRequest]) (*connect.Response[timelinepb.GetTimelineResponse], error) { - client := timeline.ClientFromContext(ctx) - resp, err := client.GetTimeline(ctx, connect.NewRequest(req.Msg)) + resp, err := s.timelineClient.GetTimeline(ctx, connect.NewRequest(req.Msg)) if err != nil { return nil, fmt.Errorf("failed to get timeline from service: %w", err) } @@ -573,8 +572,7 @@ func (s *service) GetTimeline(ctx context.Context, req *connect.Request[timeline } func (s *service) StreamTimeline(ctx context.Context, req *connect.Request[timelinepb.StreamTimelineRequest], out *connect.ServerStream[timelinepb.StreamTimelineResponse]) error { - client := timeline.ClientFromContext(ctx) - stream, err := client.StreamTimeline(ctx, req) + stream, err := s.timelineClient.StreamTimeline(ctx, connect.NewRequest(req.Msg)) if err != nil { return fmt.Errorf("failed to stream timeline from service: %w", err) } diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 6d6fa1f7c1..f206c80809 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -107,6 +107,7 @@ func Start( storage *artefacts.OCIArtefactService, cm *cf.Manager[configuration.Configuration], sm *cf.Manager[configuration.Secrets], + timelineClient *timeline.Client, conn *sql.DB, devel bool, ) error { @@ -115,7 +116,7 @@ func Start( logger := log.FromContext(ctx) logger.Debugf("Starting FTL controller") - svc, err := New(ctx, conn, cm, sm, storage, config, devel) + svc, err := New(ctx, conn, cm, sm, timelineClient, storage, config, devel) if err != nil { return err } @@ -156,9 +157,10 @@ type Service struct { cm *cf.Manager[configuration.Configuration] sm *cf.Manager[configuration.Secrets] - tasks *scheduledtask.Scheduler - pubSub *pubsub.Service - storage *artefacts.OCIArtefactService + tasks *scheduledtask.Scheduler + pubSub *pubsub.Service + timelineClient *timeline.Client + storage *artefacts.OCIArtefactService // Map from runnerKey.String() to client. clients *ttlcache.Cache[string, clients] @@ -175,6 +177,7 @@ func New( conn *sql.DB, cm *cf.Manager[configuration.Configuration], sm *cf.Manager[configuration.Secrets], + timelineClient *timeline.Client, storage *artefacts.OCIArtefactService, config Config, devel bool, @@ -200,6 +203,7 @@ func New( cm: cm, sm: sm, tasks: scheduler, + timelineClient: timelineClient, leaser: ldb, key: key, clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)), @@ -209,10 +213,10 @@ func New( controllerState: state.NewInMemoryState(), } - pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState) + pubSub := pubsub.New(ctx, conn, routingTable, svc.controllerState, timelineClient) svc.pubSub = pubSub - svc.deploymentLogsSink = newDeploymentLogsSink(ctx) + svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineClient) // Use min, max backoff if we are running in production, otherwise use // (1s, 1s) (or develBackoff). Will also wrap the job such that it its next @@ -359,7 +363,7 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie requestKey = optional.Some(rkey) } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.Log{ + s.timelineClient.Publish(ctx, timeline.Log{ DeploymentKey: deploymentKey, RequestKey: requestKey, Time: msg.TimeStamp.AsTime(), @@ -459,7 +463,7 @@ func (s *Service) setDeploymentReplicas(ctx context.Context, key model.Deploymen return fmt.Errorf("could not activate deployment: %w", err) } } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentUpdated{ + s.timelineClient.Publish(ctx, timeline.DeploymentUpdated{ DeploymentKey: key, MinReplicas: minReplicas, PrevMinReplicas: deployment.MinReplicas, @@ -519,7 +523,7 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re } } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.DeploymentCreated{ + s.timelineClient.Publish(ctx, timeline.DeploymentCreated{ DeploymentKey: newDeploymentKey, Language: newDeployment.Language, ModuleName: newDeployment.Module, @@ -901,7 +905,7 @@ func (s *Service) callWithRequest( err = fmt.Errorf("no routes for module %q", module) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) callEvent.Response = result.Err[*ftlv1.CallResponse](err) - timeline.ClientFromContext(ctx).Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) return nil, connect.NewError(connect.CodeNotFound, err) } @@ -909,7 +913,7 @@ func (s *Service) callWithRequest( observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported")) err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef)) callEvent.Response = result.Err[*ftlv1.CallResponse](err) - timeline.ClientFromContext(ctx).Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef)) } @@ -917,7 +921,7 @@ func (s *Service) callWithRequest( if err != nil { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body")) callEvent.Response = result.Err[*ftlv1.CallResponse](err) - timeline.ClientFromContext(ctx).Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) return nil, err } @@ -942,7 +946,7 @@ func (s *Service) callWithRequest( logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module) } - timeline.ClientFromContext(ctx).Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) return resp, err } diff --git a/backend/controller/deployment_logs.go b/backend/controller/deployment_logs.go index 4f1a69705e..0052443330 100644 --- a/backend/controller/deployment_logs.go +++ b/backend/controller/deployment_logs.go @@ -14,13 +14,13 @@ import ( var _ log.Sink = (*deploymentLogsSink)(nil) -func newDeploymentLogsSink(ctx context.Context) *deploymentLogsSink { +func newDeploymentLogsSink(ctx context.Context, timelineClient *timeline.Client) *deploymentLogsSink { sink := &deploymentLogsSink{ logQueue: make(chan log.Entry, 10000), } // Process logs in background - go sink.processLogs(ctx) + go sink.processLogs(ctx, timelineClient) return sink } @@ -40,7 +40,7 @@ func (d *deploymentLogsSink) Log(entry log.Entry) error { return nil } -func (d *deploymentLogsSink) processLogs(ctx context.Context) { +func (d *deploymentLogsSink) processLogs(ctx context.Context, timelineClient *timeline.Client) { for { select { case entry := <-d.logQueue: @@ -69,7 +69,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { errorStr = optional.Some(entry.Error.Error()) } - timeline.ClientFromContext(ctx).Publish(ctx, &timeline.Log{ + timelineClient.Publish(ctx, &timeline.Log{ RequestKey: request, DeploymentKey: deployment, Time: entry.Time, diff --git a/backend/controller/pubsub/internal/dal/async_calls_test.go b/backend/controller/pubsub/internal/dal/async_calls_test.go index 56fdbead14..6206a46e36 100644 --- a/backend/controller/pubsub/internal/dal/async_calls_test.go +++ b/backend/controller/pubsub/internal/dal/async_calls_test.go @@ -2,6 +2,7 @@ package dal import ( "context" + "net/url" "testing" "github.com/alecthomas/assert/v2" @@ -9,6 +10,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/async" "github.com/TBD54566975/ftl/backend/controller/sql/sqltest" "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" @@ -18,9 +20,14 @@ func TestNoCallToAcquire(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) conn := sqltest.OpenForTesting(ctx, t) - dal := New(conn) + timelineEndpoint, err := url.Parse("http://localhost:8080") + assert.NoError(t, err) - _, _, err := dal.AcquireAsyncCall(ctx) + timelineClient := timeline.NewClient(ctx, timelineEndpoint) + + dal := New(conn, timelineClient) + + _, _, err = dal.AcquireAsyncCall(ctx) assert.IsError(t, err, libdal.ErrNotFound) assert.EqualError(t, err, "no pending async calls: not found") } diff --git a/backend/controller/pubsub/internal/dal/dal.go b/backend/controller/pubsub/internal/dal/dal.go index 378eadc894..a48ad4b398 100644 --- a/backend/controller/pubsub/internal/dal/dal.go +++ b/backend/controller/pubsub/internal/dal/dal.go @@ -23,10 +23,11 @@ import ( type DAL struct { *libdal.Handle[DAL] - db dalsql.Querier + db dalsql.Querier + timelineClient *timeline.Client } -func New(conn libdal.Connection) *DAL { +func New(conn libdal.Connection, timelineClient *timeline.Client) *DAL { return &DAL{ Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { return &DAL{ @@ -34,7 +35,8 @@ func New(conn libdal.Connection) *DAL { db: dalsql.New(h.Connection), } }), - db: dalsql.New(conn), + db: dalsql.New(conn), + timelineClient: timelineClient, } } @@ -108,7 +110,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t for _, subscription := range subs { now := time.Now().UTC() enqueueTimelineEvent := func(destVerb optional.Option[schema.RefKey], err optional.Option[string]) { - timeline.ClientFromContext(ctx).Publish(ctx, &timeline.PubSubConsume{ + d.timelineClient.Publish(ctx, &timeline.PubSubConsume{ DeploymentKey: subscription.DeploymentKey, RequestKey: subscription.RequestKey, Time: now, diff --git a/backend/controller/pubsub/service.go b/backend/controller/pubsub/service.go index d838ed7242..c89587d4fd 100644 --- a/backend/controller/pubsub/service.go +++ b/backend/controller/pubsub/service.go @@ -49,15 +49,17 @@ type Service struct { verbRouting *routing.VerbCallRouter asyncCallsLock sync.Mutex controllerState state.ControllerState + timelineClient *timeline.Client } -func New(ctx context.Context, conn libdal.Connection, rt *routing.RouteTable, controllerState state.ControllerState) *Service { +func New(ctx context.Context, conn libdal.Connection, rt *routing.RouteTable, controllerState state.ControllerState, timelineClient *timeline.Client) *Service { m := &Service{ - dal: dal.New(conn), + dal: dal.New(conn, timelineClient), eventPublished: make(chan struct{}), routeTable: rt, - verbRouting: routing.NewVerbRouterFromTable(ctx, rt), + verbRouting: routing.NewVerbRouterFromTable(ctx, rt, timelineClient), controllerState: controllerState, + timelineClient: timelineClient, } go m.watchEventStream(ctx) go m.poll(ctx) @@ -196,7 +198,7 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlpubs routes := s.routeTable.Current() route, ok := routes.GetDeployment(module).Get() if ok { - timeline.ClientFromContext(ctx).Publish(ctx, timeline.PubSubPublish{ + s.timelineClient.Publish(ctx, timeline.PubSubPublish{ DeploymentKey: route, RequestKey: requestKey, Time: now, @@ -266,7 +268,7 @@ func (s *Service) ExecuteAsyncCalls(ctx context.Context) (interval time.Duration if e, ok := err.Get(); ok { errStr = optional.Some(e.Error()) } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.AsyncExecute{ + s.timelineClient.Publish(ctx, timeline.AsyncExecute{ DeploymentKey: deployment, RequestKey: call.ParentRequestKey, EventType: eventType, diff --git a/backend/cron/service.go b/backend/cron/service.go index 9b4fc17598..65eac0c4e3 100644 --- a/backend/cron/service.go +++ b/backend/cron/service.go @@ -44,7 +44,7 @@ func (c cronJob) String() string { } // Start the cron service. Blocks until the context is cancelled. -func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient) error { +func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timeline.Client) error { logger := log.FromContext(ctx).Scope("cron") ctx = log.ContextWithLogger(ctx, logger) // Map of cron jobs for each module. @@ -55,7 +55,7 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, clien logger.Debugf("Starting cron service") for { - next, ok := scheduleNext(ctx, cronQueue) + next, ok := scheduleNext(ctx, cronQueue, timelineClient) var nextCh <-chan time.Time if ok { logger.Debugf("Next cron job scheduled in %s", next) @@ -128,11 +128,11 @@ func callCronJob(ctx context.Context, verbClient routing.CallClient, cronJob cro } } -func scheduleNext(ctx context.Context, cronQueue []cronJob) (time.Duration, bool) { +func scheduleNext(ctx context.Context, cronQueue []cronJob, timelineClient *timeline.Client) (time.Duration, bool) { if len(cronQueue) == 0 { return 0, false } - timeline.ClientFromContext(ctx).Publish(ctx, timeline.CronScheduled{ + timelineClient.Publish(ctx, timeline.CronScheduled{ DeploymentKey: model.NewDeploymentKey(cronQueue[0].module), Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name}, ScheduledAt: cronQueue[0].next, diff --git a/backend/cron/service_test.go b/backend/cron/service_test.go index 77363cc89a..0c6b22a1c6 100644 --- a/backend/cron/service_test.go +++ b/backend/cron/service_test.go @@ -66,7 +66,8 @@ func TestCron(t *testing.T) { ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace})) timelineEndpoint, err := url.Parse("http://localhost:8080") assert.NoError(t, err) - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint)) + + timelineClient := timeline.NewClient(ctx, timelineEndpoint) ctx, cancel := context.WithTimeout(ctx, time.Second*5) t.Cleanup(cancel) @@ -77,7 +78,7 @@ func TestCron(t *testing.T) { requests: requestsch, } - wg.Go(func() error { return Start(ctx, eventSource, client) }) + wg.Go(func() error { return Start(ctx, eventSource, client, timelineClient) }) requests := make([]*ftlv1.CallRequest, 0, 2) diff --git a/backend/ingress/handler.go b/backend/ingress/handler.go index b34ec9cd2b..19c07a17a5 100644 --- a/backend/ingress/handler.go +++ b/backend/ingress/handler.go @@ -22,7 +22,7 @@ import ( ) // handleHTTP HTTP ingress routes. -func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, client routing.CallClient) { +func (s *service) handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, client routing.CallClient) { logger := log.FromContext(r.Context()).Scope(fmt.Sprintf("ingress:%s:%s", r.Method, r.URL.Path)) logger.Debugf("Start ingress request") @@ -58,7 +58,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques logger.Debugf("bad request: %s", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusBadRequest, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusBadRequest, err.Error()) return } ingressEvent.RequestBody = body @@ -76,11 +76,11 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques httpCode := connectCodeToHTTP(connectErr.Code()) http.Error(w, http.StatusText(httpCode), httpCode) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, connectErr.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, connectErr.Error()) } else { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) } return } @@ -92,7 +92,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques logger.Errorf(err, "could not resolve schema type for verb %s", route.verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) return } var responseBody []byte @@ -103,7 +103,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques logger.Errorf(err, "could not unmarhal response for verb %s", verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) return } rawBody = response.Body @@ -113,7 +113,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques logger.Errorf(err, "could not create response for verb %s", verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) return } @@ -143,21 +143,21 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques _, err = w.Write(responseBody) if err == nil { metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]()) - timeline.ClientFromContext(r.Context()).Publish(r.Context(), ingressEvent) + s.timelineClient.Publish(r.Context(), ingressEvent) } else { logger.Errorf(err, "could not write response body") metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error()) } case *ftlv1.CallResponse_Error_: http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) metrics.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error")) - recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, msg.Error.Message) + s.recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, msg.Error.Message) } } -func recordIngressErrorEvent( +func (s *service) recordIngressErrorEvent( ctx context.Context, ingressEvent timeline.Ingress, statusCode int, @@ -165,7 +165,7 @@ func recordIngressErrorEvent( ) { ingressEvent.ResponseStatus = statusCode ingressEvent.Error = optional.Some(errorMsg) - timeline.ClientFromContext(ctx).Publish(ctx, ingressEvent) + s.timelineClient.Publish(ctx, ingressEvent) } // Copied from the Apache-licensed connect-go source. diff --git a/backend/ingress/handler_test.go b/backend/ingress/handler_test.go index a5e276b81c..5c05b0ced7 100644 --- a/backend/ingress/handler_test.go +++ b/backend/ingress/handler_test.go @@ -19,6 +19,7 @@ import ( "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) func TestIngress(t *testing.T) { @@ -69,8 +70,6 @@ func TestIngress(t *testing.T) { ctx := log.ContextWithNewDefaultLogger(context.Background()) timelineEndpoint, err := url.Parse("http://localhost:8080") assert.NoError(t, err) - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, timelineEndpoint)) - assert.NoError(t, err) for _, test := range []struct { name string @@ -105,7 +104,13 @@ func TestIngress(t *testing.T) { reqKey := model.NewRequestKey(model.OriginIngress, "test") assert.NoError(t, err) fv := &fakeVerbClient{response: response, t: t} - handleHTTP(time.Now(), sch, reqKey, routes, rec, req, fv) + + svc := &service{ + view: syncView(ctx, schemaeventsource.NewUnattached()), + client: fv, + timelineClient: timeline.NewClient(ctx, timelineEndpoint), + } + svc.handleHTTP(time.Now(), sch, reqKey, routes, rec, req, fv) result := rec.Result() defer result.Body.Close() assert.Equal(t, test.statusCode, rec.Code, "%s: %s", result.Status, rec.Body.Bytes()) diff --git a/backend/ingress/service.go b/backend/ingress/service.go index 08ba38713f..119c8f56e7 100644 --- a/backend/ingress/service.go +++ b/backend/ingress/service.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/cors" ftlhttp "github.com/TBD54566975/ftl/internal/http" "github.com/TBD54566975/ftl/internal/log" @@ -37,17 +38,19 @@ func (c *Config) Validate() error { type service struct { // Complete schema synchronised from the database. - view *atomic.Value[materialisedView] - client routing.CallClient + view *atomic.Value[materialisedView] + client routing.CallClient + timelineClient *timeline.Client } // Start the HTTP ingress service. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, client routing.CallClient) error { +func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timeline.Client) error { logger := log.FromContext(ctx).Scope("http-ingress") ctx = log.ContextWithLogger(ctx, logger) svc := &service{ - view: syncView(ctx, schemaEventSource), - client: client, + view: syncView(ctx, schemaEventSource), + client: client, + timelineClient: timelineClient, } ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress") @@ -84,5 +87,5 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { metrics.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal")) return } - handleHTTP(start, state.schema, requestKey, routes, w, r, s.client) + s.handleHTTP(start, state.schema, requestKey, routes, w, r, s.client) } diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 71a467d266..9b45e42a15 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -39,14 +39,16 @@ type Service struct { controllerLeaseService ftlleaseconnect.LeaseServiceClient controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient moduleVerbService map[string]moduleVerbService + timelineClient *timeline.Client } -func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient) *Service { +func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient, timelineClient *timeline.Client) *Service { proxy := &Service{ controllerDeploymentService: controllerModuleService, controllerLeaseService: leaseClient, controllerPubsubService: controllerPubsubService, moduleVerbService: map[string]moduleVerbService{}, + timelineClient: timelineClient, } return proxy } @@ -138,7 +140,6 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module")) return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found")) } - timelineClient := timeline.ClientFromContext(ctx) callers, err := headers.GetCallers(req.Header()) if err != nil { @@ -167,13 +168,13 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque originalResp, err := verbService.client.Call(ctx, headers.CopyRequestForForwarding(req)) if err != nil { callEvent.Response = result.Err[*ftlv1.CallResponse](err) - timelineClient.Publish(ctx, callEvent) + r.timelineClient.Publish(ctx, callEvent) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) return nil, fmt.Errorf("failed to proxy verb: %w", err) } resp := connect.NewResponse(originalResp.Msg) callEvent.Response = result.Ok(resp.Msg) - timelineClient.Publish(ctx, callEvent) + r.timelineClient.Publish(ctx, callEvent) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) return resp, nil } diff --git a/backend/runner/runner.go b/backend/runner/runner.go index afce59fab2..e641b560b3 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -87,8 +87,6 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer logger := log.FromContext(ctx).Attrs(map[string]string{"runner": config.Key.String()}) logger.Debugf("Starting FTL Runner") - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, config.TimelineEndpoint)) - err = manageDeploymentDirectory(logger, config) if err != nil { observability.Runner.StartupFailed(ctx) @@ -365,7 +363,8 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s leaseServiceClient := rpc.Dial(ftlleaseconnect.NewLeaseServiceClient, s.config.LeaseEndpoint.String(), log.Error) - s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient, pubsubClient) + timelineClient := timeline.NewClient(ctx, s.config.TimelineEndpoint) + s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient, pubsubClient, timelineClient) parse, err := url.Parse("http://127.0.0.1:0") if err != nil { diff --git a/backend/timeline/client.go b/backend/timeline/client.go index 5603925587..4fd4a57b5d 100644 --- a/backend/timeline/client.go +++ b/backend/timeline/client.go @@ -20,8 +20,6 @@ const ( maxBatchDelay = 100 * time.Millisecond ) -type timelineContextKey struct{} - type Client struct { timelinev1connect.TimelineServiceClient @@ -40,18 +38,6 @@ func NewClient(ctx context.Context, endpoint *url.URL) *Client { return client } -func ContextWithClient(ctx context.Context, client *Client) context.Context { - return context.WithValue(ctx, timelineContextKey{}, client) -} - -func ClientFromContext(ctx context.Context) *Client { - c, ok := ctx.Value(timelineContextKey{}).(*Client) - if !ok { - panic("Timeline client not found in context") - } - return c -} - //go:sumtype type Event interface { ToEntry() (*timelinepb.CreateEventsRequest_EventEntry, error) diff --git a/cmd/ftl-console/main.go b/cmd/ftl-console/main.go index 74ec20b81f..a3a9da456f 100644 --- a/cmd/ftl-console/main.go +++ b/cmd/ftl-console/main.go @@ -48,7 +48,7 @@ func main() { adminClient := rpc.Dial(ftlv1connect.NewAdminServiceClient, cli.AdminEndpoint.String(), log.Error) eventSource := schemaeventsource.New(ctx, schemaClient) - routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient)) + routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient), timelineClient) err = console.Start(ctx, cli.ConsoleConfig, eventSource, controllerClient, timelineClient, adminClient, routeManager) kctx.FatalIfErrorf(err, "failed to start console service") diff --git a/cmd/ftl-controller/main.go b/cmd/ftl-controller/main.go index dcd0b28692..dbf7e7d133 100644 --- a/cmd/ftl-controller/main.go +++ b/cmd/ftl-controller/main.go @@ -69,8 +69,6 @@ func main() { cm, err := manager.New(ctx, configResolver, providers.NewDatabaseConfig(configDal)) kctx.FatalIfErrorf(err) - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, cli.TimelineEndpoint)) - leaseClient := rpc.Dial(ftlv1connect.NewLeaseServiceClient, cli.LeaseEndpoint.String(), log.Error) ctx = rpc.ContextWithClient(ctx, leaseClient) schemaClient := rpc.Dial(ftlv1connect2.NewSchemaServiceClient, cli.ControllerConfig.Bind.String(), log.Error) @@ -83,6 +81,7 @@ func main() { sm, err := manager.New[cf.Secrets](ctx, dbSecretResolver, asmSecretProvider) kctx.FatalIfErrorf(err) - err = controller.Start(ctx, cli.ControllerConfig, storage, cm, sm, conn, false) + timelineClient := timeline.NewClient(ctx, cli.TimelineEndpoint) + err = controller.Start(ctx, cli.ControllerConfig, storage, cm, sm, timelineClient, conn, false) kctx.FatalIfErrorf(err) } diff --git a/cmd/ftl-cron/main.go b/cmd/ftl-cron/main.go index dfa4331f17..c1ee87a7f3 100644 --- a/cmd/ftl-cron/main.go +++ b/cmd/ftl-cron/main.go @@ -39,9 +39,9 @@ func main() { schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.SchemaServiceEndpoint.String(), log.Error) eventSource := schemaeventsource.New(ctx, schemaClient) - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, cli.CronConfig.TimelineEndpoint)) + timelineClient := timeline.NewClient(ctx, cli.CronConfig.TimelineEndpoint) + routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient), timelineClient) - routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient)) - err = cron.Start(ctx, eventSource, routeManager) + err = cron.Start(ctx, eventSource, routeManager, timelineClient) kctx.FatalIfErrorf(err, "failed to start cron") } diff --git a/cmd/ftl-http-ingress/main.go b/cmd/ftl-http-ingress/main.go index 4db1a5cd7d..15b0799068 100644 --- a/cmd/ftl-http-ingress/main.go +++ b/cmd/ftl-http-ingress/main.go @@ -39,11 +39,10 @@ func main() { err := observability.Init(ctx, false, "", "ftl-http-ingress", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, cli.TimelineEndpoint)) - schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.SchemaServerEndpoint.String(), log.Error) eventSource := schemaeventsource.New(ctx, schemaClient) - routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient)) - err = ingress.Start(ctx, cli.HTTPIngressConfig, eventSource, routeManager) + timelineClient := timeline.NewClient(ctx, cli.TimelineEndpoint) + routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient), timelineClient) + err = ingress.Start(ctx, cli.HTTPIngressConfig, eventSource, routeManager, timelineClient) kctx.FatalIfErrorf(err, "failed to start HTTP ingress") } diff --git a/frontend/cli/cmd_replay.go b/frontend/cli/cmd_replay.go index 9188fa5794..aeca6adf7f 100644 --- a/frontend/cli/cmd_replay.go +++ b/frontend/cli/cmd_replay.go @@ -29,6 +29,7 @@ func (c *replayCmd) Run( ctx context.Context, verbClient ftlv1connect.VerbServiceClient, schemaClient ftlv1connect.SchemaServiceClient, + timelineClient *timeline.Client, ) error { // Wait timeout is for both pings to complete, not each ping individually startTime := time.Now() @@ -37,7 +38,6 @@ func (c *replayCmd) Run( return fmt.Errorf("failed to wait for client: %w", err) } - timelineClient := timeline.ClientFromContext(ctx) if err := rpc.Wait(ctx, backoff.Backoff{Max: time.Second * 2}, c.Wait-time.Since(startTime), timelineClient); err != nil { return fmt.Errorf("failed to wait for console service client: %w", err) } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 47dded74d2..9a1d65396a 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -264,7 +264,7 @@ func (s *serveCommonConfig) run( } wg.Go(func() error { - if err := controller.Start(controllerCtx, config, storage, cm, sm, conn, true); err != nil { + if err := controller.Start(controllerCtx, config, storage, cm, sm, timelineClient, conn, true); err != nil { logger.Errorf(err, "controller%d failed: %v", i, err) return fmt.Errorf("controller%d failed: %w", i, err) } @@ -337,7 +337,7 @@ func (s *serveCommonConfig) run( if !s.NoConsole { // Start Console wg.Go(func() error { - err := console.Start(ctx, s.Console, schemaEventSourceFactory(), controllerClient, timelineClient, adminClient, routing.NewVerbRouter(ctx, schemaEventSourceFactory())) + err := console.Start(ctx, s.Console, schemaEventSourceFactory(), controllerClient, timelineClient, adminClient, routing.NewVerbRouter(ctx, schemaEventSourceFactory(), timelineClient)) if err != nil { return fmt.Errorf("console failed: %w", err) } @@ -354,7 +354,7 @@ func (s *serveCommonConfig) run( }) // Start Cron wg.Go(func() error { - err := cron.Start(ctx, schemaEventSourceFactory(), routing.NewVerbRouter(ctx, schemaEventSourceFactory())) + err := cron.Start(ctx, schemaEventSourceFactory(), routing.NewVerbRouter(ctx, schemaEventSourceFactory(), timelineClient), timelineClient) if err != nil { return fmt.Errorf("cron failed: %w", err) } @@ -362,8 +362,7 @@ func (s *serveCommonConfig) run( }) // Start Ingress wg.Go(func() error { - routeManager := routing.NewVerbRouter(ctx, schemaEventSourceFactory()) - err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory(), routeManager) + err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory(), routing.NewVerbRouter(ctx, schemaEventSourceFactory(), timelineClient), timelineClient) if err != nil { return fmt.Errorf("ingress failed: %w", err) } diff --git a/frontend/cli/main.go b/frontend/cli/main.go index 1cf976a337..59bec013a5 100644 --- a/frontend/cli/main.go +++ b/frontend/cli/main.go @@ -241,7 +241,6 @@ func makeBindContext(logger *log.Logger, cancel context.CancelFunc) terminal.Kon kctx.BindTo(pubsubClient, (*ftlv1pubsubconnect.LegacyPubsubServiceClient)(nil)) timelineClient := timeline.NewClient(ctx, cli.TimelineEndpoint) - ctx = timeline.ContextWithClient(ctx, timelineClient) kctx.Bind(timelineClient) leaseClient := rpc.Dial(leasev1connext.NewLeaseServiceClient, cli.LeaseEndpoint.String(), log.Error) diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index 093ff72f30..1a6893db2f 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -30,13 +30,13 @@ type CallClient interface { // VerbCallRouter managed clients for the routing service, so calls to a given module can be routed to the correct instance. type VerbCallRouter struct { - routingTable *RouteTable - moduleClients *xsync.MapOf[string, optional.Option[ftlv1connect.VerbServiceClient]] + routingTable *RouteTable + moduleClients *xsync.MapOf[string, optional.Option[ftlv1connect.VerbServiceClient]] + timelineClient *timeline.Client } func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { start := time.Now() - timelineClient := timeline.ClientFromContext(ctx) client, deployment, ok := s.LookupClient(req.Msg.Verb.Module) if !ok { @@ -71,21 +71,22 @@ func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.Ca originalResp, err := client.Call(ctx, req) if err != nil { callEvent.Response = result.Err[*ftlv1.CallResponse](err) - timelineClient.Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) return nil, fmt.Errorf("failed to call %s: %w", callEvent.DestVerb, err) } resp := connect.NewResponse(originalResp.Msg) callEvent.Response = result.Ok(resp.Msg) - timelineClient.Publish(ctx, callEvent) + s.timelineClient.Publish(ctx, callEvent) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) return resp, nil } -func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable) *VerbCallRouter { +func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable, timelineClient *timeline.Client) *VerbCallRouter { svc := &VerbCallRouter{ - routingTable: routeTable, - moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](), + routingTable: routeTable, + moduleClients: xsync.NewMapOf[string, optional.Option[ftlv1connect.VerbServiceClient]](), + timelineClient: timelineClient, } routeUpdates := svc.routingTable.Subscribe() go func() { @@ -100,8 +101,8 @@ func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable) *VerbCa }() return svc } -func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) *VerbCallRouter { - return NewVerbRouterFromTable(ctx, New(ctx, changes)) +func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource, timelineClient *timeline.Client) *VerbCallRouter { + return NewVerbRouterFromTable(ctx, New(ctx, changes), timelineClient) } func (s *VerbCallRouter) LookupClient(module string) (client ftlv1connect.VerbServiceClient, deployment model.DeploymentKey, ok bool) {