Skip to content

Commit

Permalink
call and fsm metric instrumentation (still WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanj-square committed Jul 22, 2024
1 parent 6b0bd5e commit 73487ab
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 123 deletions.
24 changes: 15 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/TBD54566975/ftl/backend/controller/observability"
"hash"
"io"
"math/rand"
Expand Down Expand Up @@ -1005,22 +1006,27 @@ func (s *Service) callWithRequest(
ctx = rpc.WithVerbs(ctx, append(callers, verbRef))
headers.AddCaller(req.Header(), schema.RefFromProto(req.Msg.Verb))

observability.RecordCallBegin(ctx, &observability.CallBegin{
DestVerb: verbRef,
Callers: callers,
})

response, err := client.verb.Call(ctx, req)
var resp *connect.Response[ftlv1.CallResponse]
var maybeResponse optional.Option[*ftlv1.CallResponse]
if err == nil {
resp = connect.NewResponse(response.Msg)
maybeResponse = optional.Some(resp.Msg)
}
s.recordCall(ctx, &Call{
deploymentKey: route.Deployment,
requestKey: requestKey,
startTime: start,
destVerb: verbRef,
callers: callers,
callError: optional.Nil(err),
request: req.Msg,
response: maybeResponse,
observability.RecordCallEnd(ctx, s.dal, &observability.CallEnd{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
StartTime: start,
DestVerb: verbRef,
Callers: callers,
CallError: optional.Nil(err),
Request: req.Msg,
Response: maybeResponse,
})
return resp, err
}
Expand Down
5 changes: 5 additions & 0 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/TBD54566975/ftl/backend/controller/observability"
"time"

"github.com/alecthomas/types/optional"
Expand Down Expand Up @@ -57,11 +58,15 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
}
return fmt.Errorf("failed to start FSM transition: %w", err)
}

observability.RecordFsmTransitionBegin(ctx, fsm)

return nil
}

// FinishFSMTransition calls the underlying db FinishFSMTransition query for
// the given FSM and instance key and returns its error translated through
// dalerrs.TranslatePGError.
//
// The FSM success metric is recorded only when the query did not return an
// error; previously it was recorded unconditionally, so a failed finish was
// still counted as a successful transition.
func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
	_, err := d.db.FinishFSMTransition(ctx, fsm, instanceKey)
	if err == nil {
		observability.RecordFsmTransitionSuccess(ctx, fsm)
	}
	return dalerrs.TranslatePGError(err)
}

Expand Down
114 changes: 0 additions & 114 deletions backend/controller/observability.go

This file was deleted.

207 changes: 207 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package observability

import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"time"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"

Check failure on line 12 in backend/controller/observability/observability.go

View workflow job for this annotation

GitHub Actions / Lint

could not import github.com/TBD54566975/ftl/backend/controller/dal (-: import cycle not allowed: import stack: [github.com/TBD54566975/ftl/backend/controller github.com/TBD54566975/ftl/backend/controller/cronjobs github.com/TBD54566975/ftl/backend/controller/dal github.com/TBD54566975/ftl/backend/controller/observability github.com/TBD54566975/ftl/backend/controller/dal]) (typecheck)
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)

// name is the instrumentation scope name passed to otel.Meter when the
// package-level meter is created.
// NOTE(review): the value ends in "runner" although this file lives under
// backend/controller — confirm this is intended.
const name = "ftl.xyz/ftl/runner"

// metricAttributeBuilders groups the functions that turn a plain string into
// the attribute.KeyValue attached to recorded measurements.
type metricAttributeBuilders struct {
// moduleName builds the attribute carrying a module name.
moduleName func(name string) attribute.KeyValue
// featureName builds the attribute carrying a feature (verb or FSM) name.
featureName func(name string) attribute.KeyValue
// destinationVerb builds the attribute carrying the destination verb name.
destinationVerb func(name string) attribute.KeyValue
}

// callMetrics holds the instruments used to record verb calls. The fields are
// assigned in init.
type callMetrics struct {
// requests is incremented once per call in RecordCallEnd.
requests metric.Int64Counter
// failures is incremented in RecordCallEnd when the call or its response carries an error.
failures metric.Int64Counter
// active is incremented in RecordCallBegin and decremented in RecordCallEnd.
active metric.Int64UpDownCounter
// latency records the call duration in milliseconds.
latency metric.Int64Histogram
}

// fsmMetrics holds the instruments used to record FSM transitions. The fields
// are assigned in init.
type fsmMetrics struct {
// active is incremented when a transition begins and decremented when it succeeds or fails.
active metric.Int64UpDownCounter
// transitions is incremented once per transition begin.
transitions metric.Int64Counter
// failures is incremented once per failed transition.
failures metric.Int64Counter
}

