From 4570d939880b3de53781ee287af8dacbe58cf3ae Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Mon, 12 Aug 2024 14:13:31 -0700 Subject: [PATCH] feat: otel metrics for ingress (#2310) ``` InstrumentationScope ftl.ingress Metric #0 Descriptor: -> Name: ftl.ingress.requests -> Description: the number of ingress requests that the FTL controller receives -> Unit: 1 -> DataType: Sum -> IsMonotonic: true -> AggregationTemporality: Cumulative NumberDataPoints #0 Data point attributes: -> ftl.ingress.method: Str(GET) -> ftl.ingress.path: Str(/http/echo) -> ftl.ingress.run_time_ms.bucket: Str([16,32)) -> ftl.ingress.verb.ref: Str(echo.getEcho) -> ftl.module.name: Str(echo) StartTimestamp: 2024-08-12 17:44:51.923107 +0000 UTC Timestamp: 2024-08-12 17:45:26.923594 +0000 UTC Value: 1 Metric #1 Descriptor: -> Name: ftl.ingress.ms_to_complete -> Description: duration in ms to complete an ingress request -> Unit: ms -> DataType: Histogram -> AggregationTemporality: Cumulative HistogramDataPoints #0 Data point attributes: -> ftl.ingress.method: Str(GET) -> ftl.ingress.path: Str(/http/echo) -> ftl.ingress.verb.ref: Str(echo.getEcho) -> ftl.module.name: Str(echo) StartTimestamp: 2024-08-12 17:44:51.92311 +0000 UTC Timestamp: 2024-08-12 17:45:26.923622 +0000 UTC Count: 1 Sum: 27.000000 Min: 27.000000 Max: 27.000000 ``` --------- Co-authored-by: github-actions[bot] --- backend/controller/controller.go | 7 +- backend/controller/ingress/handler.go | 21 ++++- backend/controller/ingress/handler_test.go | 3 +- backend/controller/observability/ingress.go | 76 +++++++++++++++++++ .../controller/observability/observability.go | 3 + 5 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 backend/controller/observability/ingress.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index fa329ead78..0ef15118de 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -339,22 +339,27 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling } func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + start := time.Now() + routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method) if err != nil { if errors.Is(err, dalerrs.ErrNotFound) { http.NotFound(w, r) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal")) return } http.Error(w, err.Error(), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("failed to resolve route from dal")) return } sch, err := s.dal.GetActiveSchema(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("could not get active schema")) return } requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path)) - ingress.Handle(sch, requestKey, routes, w, r, s.callWithRequest) + ingress.Handle(start, sch, requestKey, routes, w, r, s.callWithRequest) } func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) { diff --git a/backend/controller/ingress/handler.go b/backend/controller/ingress/handler.go index 3201bb9767..47207dacc0 100644 --- a/backend/controller/ingress/handler.go +++ b/backend/controller/ingress/handler.go @@ -5,11 +5,13 @@ import ( "encoding/json" "errors" "net/http" + "time" "connectrpc.com/connect" "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/observability" dalerrs "github.com/TBD54566975/ftl/backend/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" @@ -20,6 +22,7 @@ import ( // Handle HTTP ingress routes. func Handle( + startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routes []dal.IngressRoute, @@ -33,24 +36,29 @@ func Handle( if err != nil { if errors.Is(err, dalerrs.ErrNotFound) { http.NotFound(w, r) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("route not found")) return } logger.Errorf(err, "failed to resolve route for %s %s", r.Method, r.URL.Path) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("failed to resolve route")) return } + verbRef := &schemapb.Ref{Module: route.Module, Name: route.Verb} + body, err := BuildRequestBody(route, r, sch) if err != nil { // Only log at debug, as this is a client side error logger.Debugf("bad request: %s", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request")) return } creq := connect.NewRequest(&ftlv1.CallRequest{ Metadata: &ftlv1.Metadata{}, - Verb: &schemapb.Ref{Module: route.Module, Name: route.Verb}, + Verb: verbRef, Body: body, }) @@ -60,8 +68,10 @@ func Handle( if connectErr := new(connect.Error); errors.As(err, &connectErr) { httpCode := connectCodeToHTTP(connectErr.Code()) http.Error(w, http.StatusText(httpCode), httpCode) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error")) } else { http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error")) } return } @@ -72,6 +82,7 @@ func Handle( if err != nil { logger.Errorf(err, "could not resolve schema type for verb %s", route.Verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb")) return } var responseBody []byte @@ -81,6 +92,7 @@ func Handle( if err := json.Unmarshal(msg.Body, &response); err != nil { logger.Errorf(err, "could not unmarhal response for verb %s", verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb")) return } @@ -89,6 +101,7 @@ func Handle( if err != nil { logger.Errorf(err, "could not create response for verb %s", verb) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb")) return } @@ -105,12 +118,16 @@ func Handle( responseBody = msg.Body } _, err = w.Write(responseBody) - if err != nil { + if err == nil { + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]()) + } else { logger.Errorf(err, "Could not write response body") + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body")) } case *ftlv1.CallResponse_Error_: http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error")) } } diff --git a/backend/controller/ingress/handler_test.go b/backend/controller/ingress/handler_test.go index 527992c04d..7251e88860 100644 --- a/backend/controller/ingress/handler_test.go +++ b/backend/controller/ingress/handler_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "net/url" "testing" + "time" "connectrpc.com/connect" "github.com/alecthomas/assert/v2" @@ -99,7 +100,7 @@ func TestIngress(t *testing.T) { req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx) req.URL.RawQuery = test.query.Encode() reqKey := model.NewRequestKey(model.OriginIngress, "test") - ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { + ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { body, err := encoding.Marshal(response) assert.NoError(t, err) return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil diff --git a/backend/controller/observability/ingress.go b/backend/controller/observability/ingress.go new file mode 100644 index 0000000000..ebae91ad80 --- /dev/null +++ b/backend/controller/observability/ingress.go @@ -0,0 +1,76 @@ +package observability + +import ( + "context" + "fmt" + "time" + + "github.com/alecthomas/types/optional" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + + schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/observability" +) + +const ( + ingressMeterName = "ftl.ingress" + ingressMethodAttr = "ftl.ingress.method" + ingressPathAttr = "ftl.ingress.path" + ingressVerbRefAttr = "ftl.ingress.verb.ref" + ingressFailureModeAttr = "ftl.ingress.failure_mode" + ingressRunTimeBucketAttr = "ftl.ingress.run_time_ms.bucket" +) + +type IngressMetrics struct { + requests metric.Int64Counter + msToComplete metric.Int64Histogram +} + +func initIngressMetrics() (*IngressMetrics, error) { + result := &IngressMetrics{ + requests: noop.Int64Counter{}, + msToComplete: noop.Int64Histogram{}, + } + + var err error + meter := otel.Meter(ingressMeterName) + + signalName := fmt.Sprintf("%s.requests", ingressMeterName) + if result.requests, err = meter.Int64Counter(signalName, metric.WithUnit("1"), + metric.WithDescription("the number of ingress requests that the FTL controller receives")); err != nil { + return nil, wrapErr(signalName, err) + } + + signalName = fmt.Sprintf("%s.ms_to_complete", ingressMeterName) + if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"), + metric.WithDescription("duration in ms to complete an ingress request")); err != nil { + return nil, wrapErr(signalName, err) + } + + return result, nil +} + +func (m *IngressMetrics) Request(ctx context.Context, method string, path string, verb optional.Option[*schemapb.Ref], startTime time.Time, failureMode optional.Option[string]) { + attrs := []attribute.KeyValue{ + attribute.String(ingressMethodAttr, method), + attribute.String(ingressPathAttr, path), + } + if v, ok := verb.Get(); ok { + attrs = append(attrs, + attribute.String(observability.ModuleNameAttribute, v.Module), + attribute.String(ingressVerbRefAttr, schema.RefFromProto(v).String())) + } + if f, ok := failureMode.Get(); ok { + attrs = append(attrs, attribute.String(ingressFailureModeAttr, f)) + } + + msToComplete := timeSinceMS(startTime) + m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...)) + + attrs = append(attrs, attribute.String(ingressRunTimeBucketAttr, logBucket(2, msToComplete))) + m.requests.Add(ctx, 1, metric.WithAttributes(attrs...)) +} diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go index ca952c2fb7..57851b0e0e 100644 --- a/backend/controller/observability/observability.go +++ b/backend/controller/observability/observability.go @@ -12,6 +12,7 @@ var ( Calls *CallMetrics Deployment *DeploymentMetrics FSM *FSMMetrics + Ingress *IngressMetrics PubSub *PubSubMetrics Cron *CronMetrics ) @@ -28,6 +29,8 @@ func init() { errs = errors.Join(errs, err) FSM, err = initFSMMetrics() errs = errors.Join(errs, err) + Ingress, err = initIngressMetrics() + errs = errors.Join(errs, err) PubSub, err = initPubSubMetrics() errs = errors.Join(errs, err) Cron, err = initCronMetrics()