Skip to content

Commit

Permalink
feat: publish to timeline and telemetry when calling via p2p routing
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Dec 10, 2024
1 parent 1eb50a0 commit 8259fad
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 94 deletions.
26 changes: 17 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,12 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
configs, err := s.cm.MapForModule(ctx, module)
routeTable := map[string]string{}
for _, module := range callableModuleNames {
if route, ok := routeView.GetForModule(module).Get(); ok {
routeTable[module] = route.String()
deployment, ok := routeView.GetDeployment(module).Get()
if !ok {
continue
}
if route, ok := routeView.Get(deployment).Get(); ok {
routeTable[deployment.String()] = route.String()
}
}
if deployment.Schema.Runtime != nil && deployment.Schema.Runtime.Deployment != nil {
Expand Down Expand Up @@ -897,13 +901,6 @@ func (s *Service) callWithRequest(
currentCaller = callers[len(callers)-1]
}

module := verbRef.Module
route, ok := routes.GetForModule(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}

var requestKey model.RequestKey
var isNewRequestKey bool
if k, ok := key.Get(); ok {
Expand All @@ -927,11 +924,13 @@ func (s *Service) callWithRequest(
headers.SetRequestKey(req.Header(), requestKey)
}

module := verbRef.Module
deployment, ok := routes.GetDeployment(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment"))
return nil, fmt.Errorf("deployment not found for module %q", module)
}

callEvent := &timeline.Call{
DeploymentKey: deployment,
RequestKey: requestKey,
Expand All @@ -942,6 +941,15 @@ func (s *Service) callWithRequest(
Request: req.Msg,
}

route, ok := routes.Get(deployment).Get()
if !ok {
err = fmt.Errorf("no routes for module %q", module)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodeNotFound, err)
}

if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
Expand Down
99 changes: 50 additions & 49 deletions backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ message GetDeploymentContextResponse {
}

message Route {
string module = 1;
string deployment = 1;
string uri = 2;
}

Expand Down
79 changes: 65 additions & 14 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,46 @@ package proxy
import (
"context"
"fmt"
"time"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/controller/observability"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
ftlleaseconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1/ftlv1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"
)

var _ ftlv1connect.VerbServiceHandler = &Service{}
var _ ftldeploymentconnect.DeploymentServiceHandler = &Service{}

type moduleVerbService struct {
client ftlv1connect.VerbServiceClient
deployment model.DeploymentKey
}

type Service struct {
controllerDeploymentService ftldeploymentconnect.DeploymentServiceClient
controllerLeaseService ftlleaseconnect.LeaseServiceClient
moduleVerbService map[string]ftlv1connect.VerbServiceClient
moduleVerbService map[string]moduleVerbService
}

func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient) *Service {
proxy := &Service{
controllerDeploymentService: controllerModuleService,
controllerLeaseService: leaseClient,
moduleVerbService: map[string]ftlv1connect.VerbServiceClient{},
moduleVerbService: map[string]moduleVerbService{},
}
return proxy
}
Expand All @@ -46,13 +58,20 @@ func (r *Service) GetDeploymentContext(ctx context.Context, c *connect.Request[f

if rcv {
logger.Debugf("Received DeploymentContext from module: %v", moduleContext.Msg())
newRouteTable := map[string]ftlv1connect.VerbServiceClient{}
newRouteTable := map[string]moduleVerbService{}
for _, route := range moduleContext.Msg().Routes {
logger.Debugf("Adding route: %s -> %s", route.Module, route.Uri)
if client, ok := r.moduleVerbService[route.Module]; ok {
newRouteTable[route.Module] = client
} else {
newRouteTable[route.Module] = rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error)
logger.Debugf("Adding route: %s -> %s", route.Deployment, route.Uri)

deploment, err := model.ParseDeploymentKey(route.Deployment)
if err != nil {
return fmt.Errorf("failed to parse deployment key: %w", err)
}
module := deploment.Payload.Module
if existing, ok := r.moduleVerbService[module]; !ok || existing.deployment.String() != deploment.String() {
newRouteTable[module] = moduleVerbService{
client: rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error),
deployment: deploment,
}
}
}
r.moduleVerbService = newRouteTable
Expand Down Expand Up @@ -108,16 +127,48 @@ func (r *Service) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (r *Service) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {

client, ok := r.moduleVerbService[c.Msg.Verb.Module]
func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
verbService, ok := r.moduleVerbService[req.Msg.Verb.Module]
if !ok {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", c.Msg.Verb.Module))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", req.Msg.Verb.Module))
}
timelineClient := timeline.ClientFromContext(ctx)

callers, err := headers.GetCallers(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers"))
return nil, err
}

requestKey, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key"))
return nil, fmt.Errorf("could not process headers for request key: %w", err)
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
headers.SetRequestKey(req.Header(), requestKey)
}

callEvent := &timeline.Call{
DeploymentKey: verbService.deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: schema.RefFromProto(req.Msg.Verb),
Callers: callers,
Request: req.Msg,
}

call, err := client.Call(ctx, headers.CopyRequestForForwarding(c))
originalResp, err := verbService.client.Call(ctx, headers.CopyRequestForForwarding(req))
if err != nil {
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
return nil, fmt.Errorf("failed to proxy verb: %w", err)
}
return call, nil
resp := connect.NewResponse(originalResp.Msg)
callEvent.Response = result.Ok(resp.Msg)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
return resp, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8259fad

Please sign in to comment.