Skip to content

Commit

Permalink
verb router does telemetry and timeline updates
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 10, 2024
1 parent b8367cd commit f51c0a9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 9 deletions.
3 changes: 2 additions & 1 deletion backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
start := time.Now()
verbService, ok := r.moduleVerbService[req.Msg.Verb.Module]
if !ok {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", req.Msg.Verb.Module))
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found"))
}
timelineClient := timeline.ClientFromContext(ctx)

Expand Down
69 changes: 61 additions & 8 deletions internal/routing/verb_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package routing
import (
"context"
"fmt"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/backend/controller/observability"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

Expand All @@ -28,15 +35,51 @@ type VerbCallRouter struct {
}

func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
client, ok := s.LookupClient(req.Msg.Verb.Module).Get()
start := time.Now()
timelineClient := timeline.ClientFromContext(ctx)

client, deployment, ok := s.LookupClient(req.Msg.Verb.Module)
if !ok {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found"))
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found"))
}

callers, err := headers.GetCallers(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers"))
return nil, fmt.Errorf("could not get callers from headers: %w", err)
}
call, err := client.Call(ctx, req)

requestKey, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
return nil, fmt.Errorf("failed to call module %s: %w", req.Msg.Verb.Module, err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key"))
return nil, fmt.Errorf("could not process headers for request key: %w", err)
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
headers.SetRequestKey(req.Header(), requestKey)
}

callEvent := &timeline.Call{
DeploymentKey: deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: schema.RefFromProto(req.Msg.Verb),
Callers: callers,
Request: req.Msg,
}
return call, nil

originalResp, err := client.Call(ctx, req)
if err != nil {
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
return nil, fmt.Errorf("failed to call %s: %w", callEvent.DestVerb, err)
}
resp := connect.NewResponse(originalResp.Msg)
callEvent.Response = result.Ok(resp.Msg)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
return resp, nil
}

func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) *VerbCallRouter {
Expand All @@ -58,13 +101,23 @@ func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) *
return svc
}

func (s *VerbCallRouter) LookupClient(module string) optional.Option[ftlv1connect.VerbServiceClient] {
func (s *VerbCallRouter) LookupClient(module string) (client ftlv1connect.VerbServiceClient, deployment model.DeploymentKey, ok bool) {
res, _ := s.moduleClients.LoadOrCompute(module, func() optional.Option[ftlv1connect.VerbServiceClient] {
route, ok := s.routingTable.Current().GetForModule(module).Get()
current := s.routingTable.Current()
var ok bool
deployment, ok = current.GetDeployment(module).Get()
if !ok {
return optional.None[ftlv1connect.VerbServiceClient]()
}
route, ok := current.Get(deployment).Get()
if !ok {
return optional.None[ftlv1connect.VerbServiceClient]()
}
return optional.Some[ftlv1connect.VerbServiceClient](rpc.Dial(ftlv1connect.NewVerbServiceClient, route.String(), log.Error))
})
return res
client, ok = res.Get()
if !ok {
return nil, deployment, false
}
return client, deployment, true
}

0 comments on commit f51c0a9

Please sign in to comment.