From f51c0a928ec4c71476c396253d8a53459d7eb2a8 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Tue, 10 Dec 2024 08:55:26 +1100 Subject: [PATCH] verb router does telemetry and timeline updates --- backend/runner/proxy/proxy.go | 3 +- internal/routing/verb_routing.go | 69 ++++++++++++++++++++++++++++---- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 53c1e6d812..1e8047a598 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -131,7 +131,8 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque start := time.Now() verbService, ok := r.moduleVerbService[req.Msg.Verb.Module] if !ok { - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", req.Msg.Verb.Module)) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module")) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found")) } timelineClient := timeline.ClientFromContext(ctx) diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index 5c2e49ee60..f4a270c35d 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -3,15 +3,22 @@ package routing import ( "context" "fmt" + "time" "connectrpc.com/connect" "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" "github.com/puzpuzpuz/xsync/v3" + "github.com/TBD54566975/ftl/backend/controller/observability" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/rpc/headers" + "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) @@ -28,15 +35,51 @@ type VerbCallRouter struct { } func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { - client, ok := s.LookupClient(req.Msg.Verb.Module).Get() + start := time.Now() + timelineClient := timeline.ClientFromContext(ctx) + + client, deployment, ok := s.LookupClient(req.Msg.Verb.Module) if !ok { - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found")) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module")) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found")) + } + + callers, err := headers.GetCallers(req.Header()) + if err != nil { + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers")) + return nil, fmt.Errorf("could not get callers from headers: %w", err) } - call, err := client.Call(ctx, req) + + requestKey, ok, err := headers.GetRequestKey(req.Header()) if err != nil { - return nil, fmt.Errorf("failed to call module %s: %w", req.Msg.Verb.Module, err) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key")) + return nil, fmt.Errorf("could not process headers for request key: %w", err) + } else if !ok { + requestKey = model.NewRequestKey(model.OriginIngress, "grpc") + headers.SetRequestKey(req.Header(), requestKey) + } + + callEvent := &timeline.Call{ + DeploymentKey: deployment, + RequestKey: requestKey, + StartTime: start, + DestVerb: schema.RefFromProto(req.Msg.Verb), + Callers: callers, + Request: req.Msg, } - return call, nil + + originalResp, err := client.Call(ctx, req) + if err != nil { + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) + return nil, fmt.Errorf("failed to call %s: %w", callEvent.DestVerb, err) + } + resp := connect.NewResponse(originalResp.Msg) + callEvent.Response = result.Ok(resp.Msg) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) + return resp, nil } func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) *VerbCallRouter { @@ -58,13 +101,23 @@ func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) * return svc } -func (s *VerbCallRouter) LookupClient(module string) optional.Option[ftlv1connect.VerbServiceClient] { +func (s *VerbCallRouter) LookupClient(module string) (client ftlv1connect.VerbServiceClient, deployment model.DeploymentKey, ok bool) { res, _ := s.moduleClients.LoadOrCompute(module, func() optional.Option[ftlv1connect.VerbServiceClient] { - route, ok := s.routingTable.Current().GetForModule(module).Get() + current := s.routingTable.Current() + var ok bool + deployment, ok = current.GetDeployment(module).Get() + if !ok { + return optional.None[ftlv1connect.VerbServiceClient]() + } + route, ok := current.Get(deployment).Get() if !ok { return optional.None[ftlv1connect.VerbServiceClient]() } return optional.Some[ftlv1connect.VerbServiceClient](rpc.Dial(ftlv1connect.NewVerbServiceClient, route.String(), log.Error)) }) - return res + client, ok = res.Get() + if !ok { + return nil, deployment, false + } + return client, deployment, true }