// observableMetrics bundles the meter, the attribute builders and the
// call/FSM instrument sets used by the Record* functions in this package.
type observableMetrics struct {
// meter is the metric.Meter from which all instruments are created.
meter metric.Meter
// attributes builds the key/value pairs attached to measurements.
attributes metricAttributeBuilders
// calls holds the verb-call instruments.
calls *callMetrics
// fsm holds the FSM-transition instruments.
fsm *fsmMetrics
}

// metrics is the package-level holder for the meter, the attribute builders
// and the call/FSM instrument sets. The instrument fields inside calls and
// fsm are left unset here and are assigned by init.
var (
metrics = observableMetrics{
// The meter is obtained from the global otel provider under the package's scope name.
meter: otel.Meter(name),
// TODO: move to a initialization method
attributes: metricAttributeBuilders{
// Emits the "ftl.module.name" attribute.
moduleName: func(name string) attribute.KeyValue {
return attribute.String("ftl.module.name", name)
},
// Emits the "ftl.feature.name" attribute.
featureName: func(name string) attribute.KeyValue {
return attribute.String("ftl.feature.name", name)
},
// Emits the "ftl.verb.dest" attribute.
destinationVerb: func(name string) attribute.KeyValue {
return attribute.String("ftl.verb.dest", name)
},
},
calls: &callMetrics{},
fsm: &fsmMetrics{},
}
)

// CallBegin describes a verb call that is about to be made; it is the input
// to RecordCallBegin.
type CallBegin struct {
// DestVerb is the verb being called. RecordCallBegin dereferences it, so it must be non-nil.
DestVerb *schema.Ref
// Callers is the caller chain; only the first element is used for metric attributes.
// An empty slice is reported as "unknown".
Callers []*schema.Ref
}

// CallEnd describes a completed verb call; it is the input to RecordCallEnd,
// which derives metrics from it and stores it as a dal.CallEvent.
type CallEnd struct {
// DeploymentKey is copied into the stored call event.
DeploymentKey model.DeploymentKey
// RequestKey is copied into the stored call event.
RequestKey model.RequestKey
// StartTime is the event time and the origin from which the call duration is measured.
StartTime time.Time
// DestVerb is the verb that was called. RecordCallEnd dereferences it, so it must be non-nil.
DestVerb *schema.Ref
// Callers is the caller chain; only the first element is used as the source verb.
Callers []*schema.Ref
// Request is the call request; its body is stored with the event.
Request *ftlv1.CallRequest
// Response is the call response, if one was received.
Response optional.Option[*ftlv1.CallResponse]
// CallError is the error returned by the call itself, if any. When set, Response is not inspected.
CallError optional.Option[error]
}

func init() {
metrics.calls.requests, _ = metrics.meter.Int64Counter("ftl.call.requests",
metric.WithDescription("number of verb calls"),
metric.WithUnit("{count}"))

metrics.calls.failures, _ = metrics.meter.Int64Counter("ftl.call.failures",
metric.WithDescription("number of verb call failures"),
metric.WithUnit("{count}"))

metrics.calls.active, _ = metrics.meter.Int64UpDownCounter("ftl.call.active",
metric.WithDescription("number of in flight calls"),
metric.WithUnit("{count}"))

metrics.calls.latency, _ = metrics.meter.Int64Histogram("ftl.call.latency",
metric.WithDescription("verb call latency"),
metric.WithUnit("{ms}"))

metrics.fsm.active, _ = metrics.meter.Int64UpDownCounter("ftl.fsm.active",
metric.WithDescription("number of in flight fsm transitions"),
metric.WithUnit("{count}"))

metrics.fsm.transitions, _ = metrics.meter.Int64Counter("ftl.fsm.transitions",
metric.WithDescription("number of attempted transitions"),
metric.WithUnit("{count}"))

metrics.fsm.failures, _ = metrics.meter.Int64Counter("ftl.fsm.failures",
metric.WithDescription("number of fsm transition failures"),
metric.WithUnit("{count}"))

}

// RecordFsmTransitionBegin counts one attempted transition for the given FSM
// and marks it as in flight by incrementing the active gauge. Both
// measurements carry the FSM's module and name as attributes.
func RecordFsmTransitionBegin(ctx context.Context, fsm schema.RefKey) {
	labels := metric.WithAttributes(
		metrics.attributes.moduleName(fsm.Module),
		metrics.attributes.featureName(fsm.Name),
	)

	metrics.fsm.transitions.Add(ctx, 1, labels)
	metrics.fsm.active.Add(ctx, 1, labels)
}

// RecordFsmTransitionSuccess marks a transition of the given FSM as no longer
// in flight by decrementing the active gauge, tagged with the FSM's module
// and name.
func RecordFsmTransitionSuccess(ctx context.Context, fsm schema.RefKey) {
	labels := metric.WithAttributes(
		metrics.attributes.moduleName(fsm.Module),
		metrics.attributes.featureName(fsm.Name),
	)

	metrics.fsm.active.Add(ctx, -1, labels)
}

