Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: publish to timeline and telemetry when calling via p2p routing #3687

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,8 +727,12 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
configs, err := s.cm.MapForModule(ctx, module)
routeTable := map[string]string{}
for _, module := range callableModuleNames {
if route, ok := routeView.GetForModule(module).Get(); ok {
routeTable[module] = route.String()
deployment, ok := routeView.GetDeployment(module).Get()
if !ok {
continue
}
if route, ok := routeView.Get(deployment).Get(); ok {
routeTable[deployment.String()] = route.String()
}
}
if deployment.Schema.Runtime != nil && deployment.Schema.Runtime.Deployment != nil {
Expand Down Expand Up @@ -855,13 +859,6 @@ func (s *Service) callWithRequest(
currentCaller = callers[len(callers)-1]
}

module := verbRef.Module
route, ok := routes.GetForModule(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}

var requestKey model.RequestKey
var isNewRequestKey bool
if k, ok := key.Get(); ok {
Expand All @@ -885,11 +882,13 @@ func (s *Service) callWithRequest(
headers.SetRequestKey(req.Header(), requestKey)
}

module := verbRef.Module
deployment, ok := routes.GetDeployment(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment"))
return nil, fmt.Errorf("deployment not found for module %q", module)
}

callEvent := &timeline.Call{
DeploymentKey: deployment,
RequestKey: requestKey,
Expand All @@ -900,6 +899,15 @@ func (s *Service) callWithRequest(
Request: req.Msg,
}

route, ok := routes.Get(deployment).Get()
if !ok {
err = fmt.Errorf("no routes for module %q", module)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timeline.ClientFromContext(ctx).Publish(ctx, callEvent)
return nil, connect.NewError(connect.CodeNotFound, err)
}

if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
Expand Down
85 changes: 43 additions & 42 deletions backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
}

message Route {
string module = 1;
string deployment = 1;

Check failure on line 28 in backend/protos/xyz/block/ftl/deployment/v1/deployment.proto

View workflow job for this annotation

GitHub Actions / Proto Breaking Change Check

Field "1" with name "deployment" on message "Route" changed option "json_name" from "module" to "deployment".

Check failure on line 28 in backend/protos/xyz/block/ftl/deployment/v1/deployment.proto

View workflow job for this annotation

GitHub Actions / Proto Breaking Change Check

Field "1" on message "Route" changed name from "module" to "deployment".
string uri = 2;
}

Expand Down
80 changes: 66 additions & 14 deletions backend/runner/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package proxy
import (
"context"
"fmt"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/result"

"github.com/TBD54566975/ftl/backend/controller/observability"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect"
ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1"
Expand All @@ -14,27 +18,35 @@ import (
ftlv1connect2 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/pubsub/v1/ftlv1connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/schema"
)

var _ ftlv1connect.VerbServiceHandler = &Service{}
var _ ftldeploymentconnect.DeploymentServiceHandler = &Service{}

type moduleVerbService struct {
client ftlv1connect.VerbServiceClient
deployment model.DeploymentKey
}

type Service struct {
controllerDeploymentService ftldeploymentconnect.DeploymentServiceClient
controllerLeaseService ftlleaseconnect.LeaseServiceClient
controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient
moduleVerbService map[string]ftlv1connect.VerbServiceClient
moduleVerbService map[string]moduleVerbService
}

func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient) *Service {
proxy := &Service{
controllerDeploymentService: controllerModuleService,
controllerLeaseService: leaseClient,
controllerPubsubService: controllerPubsubService,
moduleVerbService: map[string]ftlv1connect.VerbServiceClient{},
moduleVerbService: map[string]moduleVerbService{},
}
return proxy
}
Expand All @@ -50,13 +62,20 @@ func (r *Service) GetDeploymentContext(ctx context.Context, c *connect.Request[f

if rcv {
logger.Debugf("Received DeploymentContext from module: %v", moduleContext.Msg())
newRouteTable := map[string]ftlv1connect.VerbServiceClient{}
newRouteTable := map[string]moduleVerbService{}
for _, route := range moduleContext.Msg().Routes {
logger.Debugf("Adding route: %s -> %s", route.Module, route.Uri)
if client, ok := r.moduleVerbService[route.Module]; ok {
newRouteTable[route.Module] = client
} else {
newRouteTable[route.Module] = rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error)
logger.Debugf("Adding route: %s -> %s", route.Deployment, route.Uri)

deploment, err := model.ParseDeploymentKey(route.Deployment)
if err != nil {
return fmt.Errorf("failed to parse deployment key: %w", err)
}
module := deploment.Payload.Module
if existing, ok := r.moduleVerbService[module]; !ok || existing.deployment.String() != deploment.String() {
newRouteTable[module] = moduleVerbService{
client: rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error),
deployment: deploment,
}
}
}
r.moduleVerbService = newRouteTable
Expand Down Expand Up @@ -112,18 +131,51 @@ func (r *Service) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (r *Service) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {

client, ok := r.moduleVerbService[c.Msg.Verb.Module]
func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
verbService, ok := r.moduleVerbService[req.Msg.Verb.Module]
if !ok {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", c.Msg.Verb.Module))
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found"))
}
timelineClient := timeline.ClientFromContext(ctx)

callers, err := headers.GetCallers(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers"))
return nil, fmt.Errorf("could not get callers from headers: %w", err)
}

requestKey, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key"))
return nil, fmt.Errorf("could not process headers for request key: %w", err)
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
headers.SetRequestKey(req.Header(), requestKey)
}

callEvent := &timeline.Call{
DeploymentKey: verbService.deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: schema.RefFromProto(req.Msg.Verb),
Callers: callers,
Request: req.Msg,
}

call, err := client.Call(ctx, headers.CopyRequestForForwarding(c))
originalResp, err := verbService.client.Call(ctx, headers.CopyRequestForForwarding(req))
if err != nil {
callEvent.Response = result.Err[*ftlv1.CallResponse](err)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
return nil, fmt.Errorf("failed to proxy verb: %w", err)
}
return call, nil
resp := connect.NewResponse(originalResp.Msg)
callEvent.Response = result.Ok(resp.Msg)
timelineClient.Publish(ctx, callEvent)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
return resp, nil
}

// ResetSubscription is legacy, it will go once the DB based pubsub is removed
Expand Down
4 changes: 4 additions & 0 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/TBD54566975/ftl/backend/runner/observability"
"github.com/TBD54566975/ftl/backend/runner/proxy"
"github.com/TBD54566975/ftl/backend/runner/pubsub"
"github.com/TBD54566975/ftl/backend/timeline"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/download"
"github.com/TBD54566975/ftl/internal/dsn"
Expand All @@ -59,6 +60,7 @@ type Config struct {
Key model.RunnerKey `help:"Runner key (auto)."`
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
LeaseEndpoint *url.URL `name:"ftl-lease-endpoint" help:"Lease endpoint endpoint." env:"FTL_LEASE_ENDPOINT" default:"http://127.0.0.1:8895"`
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
TemplateDir string `help:"Template directory to copy into each deployment, if any." type:"existingdir"`
DeploymentDir string `help:"Directory to store deployments in." default:"${deploymentdir}"`
DeploymentKeepHistory int `help:"Number of deployments to keep history for." default:"3"`
Expand All @@ -85,6 +87,8 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer
logger := log.FromContext(ctx).Attrs(map[string]string{"runner": config.Key.String()})
logger.Debugf("Starting FTL Runner")

ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, config.TimelineEndpoint))

err = manageDeploymentDirectory(logger, config)
if err != nil {
observability.Runner.StartupFailed(ctx)
Expand Down
Loading
Loading