From 6be800aa95c86b353ce56ea3a1108a2ad8a2620b Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Tue, 10 Dec 2024 12:17:41 +1100 Subject: [PATCH 1/6] feat: publish to timeline and telemetry when calling via p2p routing # Conflicts: # backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go # backend/runner/proxy/proxy.go # python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.py --- backend/controller/controller.go | 26 ++++-- .../block/ftl/deployment/v1/deployment.pb.go | 85 ++++++++++--------- .../block/ftl/deployment/v1/deployment.proto | 2 +- backend/runner/proxy/proxy.go | 79 ++++++++++++++--- .../block/ftl/deployment/v1/deployment_pb.ts | 6 +- internal/deploymentcontext/to_proto.go | 6 +- .../block/ftl/deployment/v1/deployment_pb2.py | 22 ++--- .../ftl/deployment/v1/deployment_pb2.pyi | 8 +- 8 files changed, 147 insertions(+), 87 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index dc392ae27a..3d16a97b0e 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -727,8 +727,12 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request configs, err := s.cm.MapForModule(ctx, module) routeTable := map[string]string{} for _, module := range callableModuleNames { - if route, ok := routeView.GetForModule(module).Get(); ok { - routeTable[module] = route.String() + deployment, ok := routeView.GetDeployment(module).Get() + if !ok { + continue + } + if route, ok := routeView.Get(deployment).Get(); ok { + routeTable[deployment.String()] = route.String() } } if deployment.Schema.Runtime != nil && deployment.Schema.Runtime.Deployment != nil { @@ -855,13 +859,6 @@ func (s *Service) callWithRequest( currentCaller = callers[len(callers)-1] } - module := verbRef.Module - route, ok := routes.GetForModule(module).Get() - if !ok { - observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module)) - } - var requestKey model.RequestKey var isNewRequestKey bool if k, ok := key.Get(); ok { @@ -885,11 +882,13 @@ func (s *Service) callWithRequest( headers.SetRequestKey(req.Header(), requestKey) } + module := verbRef.Module deployment, ok := routes.GetDeployment(module).Get() if !ok { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment")) return nil, fmt.Errorf("deployment not found for module %q", module) } + callEvent := &timeline.Call{ DeploymentKey: deployment, RequestKey: requestKey, @@ -900,6 +899,15 @@ func (s *Service) callWithRequest( Request: req.Msg, } + route, ok := routes.Get(deployment).Get() + if !ok { + err = fmt.Errorf("no routes for module %q", module) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timeline.ClientFromContext(ctx).Publish(ctx, callEvent) + return nil, connect.NewError(connect.CodeNotFound, err) + } + if currentCaller != nil && currentCaller.Module != module && !verb.IsExported() { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported")) err = connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef)) diff --git a/backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go b/backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go index 600e401b00..28ca150445 100644 --- a/backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go +++ b/backend/protos/xyz/block/ftl/deployment/v1/deployment.pb.go @@ -266,8 +266,8 @@ type GetDeploymentContextResponse_Route struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Module string `protobuf:"bytes,1,opt,name=module,proto3" json:"module,omitempty"` - Uri string `protobuf:"bytes,2,opt,name=uri,proto3" json:"uri,omitempty"` + Deployment string `protobuf:"bytes,1,opt,name=deployment,proto3" json:"deployment,omitempty"` + Uri string `protobuf:"bytes,2,opt,name=uri,proto3" json:"uri,omitempty"` } func (x *GetDeploymentContextResponse_Route) Reset() { @@ -300,9 +300,9 @@ func (*GetDeploymentContextResponse_Route) Descriptor() ([]byte, []int) { return file_xyz_block_ftl_deployment_v1_deployment_proto_rawDescGZIP(), []int{1, 1} } -func (x *GetDeploymentContextResponse_Route) GetModule() string { +func (x *GetDeploymentContextResponse_Route) GetDeployment() string { if x != nil { - return x.Module + return x.Deployment } return "" } @@ -327,7 +327,7 @@ var file_xyz_block_ftl_deployment_v1_deployment_proto_rawDesc = []byte{ 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x65, 0x70, 0x6c, - 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xcb, 0x06, 0x0a, 0x1c, 0x47, 0x65, 0x74, 0x44, 0x65, + 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xd3, 0x06, 0x0a, 0x1c, 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x12, @@ -364,43 +364,44 @@ var file_xyz_block_ftl_deployment_v1_deployment_proto_rawDesc = []byte{ 0x31, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x62, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x64, - 0x73, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x64, 0x73, 0x6e, 0x1a, 0x31, 0x0a, - 0x05, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x6f, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x10, - 0x0a, 0x03, 0x75, 0x72, 0x69, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69, - 0x1a, 0x3a, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x3a, 0x0a, 0x0c, - 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, - 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, - 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x4a, 0x0a, 0x06, 0x44, 0x62, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x44, 0x42, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, - 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x44, - 0x42, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x50, 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, - 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x44, 0x42, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4d, 0x59, 0x53, - 0x51, 0x4c, 0x10, 0x02, 0x32, 0xef, 0x01, 0x0a, 0x11, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, - 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4a, 0x0a, 0x04, 0x50, 0x69, - 0x6e, 0x67, 0x12, 0x1d, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, - 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1e, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, - 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x12, 0x8d, 0x01, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x44, 0x65, - 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, - 0x38, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, - 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, - 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, - 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x39, 0x2e, 0x78, 0x79, 0x7a, 0x2e, - 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, - 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, - 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x4f, 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x54, 0x42, 0x44, 0x35, 0x34, 0x35, 0x36, 0x36, 0x39, - 0x37, 0x35, 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x62, 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x78, 0x79, 0x7a, 0x2f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2f, - 0x66, 0x74, 0x6c, 0x2f, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x2f, 0x76, - 0x31, 0x3b, 0x66, 0x74, 0x6c, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x64, 0x73, 0x6e, 0x1a, 0x39, 0x0a, + 0x05, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, + 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x65, 0x70, 0x6c, + 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x69, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x69, 0x1a, 0x3a, 0x0a, 0x0c, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x3a, 0x0a, 0x0c, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x22, 0x4a, 0x0a, 0x06, 0x44, 0x62, 0x54, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x13, 0x44, 0x42, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x14, 0x0a, 0x10, 0x44, 0x42, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x50, + 0x4f, 0x53, 0x54, 0x47, 0x52, 0x45, 0x53, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x44, 0x42, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4d, 0x59, 0x53, 0x51, 0x4c, 0x10, 0x02, 0x32, 0xef, 0x01, 0x0a, + 0x11, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x12, 0x4a, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1d, 0x2e, 0x78, 0x79, 0x7a, + 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, + 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x78, 0x79, 0x7a, 0x2e, + 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x69, 0x6e, + 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x03, 0x90, 0x02, 0x01, 0x12, 0x8d, + 0x01, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, + 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x12, 0x38, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, + 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, 0x6c, 0x2e, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, + 0x6e, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, + 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x39, 0x2e, 0x78, 0x79, 0x7a, 0x2e, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2e, 0x66, 0x74, + 0x6c, 0x2e, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x31, 0x2e, + 0x47, 0x65, 0x74, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, + 0x74, 0x65, 0x78, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x4f, + 0x50, 0x01, 0x5a, 0x4b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x54, + 0x42, 0x44, 0x35, 0x34, 0x35, 0x36, 0x36, 0x39, 0x37, 0x35, 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x62, + 0x61, 0x63, 0x6b, 0x65, 0x6e, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x78, 0x79, + 0x7a, 0x2f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x2f, 0x66, 0x74, 0x6c, 0x2f, 0x64, 0x65, 0x70, 0x6c, + 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x2f, 0x76, 0x31, 0x3b, 0x66, 0x74, 0x6c, 0x76, 0x31, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/backend/protos/xyz/block/ftl/deployment/v1/deployment.proto b/backend/protos/xyz/block/ftl/deployment/v1/deployment.proto index dbed5594e0..9829ea7e89 100644 --- a/backend/protos/xyz/block/ftl/deployment/v1/deployment.proto +++ b/backend/protos/xyz/block/ftl/deployment/v1/deployment.proto @@ -25,7 +25,7 @@ message GetDeploymentContextResponse { } message Route { - string module = 1; + string deployment = 1; string uri = 2; } diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 2bd2130af5..86d87155a6 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -3,9 +3,11 @@ package proxy import ( "context" "fmt" + "time" "connectrpc.com/connect" + "github.com/TBD54566975/ftl/backend/controller/observability" ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" ftldeploymentconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1/ftlv1connect" ftllease "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/lease/v1" @@ -14,19 +16,29 @@ import ( ftlv1connect2 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/pubsub/v1/ftlv1connect" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/rpc/headers" + "github.com/TBD54566975/ftl/internal/schema" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" ) var _ ftlv1connect.VerbServiceHandler = &Service{} var _ ftldeploymentconnect.DeploymentServiceHandler = &Service{} +type moduleVerbService struct { + client ftlv1connect.VerbServiceClient + deployment model.DeploymentKey +} + type Service struct { controllerDeploymentService ftldeploymentconnect.DeploymentServiceClient controllerLeaseService ftlleaseconnect.LeaseServiceClient controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient - moduleVerbService map[string]ftlv1connect.VerbServiceClient + moduleVerbService map[string]moduleVerbService } func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, leaseClient ftlleaseconnect.LeaseServiceClient, controllerPubsubService ftlv1connect2.LegacyPubsubServiceClient) *Service { @@ -34,7 +46,7 @@ func New(controllerModuleService ftldeploymentconnect.DeploymentServiceClient, l controllerDeploymentService: controllerModuleService, controllerLeaseService: leaseClient, controllerPubsubService: controllerPubsubService, - moduleVerbService: map[string]ftlv1connect.VerbServiceClient{}, + moduleVerbService: map[string]moduleVerbService{}, } return proxy } @@ -50,13 +62,20 @@ func (r *Service) GetDeploymentContext(ctx context.Context, c *connect.Request[f if rcv { logger.Debugf("Received DeploymentContext from module: %v", moduleContext.Msg()) - newRouteTable := map[string]ftlv1connect.VerbServiceClient{} + newRouteTable := map[string]moduleVerbService{} for _, route := range moduleContext.Msg().Routes { - logger.Debugf("Adding route: %s -> %s", route.Module, route.Uri) - if client, ok := r.moduleVerbService[route.Module]; ok { - newRouteTable[route.Module] = client - } else { - newRouteTable[route.Module] = rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error) + logger.Debugf("Adding route: %s -> %s", route.Deployment, route.Uri) + + deploment, err := model.ParseDeploymentKey(route.Deployment) + if err != nil { + return fmt.Errorf("failed to parse deployment key: %w", err) + } + module := deploment.Payload.Module + if existing, ok := r.moduleVerbService[module]; !ok || existing.deployment.String() != deploment.String() { + newRouteTable[module] = moduleVerbService{ + client: rpc.Dial(ftlv1connect.NewVerbServiceClient, route.Uri, log.Error), + deployment: deploment, + } } } r.moduleVerbService = newRouteTable @@ -112,18 +131,50 @@ func (r *Service) Ping(ctx context.Context, c *connect.Request[ftlv1.PingRequest return connect.NewResponse(&ftlv1.PingResponse{}), nil } -func (r *Service) Call(ctx context.Context, c *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { - - client, ok := r.moduleVerbService[c.Msg.Verb.Module] +func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { + start := time.Now() + verbService, ok := r.moduleVerbService[req.Msg.Verb.Module] if !ok { - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", c.Msg.Verb.Module)) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", req.Msg.Verb.Module)) + } + timelineClient := timeline.ClientFromContext(ctx) + + callers, err := headers.GetCallers(req.Header()) + if err != nil { + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers")) + return nil, err + } + + requestKey, ok, err := headers.GetRequestKey(req.Header()) + if err != nil { + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key")) + return nil, fmt.Errorf("could not process headers for request key: %w", err) + } else if !ok { + requestKey = model.NewRequestKey(model.OriginIngress, "grpc") + headers.SetRequestKey(req.Header(), requestKey) + } + + callEvent := &timeline.Call{ + DeploymentKey: verbService.deployment, + RequestKey: requestKey, + StartTime: start, + DestVerb: schema.RefFromProto(req.Msg.Verb), + Callers: callers, + Request: req.Msg, } - call, err := client.Call(ctx, headers.CopyRequestForForwarding(c)) + originalResp, err := verbService.client.Call(ctx, headers.CopyRequestForForwarding(req)) if err != nil { + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) return nil, fmt.Errorf("failed to proxy verb: %w", err) } - return call, nil + resp := connect.NewResponse(originalResp.Msg) + callEvent.Response = result.Ok(resp.Msg) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) + return resp, nil } // ResetSubscription is legacy, it will go once the DB based pubsub is removed diff --git a/frontend/console/src/protos/xyz/block/ftl/deployment/v1/deployment_pb.ts b/frontend/console/src/protos/xyz/block/ftl/deployment/v1/deployment_pb.ts index be07af04e3..5a6ce4dffc 100644 --- a/frontend/console/src/protos/xyz/block/ftl/deployment/v1/deployment_pb.ts +++ b/frontend/console/src/protos/xyz/block/ftl/deployment/v1/deployment_pb.ts @@ -190,9 +190,9 @@ export class GetDeploymentContextResponse_DSN extends Message { /** - * @generated from field: string module = 1; + * @generated from field: string deployment = 1; */ - module = ""; + deployment = ""; /** * @generated from field: string uri = 2; @@ -207,7 +207,7 @@ export class GetDeploymentContextResponse_Route extends Message [ - { no: 1, name: "module", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 1, name: "deployment", kind: "scalar", T: 9 /* ScalarType.STRING */ }, { no: 2, name: "uri", kind: "scalar", T: 9 /* ScalarType.STRING */ }, ]); diff --git a/internal/deploymentcontext/to_proto.go b/internal/deploymentcontext/to_proto.go index 522d67b81a..59486bc9b0 100644 --- a/internal/deploymentcontext/to_proto.go +++ b/internal/deploymentcontext/to_proto.go @@ -18,10 +18,10 @@ func (m DeploymentContext) ToProto() *ftlv1.GetDeploymentContextResponse { }) } routes := make([]*ftlv1.GetDeploymentContextResponse_Route, 0, len(m.routes)) - for name, entry := range m.routes { + for dep, entry := range m.routes { routes = append(routes, &ftlv1.GetDeploymentContextResponse_Route{ - Module: name, - Uri: entry, + Deployment: dep, + Uri: entry, }) } return &ftlv1.GetDeploymentContextResponse{ diff --git a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.py b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.py index 77d32fe86b..4ea0d1fad2 100644 --- a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.py +++ b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.py @@ -25,7 +25,7 @@ from xyz.block.ftl.v1 import ftl_pb2 as xyz_dot_block_dot_ftl_dot_v1_dot_ftl__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n,xyz/block/ftl/deployment/v1/deployment.proto\x12\x1bxyz.block.ftl.deployment.v1\x1a\x1axyz/block/ftl/v1/ftl.proto\"=\n\x1bGetDeploymentContextRequest\x12\x1e\n\ndeployment\x18\x01 \x01(\tR\ndeployment\"\xcb\x06\n\x1cGetDeploymentContextResponse\x12\x16\n\x06module\x18\x01 \x01(\tR\x06module\x12\x1e\n\ndeployment\x18\x02 \x01(\tR\ndeployment\x12`\n\x07\x63onfigs\x18\x03 \x03(\x0b\x32\x46.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.ConfigsEntryR\x07\x63onfigs\x12`\n\x07secrets\x18\x04 \x03(\x0b\x32\x46.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.SecretsEntryR\x07secrets\x12[\n\tdatabases\x18\x05 \x03(\x0b\x32=.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.DSNR\tdatabases\x12W\n\x06routes\x18\x06 \x03(\x0b\x32?.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.RouteR\x06routes\x1a\x81\x01\n\x03\x44SN\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12T\n\x04type\x18\x02 \x01(\x0e\x32@.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.DbTypeR\x04type\x12\x10\n\x03\x64sn\x18\x03 \x01(\tR\x03\x64sn\x1a\x31\n\x05Route\x12\x16\n\x06module\x18\x01 \x01(\tR\x06module\x12\x10\n\x03uri\x18\x02 \x01(\tR\x03uri\x1a:\n\x0c\x43onfigsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x0cR\x05value:\x02\x38\x01\x1a:\n\x0cSecretsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x0cR\x05value:\x02\x38\x01\"J\n\x06\x44\x62Type\x12\x17\n\x13\x44\x42_TYPE_UNSPECIFIED\x10\x00\x12\x14\n\x10\x44\x42_TYPE_POSTGRES\x10\x01\x12\x11\n\rDB_TYPE_MYSQL\x10\x02\x32\xef\x01\n\x11\x44\x65ploymentService\x12J\n\x04Ping\x12\x1d.xyz.block.ftl.v1.PingRequest\x1a\x1e.xyz.block.ftl.v1.PingResponse\"\x03\x90\x02\x01\x12\x8d\x01\n\x14GetDeploymentContext\x12\x38.xyz.block.ftl.deployment.v1.GetDeploymentContextRequest\x1a\x39.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse0\x01\x42OP\x01ZKgithub.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1;ftlv1b\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n,xyz/block/ftl/deployment/v1/deployment.proto\x12\x1bxyz.block.ftl.deployment.v1\x1a\x1axyz/block/ftl/v1/ftl.proto\"=\n\x1bGetDeploymentContextRequest\x12\x1e\n\ndeployment\x18\x01 \x01(\tR\ndeployment\"\xd3\x06\n\x1cGetDeploymentContextResponse\x12\x16\n\x06module\x18\x01 \x01(\tR\x06module\x12\x1e\n\ndeployment\x18\x02 \x01(\tR\ndeployment\x12`\n\x07\x63onfigs\x18\x03 \x03(\x0b\x32\x46.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.ConfigsEntryR\x07\x63onfigs\x12`\n\x07secrets\x18\x04 \x03(\x0b\x32\x46.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.SecretsEntryR\x07secrets\x12[\n\tdatabases\x18\x05 \x03(\x0b\x32=.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.DSNR\tdatabases\x12W\n\x06routes\x18\x06 \x03(\x0b\x32?.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.RouteR\x06routes\x1a\x81\x01\n\x03\x44SN\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12T\n\x04type\x18\x02 \x01(\x0e\x32@.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse.DbTypeR\x04type\x12\x10\n\x03\x64sn\x18\x03 \x01(\tR\x03\x64sn\x1a\x39\n\x05Route\x12\x1e\n\ndeployment\x18\x01 \x01(\tR\ndeployment\x12\x10\n\x03uri\x18\x02 \x01(\tR\x03uri\x1a:\n\x0c\x43onfigsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x0cR\x05value:\x02\x38\x01\x1a:\n\x0cSecretsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\x0cR\x05value:\x02\x38\x01\"J\n\x06\x44\x62Type\x12\x17\n\x13\x44\x42_TYPE_UNSPECIFIED\x10\x00\x12\x14\n\x10\x44\x42_TYPE_POSTGRES\x10\x01\x12\x11\n\rDB_TYPE_MYSQL\x10\x02\x32\xef\x01\n\x11\x44\x65ploymentService\x12J\n\x04Ping\x12\x1d.xyz.block.ftl.v1.PingRequest\x1a\x1e.xyz.block.ftl.v1.PingResponse\"\x03\x90\x02\x01\x12\x8d\x01\n\x14GetDeploymentContext\x12\x38.xyz.block.ftl.deployment.v1.GetDeploymentContextRequest\x1a\x39.xyz.block.ftl.deployment.v1.GetDeploymentContextResponse0\x01\x42OP\x01ZKgithub.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1;ftlv1b\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -42,17 +42,17 @@ _globals['_GETDEPLOYMENTCONTEXTREQUEST']._serialized_start=105 _globals['_GETDEPLOYMENTCONTEXTREQUEST']._serialized_end=166 _globals['_GETDEPLOYMENTCONTEXTRESPONSE']._serialized_start=169 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE']._serialized_end=1012 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE']._serialized_end=1020 _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DSN']._serialized_start=636 _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DSN']._serialized_end=765 _globals['_GETDEPLOYMENTCONTEXTRESPONSE_ROUTE']._serialized_start=767 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_ROUTE']._serialized_end=816 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_CONFIGSENTRY']._serialized_start=818 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_CONFIGSENTRY']._serialized_end=876 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_SECRETSENTRY']._serialized_start=878 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_SECRETSENTRY']._serialized_end=936 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DBTYPE']._serialized_start=938 - _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DBTYPE']._serialized_end=1012 - _globals['_DEPLOYMENTSERVICE']._serialized_start=1015 - _globals['_DEPLOYMENTSERVICE']._serialized_end=1254 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_ROUTE']._serialized_end=824 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_CONFIGSENTRY']._serialized_start=826 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_CONFIGSENTRY']._serialized_end=884 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_SECRETSENTRY']._serialized_start=886 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_SECRETSENTRY']._serialized_end=944 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DBTYPE']._serialized_start=946 + _globals['_GETDEPLOYMENTCONTEXTRESPONSE_DBTYPE']._serialized_end=1020 + _globals['_DEPLOYMENTSERVICE']._serialized_start=1023 + _globals['_DEPLOYMENTSERVICE']._serialized_end=1262 # @@protoc_insertion_point(module_scope) diff --git a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.pyi b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.pyi index ad54429598..dc7ebd1e4e 100644 --- a/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.pyi +++ b/python-runtime/ftl/src/ftl/protos/xyz/block/ftl/deployment/v1/deployment_pb2.pyi @@ -33,12 +33,12 @@ class GetDeploymentContextResponse(_message.Message): dsn: str def __init__(self, name: _Optional[str] = ..., type: _Optional[_Union[GetDeploymentContextResponse.DbType, str]] = ..., dsn: _Optional[str] = ...) -> None: ... class Route(_message.Message): - __slots__ = ("module", "uri") - MODULE_FIELD_NUMBER: _ClassVar[int] + __slots__ = ("deployment", "uri") + DEPLOYMENT_FIELD_NUMBER: _ClassVar[int] URI_FIELD_NUMBER: _ClassVar[int] - module: str + deployment: str uri: str - def __init__(self, module: _Optional[str] = ..., uri: _Optional[str] = ...) -> None: ... + def __init__(self, deployment: _Optional[str] = ..., uri: _Optional[str] = ...) -> None: ... class ConfigsEntry(_message.Message): __slots__ = ("key", "value") KEY_FIELD_NUMBER: _ClassVar[int] From 21126d338008af6290cbc623be425a08209b9f46 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 9 Dec 2024 05:50:53 +0000 Subject: [PATCH 2/6] chore(autofmt): Automated formatting --- backend/runner/proxy/proxy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 86d87155a6..01fba2f04f 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -6,6 +6,8 @@ import ( "time" "connectrpc.com/connect" + "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" "github.com/TBD54566975/ftl/backend/controller/observability" ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" @@ -22,8 +24,6 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/rpc/headers" "github.com/TBD54566975/ftl/internal/schema" - "github.com/alecthomas/types/optional" - "github.com/alecthomas/types/result" ) var _ ftlv1connect.VerbServiceHandler = &Service{} From ab64559bd41b82cb2270d26d9b0d1588d26835bd Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Mon, 9 Dec 2024 16:58:09 +1100 Subject: [PATCH 3/6] feat: publish to timeline and telemetry when calling via p2p routing --- backend/runner/proxy/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index 01fba2f04f..be82ff6f02 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -142,7 +142,7 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque callers, err := headers.GetCallers(req.Header()) if err != nil { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers")) - return nil, err + return nil, fmt.Errorf("could not get callers from headers: %w", err) } requestKey, ok, err := headers.GetRequestKey(req.Header()) From bdaaf3b4f9dc0a1e2e873b202b51a7066d89f553 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Tue, 10 Dec 2024 08:55:26 +1100 Subject: [PATCH 4/6] verb router does telemetry and timeline updates --- backend/runner/proxy/proxy.go | 3 +- internal/routing/verb_routing.go | 69 ++++++++++++++++++++++++++++---- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/backend/runner/proxy/proxy.go b/backend/runner/proxy/proxy.go index be82ff6f02..71a467d266 100644 --- a/backend/runner/proxy/proxy.go +++ b/backend/runner/proxy/proxy.go @@ -135,7 +135,8 @@ func (r *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque start := time.Now() verbService, ok := r.moduleVerbService[req.Msg.Verb.Module] if !ok { - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found in runners route table: %s", req.Msg.Verb.Module)) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module")) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found")) } timelineClient := timeline.ClientFromContext(ctx) diff --git a/internal/routing/verb_routing.go b/internal/routing/verb_routing.go index afd9898039..093ff72f30 100644 --- a/internal/routing/verb_routing.go +++ b/internal/routing/verb_routing.go @@ -3,15 +3,22 @@ package routing import ( "context" "fmt" + "time" "connectrpc.com/connect" "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/result" "github.com/puzpuzpuz/xsync/v3" + "github.com/TBD54566975/ftl/backend/controller/observability" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/rpc" + "github.com/TBD54566975/ftl/internal/rpc/headers" + "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) @@ -28,15 +35,51 @@ type VerbCallRouter struct { } func (s *VerbCallRouter) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { - client, ok := s.LookupClient(req.Msg.Verb.Module).Get() + start := time.Now() + timelineClient := timeline.ClientFromContext(ctx) + + client, deployment, ok := s.LookupClient(req.Msg.Verb.Module) if !ok { - return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("module not found")) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to find deployment for module")) + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("deployment not found")) + } + + callers, err := headers.GetCallers(req.Header()) + if err != nil { + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers")) + return nil, fmt.Errorf("could not get callers from headers: %w", err) } - call, err := client.Call(ctx, req) + + requestKey, ok, err := headers.GetRequestKey(req.Header()) if err != nil { - return nil, fmt.Errorf("failed to call module %s: %w", req.Msg.Verb.Module, err) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key")) + return nil, fmt.Errorf("could not process headers for request key: %w", err) + } else if !ok { + requestKey = model.NewRequestKey(model.OriginIngress, "grpc") + headers.SetRequestKey(req.Header(), requestKey) + } + + callEvent := &timeline.Call{ + DeploymentKey: deployment, + RequestKey: requestKey, + StartTime: start, + DestVerb: schema.RefFromProto(req.Msg.Verb), + Callers: callers, + Request: req.Msg, } - return call, nil + + originalResp, err := client.Call(ctx, req) + if err != nil { + callEvent.Response = result.Err[*ftlv1.CallResponse](err) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) + return nil, fmt.Errorf("failed to call %s: %w", callEvent.DestVerb, err) + } + resp := connect.NewResponse(originalResp.Msg) + callEvent.Response = result.Ok(resp.Msg) + timelineClient.Publish(ctx, callEvent) + observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]()) + return resp, nil } func NewVerbRouterFromTable(ctx context.Context, routeTable *RouteTable) *VerbCallRouter { @@ -61,13 +104,23 @@ func NewVerbRouter(ctx context.Context, changes schemaeventsource.EventSource) * return NewVerbRouterFromTable(ctx, New(ctx, changes)) } -func (s *VerbCallRouter) LookupClient(module string) optional.Option[ftlv1connect.VerbServiceClient] { +func (s *VerbCallRouter) LookupClient(module string) (client ftlv1connect.VerbServiceClient, deployment model.DeploymentKey, ok bool) { res, _ := s.moduleClients.LoadOrCompute(module, func() optional.Option[ftlv1connect.VerbServiceClient] { - route, ok := s.routingTable.Current().GetForModule(module).Get() + current := s.routingTable.Current() + var ok bool + deployment, ok = current.GetDeployment(module).Get() + if !ok { + return optional.None[ftlv1connect.VerbServiceClient]() + } + route, ok := current.Get(deployment).Get() if !ok { return optional.None[ftlv1connect.VerbServiceClient]() } return optional.Some[ftlv1connect.VerbServiceClient](rpc.Dial(ftlv1connect.NewVerbServiceClient, route.String(), log.Error)) }) - return res + client, ok = res.Get() + if !ok { + return nil, deployment, false + } + return client, deployment, true } From bbc9976dfae633e5d9a5b2a3a4c6dc3102aa2a1a Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Tue, 10 Dec 2024 09:22:02 +1100 Subject: [PATCH 5/6] Runner can talk to timeline --- backend/runner/runner.go | 4 ++++ charts/ftl/templates/runner.yaml | 2 ++ 2 files changed, 6 insertions(+) diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 430f4d1809..afce59fab2 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -39,6 +39,7 @@ import ( "github.com/TBD54566975/ftl/backend/runner/observability" "github.com/TBD54566975/ftl/backend/runner/proxy" "github.com/TBD54566975/ftl/backend/runner/pubsub" + "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/common/plugin" "github.com/TBD54566975/ftl/internal/download" "github.com/TBD54566975/ftl/internal/dsn" @@ -59,6 +60,7 @@ type Config struct { Key model.RunnerKey `help:"Runner key (auto)."` ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` LeaseEndpoint *url.URL `name:"ftl-lease-endpoint" help:"Lease endpoint endpoint." env:"FTL_LEASE_ENDPOINT" default:"http://127.0.0.1:8895"` + TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"` TemplateDir string `help:"Template directory to copy into each deployment, if any." type:"existingdir"` DeploymentDir string `help:"Directory to store deployments in." default:"${deploymentdir}"` DeploymentKeepHistory int `help:"Number of deployments to keep history for." default:"3"` @@ -85,6 +87,8 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer logger := log.FromContext(ctx).Attrs(map[string]string{"runner": config.Key.String()}) logger.Debugf("Starting FTL Runner") + ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, config.TimelineEndpoint)) + err = manageDeploymentDirectory(logger, config) if err != nil { observability.Runner.StartupFailed(ctx) diff --git a/charts/ftl/templates/runner.yaml b/charts/ftl/templates/runner.yaml index d6370fa0d9..8ee0a4b85f 100644 --- a/charts/ftl/templates/runner.yaml +++ b/charts/ftl/templates/runner.yaml @@ -66,6 +66,8 @@ data: value: "http://0.0.0.0:{{ (index .Values.runner.ports 0).containerPort }}" - name: FTL_LEASE_ENDPOINT value: http://ftl-lease:8892 + - name: FTL_TIMELINE_ENDPOINT + value: "http://{{ .Values.timeline.service.name }}:{{ .Values.timeline.service.port }}" - name: FTL_ARTEFACT_REGISTRY_ALLOW_INSECURE value: "{{ .Values.registry.allowInsecure }}" - name: FTL_ARTEFACT_REGISTRY_USERNAME From cb0661e7f7f0dda42cc192f475a8354027307834 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Tue, 10 Dec 2024 11:25:39 +1100 Subject: [PATCH 6/6] cron now has a timeline client --- cmd/ftl-cron/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/ftl-cron/main.go b/cmd/ftl-cron/main.go index dc8954717a..dfa4331f17 100644 --- a/cmd/ftl-cron/main.go +++ b/cmd/ftl-cron/main.go @@ -9,6 +9,7 @@ import ( "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/cron" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/backend/timeline" _ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota. "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/observability" @@ -35,11 +36,11 @@ func main() { err := observability.Init(ctx, false, "", "ftl-cron", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - // ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, cli.TimelineEndpoint)) - schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.SchemaServiceEndpoint.String(), log.Error) eventSource := schemaeventsource.New(ctx, schemaClient) + ctx = timeline.ContextWithClient(ctx, timeline.NewClient(ctx, cli.CronConfig.TimelineEndpoint)) + routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient)) err = cron.Start(ctx, eventSource, routeManager) kctx.FatalIfErrorf(err, "failed to start cron")