From 73487ab218bdd4f0d14d7d01d0cb5db2a5e500b5 Mon Sep 17 00:00:00 2001 From: Jon Johnson <113393155+jonathanj-square@users.noreply.github.com> Date: Mon, 22 Jul 2024 14:14:07 -0700 Subject: [PATCH] call and fsm metric instrumentation (still WIP) --- backend/controller/controller.go | 24 +- backend/controller/dal/fsm.go | 5 + backend/controller/observability.go | 114 ---------- .../controller/observability/observability.go | 207 ++++++++++++++++++ 4 files changed, 227 insertions(+), 123 deletions(-) delete mode 100644 backend/controller/observability.go create mode 100644 backend/controller/observability/observability.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 1672654903..057ef27d88 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "errors" "fmt" + "github.com/TBD54566975/ftl/backend/controller/observability" "hash" "io" "math/rand" @@ -1005,6 +1006,11 @@ func (s *Service) callWithRequest( ctx = rpc.WithVerbs(ctx, append(callers, verbRef)) headers.AddCaller(req.Header(), schema.RefFromProto(req.Msg.Verb)) + observability.RecordCallBegin(ctx, &observability.CallBegin{ + DestVerb: verbRef, + Callers: callers, + }) + response, err := client.verb.Call(ctx, req) var resp *connect.Response[ftlv1.CallResponse] var maybeResponse optional.Option[*ftlv1.CallResponse] @@ -1012,15 +1018,15 @@ func (s *Service) callWithRequest( resp = connect.NewResponse(response.Msg) maybeResponse = optional.Some(resp.Msg) } - s.recordCall(ctx, &Call{ - deploymentKey: route.Deployment, - requestKey: requestKey, - startTime: start, - destVerb: verbRef, - callers: callers, - callError: optional.Nil(err), - request: req.Msg, - response: maybeResponse, + observability.RecordCallEnd(ctx, s.dal, &observability.CallEnd{ + DeploymentKey: route.Deployment, + RequestKey: requestKey, + StartTime: start, + DestVerb: verbRef, + Callers: callers, + CallError: optional.Nil(err), + Request: req.Msg, + Response: maybeResponse, }) return resp, err } diff --git a/backend/controller/dal/fsm.go b/backend/controller/dal/fsm.go index a88f2c42ec..3463660928 100644 --- a/backend/controller/dal/fsm.go +++ b/backend/controller/dal/fsm.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/TBD54566975/ftl/backend/controller/observability" "time" "github.com/alecthomas/types/optional" @@ -57,11 +58,15 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi } return fmt.Errorf("failed to start FSM transition: %w", err) } + + observability.RecordFsmTransitionBegin(ctx, fsm) + return nil } func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error { _, err := d.db.FinishFSMTransition(ctx, fsm, instanceKey) + observability.RecordFsmTransitionSuccess(ctx, fsm) return dalerrs.TranslatePGError(err) } diff --git a/backend/controller/observability.go b/backend/controller/observability.go deleted file mode 100644 index 1e81b3b9b5..0000000000 --- a/backend/controller/observability.go +++ /dev/null @@ -1,114 +0,0 @@ -package controller - -import ( - "context" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "time" - - "github.com/alecthomas/types/optional" - - "github.com/TBD54566975/ftl/backend/controller/dal" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/model" -) - -const name = "ftl.xyz/ftl/runner" - -var ( - meter = otel.Meter(name) - requestsMetric metric.Int64Counter - failureMetric metric.Int64Counter - latencyMetric metric.Int64Histogram - - moduleNameAttr = func(name string) attribute.KeyValue { - return attribute.String("ftl.module.name", name) - } - sourceVerbAttr = func(name string) attribute.KeyValue { - return attribute.String("ftl.verb.src", name) - } - destinationVerbAttr = func(name string) attribute.KeyValue { - return attribute.String("ftl.verb.dest", name) - } -) - -type Call struct { - deploymentKey model.DeploymentKey - requestKey model.RequestKey - startTime time.Time - destVerb *schema.Ref - callers []*schema.Ref - request *ftlv1.CallRequest - response optional.Option[*ftlv1.CallResponse] - callError optional.Option[error] -} - -func init() { - requestsMetric, _ = meter.Int64Counter("ftl.call.requests", - metric.WithDescription("number of verb calls"), - metric.WithUnit("{count}")) - - failureMetric, _ = meter.Int64Counter("ftl.call.failures", - metric.WithDescription("number of verb call failures"), - metric.WithUnit("{count}")) - - latencyMetric, _ = meter.Int64Histogram("ftl.call.latency", - metric.WithDescription("verb call latency"), - metric.WithUnit("{ms}")) -} - -func (s *Service) recordCall(ctx context.Context, call *Call) { - logger := log.FromContext(ctx) - var sourceVerb optional.Option[schema.Ref] - var srcAttr attribute.KeyValue - var moduleAttr attribute.KeyValue - if len(call.callers) > 0 { - sourceVerb = optional.Some(*call.callers[0]) - srcAttr = sourceVerbAttr(call.callers[0].Name) - moduleAttr = moduleNameAttr(call.callers[0].Module) - } else { - srcAttr = sourceVerbAttr("unknown") - moduleAttr = moduleNameAttr("unknown") - } - - destAttr := destinationVerbAttr(call.destVerb.Name) - - requestsMetric.Add(ctx, 1, metric.WithAttributes(moduleAttr, srcAttr, destAttr)) - - var errorStr optional.Option[string] - var stack optional.Option[string] - var responseBody []byte - - if callError, ok := call.callError.Get(); ok { - errorStr = optional.Some(callError.Error()) - failureMetric.Add(ctx, 1, metric.WithAttributes(moduleAttr, srcAttr, destAttr)) - } else if response, ok := call.response.Get(); ok { - responseBody = response.GetBody() - if callError := response.GetError(); callError != nil { - errorStr = optional.Some(callError.Message) - stack = optional.Ptr(callError.Stack) - failureMetric.Add(ctx, 1, metric.WithAttributes(moduleAttr, srcAttr, destAttr)) - } - } - - latencyMetric.Record(ctx, time.Now().Sub(call.startTime).Milliseconds(), metric.WithAttributes(moduleAttr, srcAttr, destAttr)) - - err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{ - Time: call.startTime, - DeploymentKey: call.deploymentKey, - RequestKey: optional.Some(call.requestKey), - Duration: time.Since(call.startTime), - SourceVerb: sourceVerb, - DestVerb: *call.destVerb, - Request: call.request.GetBody(), - Response: responseBody, - Error: errorStr, - Stack: stack, - }) - if err != nil { - logger.Errorf(err, "failed to record call") - } -} diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go new file mode 100644 index 0000000000..0e5fd3d39e --- /dev/null +++ b/backend/controller/observability/observability.go @@ -0,0 +1,207 @@ +package observability + +import ( + "context" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "time" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/backend/controller/dal" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" +) + +const name = "ftl.xyz/ftl/runner" + +type metricAttributeBuilders struct { + moduleName func(name string) attribute.KeyValue + featureName func(name string) attribute.KeyValue + destinationVerb func(name string) attribute.KeyValue +} + +type callMetrics struct { + requests metric.Int64Counter + failures metric.Int64Counter + active metric.Int64UpDownCounter + latency metric.Int64Histogram +} + +type fsmMetrics struct { + active metric.Int64UpDownCounter + transitions metric.Int64Counter + failures metric.Int64Counter +} + +type observableMetrics struct { + meter metric.Meter + attributes metricAttributeBuilders + calls *callMetrics + fsm *fsmMetrics +} + +var ( + metrics = observableMetrics{ + meter: otel.Meter(name), + // TODO: move to a initialization method + attributes: metricAttributeBuilders{ + moduleName: func(name string) attribute.KeyValue { + return attribute.String("ftl.module.name", name) + }, + featureName: func(name string) attribute.KeyValue { + return attribute.String("ftl.feature.name", name) + }, + destinationVerb: func(name string) attribute.KeyValue { + return attribute.String("ftl.verb.dest", name) + }, + }, + calls: &callMetrics{}, + fsm: &fsmMetrics{}, + } +) + +type CallBegin struct { + DestVerb *schema.Ref + Callers []*schema.Ref +} + +type CallEnd struct { + DeploymentKey model.DeploymentKey + RequestKey model.RequestKey + StartTime time.Time + DestVerb *schema.Ref + Callers []*schema.Ref + Request *ftlv1.CallRequest + Response optional.Option[*ftlv1.CallResponse] + CallError optional.Option[error] +} + +func init() { + metrics.calls.requests, _ = metrics.meter.Int64Counter("ftl.call.requests", + metric.WithDescription("number of verb calls"), + metric.WithUnit("{count}")) + + metrics.calls.failures, _ = metrics.meter.Int64Counter("ftl.call.failures", + metric.WithDescription("number of verb call failures"), + metric.WithUnit("{count}")) + + metrics.calls.active, _ = metrics.meter.Int64UpDownCounter("ftl.call.active", + metric.WithDescription("number of in flight calls"), + metric.WithUnit("{count}")) + + metrics.calls.latency, _ = metrics.meter.Int64Histogram("ftl.call.latency", + metric.WithDescription("verb call latency"), + metric.WithUnit("{ms}")) + + metrics.fsm.active, _ = metrics.meter.Int64UpDownCounter("ftl.fsm.active", + metric.WithDescription("number of in flight fsm transitions"), + metric.WithUnit("{count}")) + + metrics.fsm.transitions, _ = metrics.meter.Int64Counter("ftl.fsm.transitions", + metric.WithDescription("number of attempted transitions"), + metric.WithUnit("{count}")) + + metrics.fsm.failures, _ = metrics.meter.Int64Counter("ftl.fsm.failures", + metric.WithDescription("number of fsm transition failures"), + metric.WithUnit("{count}")) + +} + +func RecordFsmTransitionBegin(ctx context.Context, fsm schema.RefKey) { + moduleAttr := metrics.attributes.moduleName(fsm.Module) + featureAttr := metrics.attributes.featureName(fsm.Name) + + metrics.fsm.transitions.Add(ctx, 1, metric.WithAttributes(moduleAttr, featureAttr)) + metrics.fsm.active.Add(ctx, 1, metric.WithAttributes(moduleAttr, featureAttr)) +} + +func RecordFsmTransitionSuccess(ctx context.Context, fsm schema.RefKey) { + moduleAttr := metrics.attributes.moduleName(fsm.Module) + featureAttr := metrics.attributes.featureName(fsm.Name) + + metrics.fsm.active.Add(ctx, -1, metric.WithAttributes(moduleAttr, featureAttr)) +} + +func recordFsmTransitionFailure(ctx context.Context, fsm schema.RefKey) { + moduleAttr := metrics.attributes.moduleName(fsm.Module) + featureAttr := metrics.attributes.featureName(fsm.Name) + + metrics.fsm.active.Add(ctx, -1, metric.WithAttributes(moduleAttr, featureAttr)) + metrics.fsm.failures.Add(ctx, 1, metric.WithAttributes(moduleAttr, featureAttr)) +} + +func RecordCallBegin(ctx context.Context, call *CallBegin) { + var featureName attribute.KeyValue + var moduleName attribute.KeyValue + if len(call.Callers) > 0 { + featureName = metrics.attributes.featureName(call.Callers[0].Name) + moduleName = metrics.attributes.moduleName(call.Callers[0].Module) + } else { + featureName = metrics.attributes.featureName("unknown") + moduleName = metrics.attributes.moduleName("unknown") + } + + destinationVerb := metrics.attributes.destinationVerb(call.DestVerb.Name) + + metrics.calls.active.Add(ctx, 1, metric.WithAttributes(moduleName, featureName, destinationVerb)) +} + +func RecordCallEnd(ctx context.Context, d *dal.DAL, call *CallEnd) { + logger := log.FromContext(ctx) + var sourceVerb optional.Option[schema.Ref] + var featureName attribute.KeyValue + var moduleName attribute.KeyValue + + // TODO avoid having to find the source (pass it in `CallEnd` and `CallStart` instead) + if len(call.Callers) > 0 { + sourceVerb = optional.Some(*call.Callers[0]) + featureName = metrics.attributes.featureName(call.Callers[0].Name) + moduleName = metrics.attributes.moduleName(call.Callers[0].Module) + } else { + featureName = metrics.attributes.featureName("unknown") + moduleName = metrics.attributes.moduleName("unknown") + } + + destinationVerb := metrics.attributes.destinationVerb(call.DestVerb.Name) + + metrics.calls.requests.Add(ctx, 1, metric.WithAttributes(moduleName, featureName, destinationVerb)) + metrics.calls.active.Add(ctx, -1, metric.WithAttributes(moduleName, featureName, destinationVerb)) + + var errorStr optional.Option[string] + var stack optional.Option[string] + var responseBody []byte + + if callError, ok := call.CallError.Get(); ok { + errorStr = optional.Some(callError.Error()) + metrics.calls.failures.Add(ctx, 1, metric.WithAttributes(moduleName, featureName, destinationVerb)) + } else if response, ok := call.Response.Get(); ok { + responseBody = response.GetBody() + if callError := response.GetError(); callError != nil { + errorStr = optional.Some(callError.Message) + stack = optional.Ptr(callError.Stack) + metrics.calls.failures.Add(ctx, 1, metric.WithAttributes(moduleName, featureName, destinationVerb)) + } + } + + metrics.calls.latency.Record(ctx, time.Now().Sub(call.StartTime).Milliseconds(), metric.WithAttributes(moduleName, featureName, destinationVerb)) + + err := d.InsertCallEvent(ctx, &dal.CallEvent{ + Time: call.StartTime, + DeploymentKey: call.DeploymentKey, + RequestKey: optional.Some(call.RequestKey), + Duration: time.Since(call.StartTime), + SourceVerb: sourceVerb, + DestVerb: *call.DestVerb, + Request: call.Request.GetBody(), + Response: responseBody, + Error: errorStr, + Stack: stack, + }) + if err != nil { + logger.Errorf(err, "failed to record call") + } +}