Skip to content

Commit

Permalink
feat: otel metrics for ingress (#2310)
Browse files Browse the repository at this point in the history
```
InstrumentationScope ftl.ingress 

Metric #0
Descriptor:
     -> Name: ftl.ingress.requests
     -> Description: the number of ingress requests that the FTL controller receives
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.ingress.method: Str(GET)
     -> ftl.ingress.path: Str(/http/echo)
     -> ftl.ingress.run_time_ms.bucket: Str([16,32))
     -> ftl.ingress.verb.ref: Str(echo.getEcho)
     -> ftl.module.name: Str(echo)
StartTimestamp: 2024-08-12 17:44:51.923107 +0000 UTC
Timestamp: 2024-08-12 17:45:26.923594 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.ingress.ms_to_complete
     -> Description: duration in ms to complete an ingress request
     -> Unit: ms
     -> DataType: Histogram
     -> AggregationTemporality: Cumulative

HistogramDataPoints #0
Data point attributes:
     -> ftl.ingress.method: Str(GET)
     -> ftl.ingress.path: Str(/http/echo)
     -> ftl.ingress.verb.ref: Str(echo.getEcho)
     -> ftl.module.name: Str(echo)
StartTimestamp: 2024-08-12 17:44:51.92311 +0000 UTC
Timestamp: 2024-08-12 17:45:26.923622 +0000 UTC
Count: 1
Sum: 27.000000
Min: 27.000000
Max: 27.000000
```

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and gak committed Aug 13, 2024
1 parent 667988a commit 0284ec4
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 4 deletions.
7 changes: 6 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,22 +301,27 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
}

// ServeHTTP handles an HTTP ingress request: it loads the ingress routes for
// the request method and the active schema from the DAL, then hands the
// request to ingress.Handle. Every early-exit path writes an HTTP error and
// records an ingress metric with a failure-mode string; no verb ref is
// attached on these paths because no route has been resolved yet.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Captured first so the recorded duration also covers the DAL lookups below.
start := time.Now()

routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method)
if err != nil {
// A missing route is reported as 404; any other DAL error as 500.
if errors.Is(err, dalerrs.ErrNotFound) {
http.NotFound(w, r)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal"))
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("failed to resolve route from dal"))
return
}
sch, err := s.dal.GetActiveSchema(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("could not get active schema"))
return
}
// The request key is built from the HTTP method and URL path.
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
// NOTE(review): this is a rendered diff without +/- markers; the first call
// below appears to be the removed line and the second (taking start) its
// replacement. Confirm against the repository before reading both as live code.
ingress.Handle(sch, requestKey, routes, w, r, s.callWithRequest)
ingress.Handle(start, sch, requestKey, routes, w, r, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down
21 changes: 19 additions & 2 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/json"
"errors"
"net/http"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand All @@ -20,6 +22,7 @@ import (

// Handle HTTP ingress routes.
func Handle(
startTime time.Time,
sch *schema.Schema,
requestKey model.RequestKey,
routes []dal.IngressRoute,
Expand All @@ -33,24 +36,29 @@ func Handle(
if err != nil {
if errors.Is(err, dalerrs.ErrNotFound) {
http.NotFound(w, r)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("route not found"))
return
}
logger.Errorf(err, "failed to resolve route for %s %s", r.Method, r.URL.Path)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("failed to resolve route"))
return
}

verbRef := &schemapb.Ref{Module: route.Module, Name: route.Verb}

body, err := BuildRequestBody(route, r, sch)
if err != nil {
// Only log at debug, as this is a client side error
logger.Debugf("bad request: %s", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
return
}

creq := connect.NewRequest(&ftlv1.CallRequest{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: route.Module, Name: route.Verb},
Verb: verbRef,
Body: body,
})

Expand All @@ -60,8 +68,10 @@ func Handle(
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
httpCode := connectCodeToHTTP(connectErr.Code())
http.Error(w, http.StatusText(httpCode), httpCode)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error"))
} else {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error"))
}
return
}
Expand All @@ -72,6 +82,7 @@ func Handle(
if err != nil {
logger.Errorf(err, "could not resolve schema type for verb %s", route.Verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb"))
return
}
var responseBody []byte
Expand All @@ -81,6 +92,7 @@ func Handle(
if err := json.Unmarshal(msg.Body, &response); err != nil {
logger.Errorf(err, "could not unmarhal response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb"))
return
}

Expand All @@ -89,6 +101,7 @@ func Handle(
if err != nil {
logger.Errorf(err, "could not create response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb"))
return
}

Expand All @@ -105,12 +118,16 @@ func Handle(
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err != nil {
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
} else {
logger.Errorf(err, "Could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
}

case *ftlv1.CallResponse_Error_:
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error"))
}
}

Expand Down
3 changes: 2 additions & 1 deletion backend/controller/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestIngress(t *testing.T) {
req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx)
req.URL.RawQuery = test.query.Encode()
reqKey := model.NewRequestKey(model.OriginIngress, "test")
ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
body, err := encoding.Marshal(response)
assert.NoError(t, err)
return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil
Expand Down
76 changes: 76 additions & 0 deletions backend/controller/observability/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package observability

import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

// Names used by the ingress metrics. ingressMeterName is the meter name and
// also the prefix of the instrument names; the remaining constants are the
// attribute keys attached to ingress data points.
const (
ingressMeterName = "ftl.ingress"
// HTTP method and URL path of the ingress request.
ingressMethodAttr = "ftl.ingress.method"
ingressPathAttr = "ftl.ingress.path"
// String form of the verb ref; only set when a verb is supplied.
ingressVerbRefAttr = "ftl.ingress.verb.ref"
// Failure description; only set when a failure mode is supplied.
ingressFailureModeAttr = "ftl.ingress.failure_mode"
// Run-time bucket label; attached to the request counter only.
ingressRunTimeBucketAttr = "ftl.ingress.run_time_ms.bucket"
)

// IngressMetrics holds the OpenTelemetry instruments used to report on
// ingress requests.
type IngressMetrics struct {
// requests counts ingress requests.
requests metric.Int64Counter
// msToComplete records how long each ingress request took, in milliseconds.
msToComplete metric.Int64Histogram
}

// initIngressMetrics creates the ingress instruments on the ingress meter:
// a request counter ("<meter>.requests") and a completion-time histogram
// ("<meter>.ms_to_complete"). The struct starts out with no-op instruments
// and each field is replaced once its real instrument has been created.
//
// If either instrument cannot be created, it returns nil and the error
// wrapped with the name of the failing instrument.
func initIngressMetrics() (*IngressMetrics, error) {
	metrics := &IngressMetrics{
		requests:     noop.Int64Counter{},
		msToComplete: noop.Int64Histogram{},
	}
	meter := otel.Meter(ingressMeterName)

	counterName := fmt.Sprintf("%s.requests", ingressMeterName)
	counter, err := meter.Int64Counter(
		counterName,
		metric.WithUnit("1"),
		metric.WithDescription("the number of ingress requests that the FTL controller receives"),
	)
	if err != nil {
		return nil, wrapErr(counterName, err)
	}
	metrics.requests = counter

	histogramName := fmt.Sprintf("%s.ms_to_complete", ingressMeterName)
	histogram, err := meter.Int64Histogram(
		histogramName,
		metric.WithUnit("ms"),
		metric.WithDescription("duration in ms to complete an ingress request"),
	)
	if err != nil {
		return nil, wrapErr(histogramName, err)
	}
	metrics.msToComplete = histogram

	return metrics, nil
}

// Request records the outcome of one ingress request.
//
// The HTTP method and path are always attached as attributes. When verb is
// set, the module name and the string form of the verb ref are attached too;
// when failureMode is set, its text is attached as the failure-mode attribute.
//
// The time elapsed since startTime (in milliseconds, via timeSinceMS) is
// recorded on the histogram first. The request counter is then incremented
// with the same attributes plus a run-time bucket label from logBucket(2, ...).
func (m *IngressMetrics) Request(ctx context.Context, method string, path string, verb optional.Option[*schemapb.Ref], startTime time.Time, failureMode optional.Option[string]) {
	// Room for every attribute that can be attached below.
	labels := make([]attribute.KeyValue, 0, 6)
	labels = append(labels,
		attribute.String(ingressMethodAttr, method),
		attribute.String(ingressPathAttr, path),
	)

	if ref, hasVerb := verb.Get(); hasVerb {
		labels = append(labels, attribute.String(observability.ModuleNameAttribute, ref.Module))
		labels = append(labels, attribute.String(ingressVerbRefAttr, schema.RefFromProto(ref).String()))
	}

	if reason, failed := failureMode.Get(); failed {
		labels = append(labels, attribute.String(ingressFailureModeAttr, reason))
	}

	elapsedMS := timeSinceMS(startTime)

	// The histogram is recorded before the bucket label is added, so the
	// bucket attribute only ever appears on the counter.
	m.msToComplete.Record(ctx, elapsedMS, metric.WithAttributes(labels...))

	bucket := attribute.String(ingressRunTimeBucketAttr, logBucket(2, elapsedMS))
	labels = append(labels, bucket)
	m.requests.Add(ctx, 1, metric.WithAttributes(labels...))
}
3 changes: 3 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
Calls *CallMetrics
Deployment *DeploymentMetrics
FSM *FSMMetrics
Ingress *IngressMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
)
Expand All @@ -28,6 +29,8 @@ func init() {
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
Ingress, err = initIngressMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
Expand Down

0 comments on commit 0284ec4

Please sign in to comment.