Skip to content

Commit

Permalink
feat: publish to timeline and telemetry when calling via p2p routing (#…
Browse files Browse the repository at this point in the history
…3687)

- [x] Runner -> runner routing
- [x] `VerbCallRouter` for everything else
- [x] Provide timeline endpoint and attach timeline client to ctx in
each service that does p2p routing

Runners now need to know which deployment they are routing to in order to
record events properly.
Previously, routing in the deployment context only mapped module to
endpoint. I've changed this to pass along deployment instead.

Also fixes an issue where runners would keep old verb clients around
pointing to the original endpoint for a module even if it had since
changed.

closes #3669

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
matt2e and github-actions[bot] authored Dec 10, 2024
1 parent 36506d4 commit 17409f6
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 97 deletions.
26 changes: 17 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,12 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
configs, err := s.cm.MapForModule(ctx, module)
routeTable := map[string]string{}
for _, module := range callableModuleNames {
if route, ok := routeView.GetForModule(module).Get(); ok {
routeTable[module] = route.String()
deployment, ok := routeView.GetDeployment(module).Get()
if !ok {
continue
}
if route, ok := routeView.Get(deployment).Get(); ok {
routeTable[deployment.String()] = route.String()
}
}
if deployment.Schema.Runtime != nil && deployment.Schema.Runtime.Deployment != nil {
Expand Down Expand Up @@ -854,13 +858,6 @@ func (s *Service) callWithRequest(
currentCaller = callers[len(callers)-1]
}

module := verbRef.Module
route, ok := routes.GetForModule(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}

var requestKey model.RequestKey
var isNewRequestKey bool
if k, ok := key.Get(); ok {
Expand All @@ -884,11 +881,13 @@ func (s *Service) callWithRequest(
headers.SetRequestKey(req.Header(), requestKey)
}

module := verbRef.Module
deployment, ok := routes.GetDeployment(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment"))
return nil, fmt.Errorf("deployment not found for module %q", module)
}

callEvent := &timeline.Call{
DeploymentKey: deployment,
RequestKey: requestKey,
Expand All @@ -899,6 +898,15 @@ func (s *Service) callWithRequest(
Request: req.Msg,
}

route, ok := routes.Get(deployment).Get()
if !ok {
err = fmt.Errorf("no routes for module %q", module)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodeNotFound, err)
}

if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
Expand Down
85 changes: 43 additions & 42 deletions backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ message GetDeploymentContextResponse {
}

message Route {
string module = 1;
string deployment = 1;
string uri = 2;
}

Expand Down
80 changes: 66 additions & 14 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package proxy
import (
"context"
"fmt"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"

"github.com/TBD54566975/ftl/backend/controller/observability"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
Expand All @@ -14,27 +18,35 @@ import (
ftlv1connect2 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/pubsub/v1/ftlv1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/schema"
)

var _ ftlv1connect.VerbServiceHandler = &Service{}
var _ ftldeploymentconnect.DeploymentServiceHandler = &Service{}

// moduleVerbService pairs the verb client used to reach a module with the
// key of the deployment that client was dialed for. The proxy Service keeps
// one entry per module name in its route table.
type moduleVerbService struct {
// client is the VerbService client that calls are forwarded through.
client ftlv1connect.VerbServiceClient
// deployment is the deployment key parsed from the route; it is recorded as
// the DeploymentKey on timeline call events and compared against incoming
// routes to detect that a module's deployment has changed.
deployment model.DeploymentKey
}

type Service struct {
controllerDeploymentService ftldeploymentconnect.DeploymentServiceClient
controllerLeaseService ftlleaseconnect.LeaseServiceClient
controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient
moduleVerbService map[string]ftlv1connect.VerbServiceClient
moduleVerbService map[string]moduleVerbService
}

func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient) *Service {
proxy := &Service{
controllerDeploymentService: controllerModuleService,
controllerLeaseService: leaseClient,
controllerPubsubService: controllerPubsubService,
moduleVerbService: map[string]ftlv1connect.VerbServiceClient{},
moduleVerbService: map[string]moduleVerbService{},
}
return proxy
}
Expand All @@ -50,13 +62,20 @@ func (r *Service) GetDeploymentContext(ctx context.Context, c *connect.Request[f

if rcv {
logger.Debugf("Received DeploymentContext from module: %v", moduleContext.Msg())
newRouteTable := map[string]ftlv1connect.VerbServiceClient{}
newRouteTable := map[string]moduleVerbService{}
for _, route := range moduleContext.Msg().Routes {
logger.Debugf("Adding route: %s -> %s", route.Module, route.Uri)
if client, ok := r.moduleVerbService[route.Module]; ok {
newRouteTable[route.Module] = client
} else {
newRouteTable[route.Module] = rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error)
logger.Debugf("Adding route: %s -> %s", route.Deployment, route.Uri)

deploment, err := model.ParseDeploymentKey(route.Deployment)
if err != nil {
return fmt.Errorf("failed to parse deployment key: %w", err)
}
module := deploment.Payload.Module
if existing, ok := r.moduleVerbService[module]; !ok || existing.deployment.String() != deploment.String() {
newRouteTable[module] = moduleVerbService{
client: rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error),
deployment: deploment,
}
}
}
r.moduleVerbService = newRouteTable
Expand Down Expand Up @@ -112,18 +131,51 @@ func (r *Service) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (r *Service) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {

client, ok := r.moduleVerbService[c.Msg.Verb.Module]
func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
verbService, ok := r.moduleVerbService[req.Msg.Verb.Module]
if !ok {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", c.Msg.Verb.Module))
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found"))
}
timelineClient := timeline.ClientFromContext(ctx)

callers, err := headers.GetCallers(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers"))
return nil, fmt.Errorf("could not get callers from headers: %w", err)
}

requestKey, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key"))
return nil, fmt.Errorf("could not process headers for request key: %w", err)
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
headers.SetRequestKey(req.Header(), requestKey)
}

callEvent := &timeline.Call{
DeploymentKey: verbService.deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: schema.RefFromProto(req.Msg.Verb),
Callers: callers,
Request: req.Msg,
}

call, err := client.Call(ctx, headers.CopyRequestForForwarding(c))
originalResp, err := verbService.client.Call(ctx, headers.CopyRequestForForwarding(req))
if err != nil {
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
return nil, fmt.Errorf("failed to proxy verb: %w", err)
}
return call, nil
resp := connect.NewResponse(originalResp.Msg)
callEvent.Response = result.Ok(resp.Msg)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
return resp, nil
}

// ResetSubscription is legacy, it will go once the DB based pubsub is removed
Expand Down
4 changes: 4 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/TBD54566975/ftl/backend/runner/observability"
"github.com/TBD54566975/ftl/backend/runner/proxy"
"github.com/TBD54566975/ftl/backend/runner/pubsub"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/download"
"github.com/TBD54566975/ftl/internal/dsn"
Expand All @@ -59,6 +60,7 @@ type Config struct {
Key model.RunnerKey `help:"Runner key (auto)."`
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
LeaseEndpoint *url.URL `name:"ftl-lease-endpoint" help:"Lease endpoint endpoint." env:"FTL_LEASE_ENDPOINT" default:"http://127.0.0.1:8895"`
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
TemplateDir string `help:"Template directory to copy into each deployment, if any." type:"existingdir"`
DeploymentDir string `help:"Directory to store deployments in." default:"${deploymentdir}"`
DeploymentKeepHistory int `help:"Number of deployments to keep history for." default:"3"`
Expand All @@ -85,6 +87,8 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer
logger := log.FromContext(ctx).Attrs(map[string]string{"runner": config.Key.String()})
logger.Debugf("Starting FTL Runner")

ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, config.TimelineEndpoint))

err = manageDeploymentDirectory(logger, config)
if err != nil {
observability.Runner.StartupFailed(ctx)
Expand Down
Loading

0 comments on commit 17409f6

Please sign in to comment.