Skip to content

Commit

Permalink
feat: add ftl.call.requests and ftl.async_call.ms_to_complete (#2270
Browse files Browse the repository at this point in the history
)

Replaces the original toy `requests` metric with `ftl.call.requests` and
adds a new `ftl.async_call.ms_to_complete` histogram metric. The latency
attribute is bucketed by powers of 2, rather than by powers of 8 as it is
for async calls, because (1) direct verb calls should be faster and (2)
this is a new metric, so while we know less about realistic production
values, it's better to err on the side of more granularity.

```
InstrumentationScope ftl.call 

Metric #0
Descriptor:
     -> Name: ftl.call.requests
     -> Description: the number of times that the FTL controller receives a verb call request
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.call.run_time_ms.bucket: Str([8,16))
     -> ftl.call.verb.ref: Str(time.time)
     -> ftl.module.name: Str(time)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-08-06 19:50:37.269674 +0000 UTC
Timestamp: 2024-08-06 19:51:17.271013 +0000 UTC
Value: 1

NumberDataPoints #1
Data point attributes:
     -> ftl.call.run_time_ms.bucket: Str([32,64))
     -> ftl.call.verb.ref: Str(echo.echo)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-08-06 19:50:37.269674 +0000 UTC
Timestamp: 2024-08-06 19:51:17.271013 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.async_call.ms_to_complete
     -> Description: duration in ms to complete a verb call
     -> Unit: ms
     -> DataType: Histogram
     -> AggregationTemporality: Cumulative

HistogramDataPoints #0
Data point attributes:
     -> ftl.call.verb.ref: Str(echo.echo)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-08-06 19:50:37.269677 +0000 UTC
Timestamp: 2024-08-06 19:51:17.271014 +0000 UTC
Count: 1
Sum: 32.000000
Min: 32.000000
Max: 32.000000
```

Sample output when a verb call fails:
```
     -> ftl.call.failure_mode: Str(invalid request: missing verb)
     -> ftl.call.run_time_ms.bucket: Str(<1)
     -> ftl.call.verb.ref: Str(echo.echo)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(false)
```

Issue: #2194

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
deniseli and github-actions[bot] authored Aug 6, 2024
1 parent 8fcbd33 commit a3151c5
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
28 changes: 14 additions & 14 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -966,26 +963,18 @@ func (s *Service) callWithRequest(
) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()

logger := log.FromContext(ctx)

requestCounter, err := otel.GetMeterProvider().Meter("ftl.call").Int64Counter(
"requests",
metric.WithDescription("Count of FTL verb calls via the controller"))
if err != nil {
logger.Errorf(err, "Failed to instrument otel metric `ftl.call.requests`")
} else {
requestCounter.Add(ctx, 1, metric.WithAttributes(attribute.String("ftl.module.name", req.Msg.Verb.Module), attribute.String("ftl.verb.name", req.Msg.Verb.Name)))
}

if req.Msg.Verb == nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: missing verb"))
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("verb is required"))
}
if req.Msg.Body == nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: missing body"))
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

sch, err := s.dal.GetActiveSchema(ctx)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("schema retrieval failed"))
return nil, err
}

Expand All @@ -994,32 +983,38 @@ func (s *Service) callWithRequest(

if err = sch.ResolveToType(verbRef, verb); err != nil {
if errors.Is(err, schema.ErrNotFound) {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb not found"))
return nil, connect.NewError(connect.CodeNotFound, err)
}
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb resolution failed"))
return nil, err
}

err = ingress.ValidateCallBody(req.Msg.Body, verb, sch)
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: invalid call body"))
return nil, err
}

module := verbRef.Module
routes, ok := s.routes.Load()[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}
route := routes[rand.Intn(len(routes))] //nolint:gosec
client := s.clientsForRunner(route.Runner, route.Endpoint)

callers, err := headers.GetCallers(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get callers"))
return nil, err
}