// recordFsmTransitionFailure marks a transition of the given FSM as no longer
// in flight and counts it as a failure. Both measurements are tagged with the
// FSM's module and name.
// NOTE(review): this function is unexported and is not referenced anywhere in
// this file — confirm how failures are meant to be reported.
func recordFsmTransitionFailure(ctx context.Context, fsm schema.RefKey) {
	labels := metric.WithAttributes(
		metrics.attributes.moduleName(fsm.Module),
		metrics.attributes.featureName(fsm.Name),
	)

	metrics.fsm.active.Add(ctx, -1, labels)
	metrics.fsm.failures.Add(ctx, 1, labels)
}

// RecordCallBegin marks a verb call as in flight by incrementing the active
// call gauge.
//
// The measurement is tagged with the module and name of the first entry in
// call.Callers (or "unknown" for both when there are no callers) and with the
// name of the destination verb. call.DestVerb must be non-nil.
func RecordCallBegin(ctx context.Context, call *CallBegin) {
	sourceName, sourceModule := "unknown", "unknown"
	if len(call.Callers) > 0 {
		origin := call.Callers[0]
		sourceName, sourceModule = origin.Name, origin.Module
	}

	metrics.calls.active.Add(ctx, 1, metric.WithAttributes(
		metrics.attributes.moduleName(sourceModule),
		metrics.attributes.featureName(sourceName),
		metrics.attributes.destinationVerb(call.DestVerb.Name),
	))
}

// RecordCallEnd records the outcome of a finished verb call.
//
// It updates the call metrics (request count, active gauge, failure count and
// latency in milliseconds) and then stores a dal.CallEvent through d. A
// failure to store the event is logged and otherwise ignored.
//
// Measurements are tagged with the module and name of the first entry in
// call.Callers (or "unknown" for both when there are no callers) and with the
// name of the destination verb. call.DestVerb must be non-nil.
//
// A call counts as failed when call.CallError is set, or otherwise when the
// response carries an error; in the latter case the error's stack is stored
// with the event as well.
//
// NOTE(review): the *dal.DAL parameter makes this package import dal while
// dal imports this package, which the linter reports as an import cycle.
func RecordCallEnd(ctx context.Context, d *dal.DAL, call *CallEnd) {
	// Read the clock exactly once so the latency metric and the stored event
	// report the same duration.
	elapsed := time.Since(call.StartTime)
	logger := log.FromContext(ctx)

	var sourceVerb optional.Option[schema.Ref]
	featureName := metrics.attributes.featureName("unknown")
	moduleName := metrics.attributes.moduleName("unknown")

	// TODO avoid having to find the source (pass it in `CallEnd` and `CallStart` instead)
	if len(call.Callers) > 0 {
		sourceVerb = optional.Some(*call.Callers[0])
		featureName = metrics.attributes.featureName(call.Callers[0].Name)
		moduleName = metrics.attributes.moduleName(call.Callers[0].Module)
	}

	// Every measurement below carries the same attribute set; build it once.
	attrs := metric.WithAttributes(moduleName, featureName, metrics.attributes.destinationVerb(call.DestVerb.Name))

	metrics.calls.requests.Add(ctx, 1, attrs)
	metrics.calls.active.Add(ctx, -1, attrs)

	var errorStr optional.Option[string]
	var stack optional.Option[string]
	var responseBody []byte

	if callError, ok := call.CallError.Get(); ok {
		// The call itself failed; the response is not inspected.
		errorStr = optional.Some(callError.Error())
		metrics.calls.failures.Add(ctx, 1, attrs)
	} else if response, ok := call.Response.Get(); ok {
		responseBody = response.GetBody()
		if callError := response.GetError(); callError != nil {
			// The call completed but the response reports an error.
			errorStr = optional.Some(callError.Message)
			stack = optional.Ptr(callError.Stack)
			metrics.calls.failures.Add(ctx, 1, attrs)
		}
	}

	metrics.calls.latency.Record(ctx, elapsed.Milliseconds(), attrs)

	err := d.InsertCallEvent(ctx, &dal.CallEvent{
		Time:          call.StartTime,
		DeploymentKey: call.DeploymentKey,
		RequestKey:    optional.Some(call.RequestKey),
		Duration:      elapsed,
		SourceVerb:    sourceVerb,
		DestVerb:      *call.DestVerb,
		Request:       call.Request.GetBody(),
		Response:      responseBody,
		Error:         errorStr,
		Stack:         stack,
	})
	if err != nil {
		logger.Errorf(err, "failed to record call")
	}
}

0 comments on commit 73487ab

Please sign in to comment.