if !verb.IsExported() {
for _, caller := range callers {
if caller.Module != module {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("invalid request: verb not exported"))
return nil, connect.NewError(connect.CodePermissionDenied, fmt.Errorf("verb %q is not exported", verbRef))
}
}
Expand All @@ -1033,6 +1028,7 @@ func (s *Service) callWithRequest(
} else {
k, ok, err := headers.GetRequestKey(req.Header())
if err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to get request key"))
return nil, err
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
Expand All @@ -1045,6 +1041,7 @@ func (s *Service) callWithRequest(
if isNewRequestKey {
headers.SetRequestKey(req.Header(), requestKey)
if err = s.dal.CreateRequest(ctx, requestKey, sourceAddress); err != nil {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("failed to create request"))
return nil, err
}
}
Expand All @@ -1059,6 +1056,9 @@ func (s *Service) callWithRequest(
if err == nil {
resp = connect.NewResponse(response.Msg)
maybeResponse = optional.Some(resp.Msg)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.None[string]())
} else {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.recordCall(ctx, &Call{
deploymentKey: route.Deployment,
Expand Down
72 changes: 72 additions & 0 deletions backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package observability

import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

// Names used when reporting verb call metrics.
const (
	// callMeterName is the name of the meter that owns the call instruments;
	// it is also used as the prefix for the signal names.
	callMeterName = "ftl.call"

	// Attribute keys attached to call data points.
	callFailureModeAttr   = "ftl.call.failure_mode"
	callRunTimeBucketAttr = "ftl.call.run_time_ms.bucket"
	callVerbRefAttr       = "ftl.call.verb.ref"
)

// CallMetrics bundles the OpenTelemetry instruments used to report on verb
// calls handled by the controller.
type CallMetrics struct {
	// requests counts verb call requests.
	requests metric.Int64Counter
	// msToComplete records how long a verb call took, in milliseconds.
	msToComplete metric.Int64Histogram
}

// initCallMetrics creates the instruments that back CallMetrics on the
// callMeterName meter.
//
// The instruments start out as no-ops and are replaced by real ones as each is
// registered. If any registration fails, a nil *CallMetrics is returned
// together with the error wrapped with the name of the failing signal.
func initCallMetrics() (*CallMetrics, error) {
	result := &CallMetrics{
		requests:     noop.Int64Counter{},
		msToComplete: noop.Int64Histogram{},
	}

	var err error
	meter := otel.Meter(callMeterName)

	signalName := fmt.Sprintf("%s.requests", callMeterName)
	if result.requests, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
		metric.WithDescription("the number of times that the FTL controller receives a verb call request")); err != nil {
		return nil, wrapErr(signalName, err)
	}

	// The histogram measures direct verb calls, so it is named after the call
	// meter rather than the async call meter it was previously prefixed with.
	signalName = fmt.Sprintf("%s.ms_to_complete", callMeterName)
	if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"),
		metric.WithDescription("duration in ms to complete a verb call")); err != nil {
		return nil, wrapErr(signalName, err)
	}

	return result, nil
}

// Request records the outcome of a single verb call request.
//
// verb identifies the verb that was called. It may be nil (for example when
// the incoming request did not specify a verb), in which case the module name
// and verb ref attributes are omitted instead of dereferencing a nil pointer.
// startTime is when handling of the request began and is used to compute the
// elapsed time in milliseconds. maybeFailureMode is empty for a successful
// call; otherwise it carries a short description of why the call failed, which
// is attached as the failure mode attribute.
//
// The elapsed time is recorded on the duration histogram, and the request
// counter is incremented with the same attributes plus a power-of-2 run time
// bucket.
func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime time.Time, maybeFailureMode optional.Option[string]) {
	// At most five attributes are attached: module, verb ref, status,
	// failure mode and run time bucket.
	attrs := make([]attribute.KeyValue, 0, 5)

	// Callers report failures for requests that are missing their verb, so
	// verb must be checked before any of its fields are read.
	if verb != nil {
		attrs = append(attrs,
			attribute.String(observability.ModuleNameAttribute, verb.Module),
			attribute.String(callVerbRefAttr, schema.RefFromProto(verb).String()),
		)
	}

	failureMode, failed := maybeFailureMode.Get()
	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !failed))
	if failed {
		attrs = append(attrs, attribute.String(callFailureModeAttr, failureMode))
	}

	msToComplete := timeSinceMS(startTime)
	m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

	// The bucket attribute is only added to the counter, not the histogram.
	attrs = append(attrs, attribute.String(callRunTimeBucketAttr, logBucket(2, msToComplete)))
	m.requests.Add(ctx, 1, metric.WithAttributes(attrs...))
}
3 changes: 3 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

var (
AsyncCalls *AsyncCallMetrics
Calls *CallMetrics
Deployment *DeploymentMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
Expand All @@ -23,6 +24,8 @@ func init() {

AsyncCalls, err = initAsyncCallMetrics()
errs = errors.Join(errs, err)
Calls, err = initCallMetrics()
errs = errors.Join(errs, err)
Deployment, err = initDeploymentMetrics()
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
Expand Down

0 comments on commit a3151c5

Please sign in to comment.