From 916027b94b3046ac3795cf09e515b5d33a20d934 Mon Sep 17 00:00:00 2001 From: Denise Li Date: Tue, 30 Jul 2024 15:47:49 -0400 Subject: [PATCH 1/3] init temp cleanup break dep cycle --- backend/controller/controller.go | 9 ++ .../controller/observability/async_calls.go | 102 ++++++++++++++++++ .../controller/observability/observability.go | 7 +- 3 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 backend/controller/observability/async_calls.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index e1f13c77ab..dc0f834731 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -43,6 +43,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/ingress" "github.com/TBD54566975/ftl/backend/controller/leases" + "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/scaling" "github.com/TBD54566975/ftl/backend/controller/scaling/localscaling" @@ -1327,8 +1328,10 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) logger.Tracef("No async calls to execute") return time.Second * 2, nil } else if err != nil { + observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err) return 0, err } + observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil) defer call.Release() //nolint:errcheck logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb)) @@ -1343,15 +1346,18 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) failed := false if err != nil { logger.Warnf("Async call could not be called: %v", err) + observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call could not be called")) callResult = either.RightOf[[]byte](err.Error()) failed = true } else if perr := resp.Msg.GetError(); perr != nil { logger.Warnf("Async call failed: %s", perr.Message) + observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call failed")) callResult = either.RightOf[[]byte](perr.Message) failed = true } else { logger.Debugf("Async call succeeded") callResult = either.LeftOf[string](resp.Msg.GetBody()) + observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.None[string]()) } err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error { if failed && call.RemainingAttempts > 0 { @@ -1371,8 +1377,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) } }) if err != nil { + // failed + observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err) return 0, fmt.Errorf("failed to complete async call: %w", err) } + observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil) go func() { // Post-commit notification based on origin switch origin := call.Origin.(type) { diff --git a/backend/controller/observability/async_calls.go b/backend/controller/observability/async_calls.go new file mode 100644 index 0000000000..0cc610cec3 --- /dev/null +++ b/backend/controller/observability/async_calls.go @@ -0,0 +1,102 @@ +package observability + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/observability" + "github.com/alecthomas/types/optional" +) + +const ( + asyncCallMeterName = "ftl.async_call" + asyncCallOriginAttr = "ftl.async_call.origin" + asyncCallVerbRefAttr = "ftl.async_call.verb.ref" + asyncCallTimeSinceScheduledAtAttr = "ftl.async_call.time_since_scheduled_at_ms" + asyncCallExecFailureModeAttr = "ftl.async_call.execution.failure_mode" +) + +type AsyncCallMetrics struct { + meter metric.Meter + acquired metric.Int64Counter + executed metric.Int64Counter + completed metric.Int64Counter +} + +func initAsyncCallMetrics() (*AsyncCallMetrics, error) { + result := &AsyncCallMetrics{} + var errs error + var err error + + result.meter = otel.Meter(asyncCallMeterName) + + counterName := fmt.Sprintf("%s.acquired", asyncCallMeterName) + if result.acquired, err = result.meter.Int64Counter( + counterName, + metric.WithUnit("1"), + metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil { + errs = handleInitCounterError(errs, err, counterName) + result.acquired = noop.Int64Counter{} + } + + counterName = fmt.Sprintf("%s.executed", asyncCallMeterName) + if result.executed, err = result.meter.Int64Counter( + counterName, + metric.WithUnit("1"), + metric.WithDescription("the number of times that the controller tries executing an async call")); err != nil { + errs = handleInitCounterError(errs, err, counterName) + result.executed = noop.Int64Counter{} + } + + counterName = fmt.Sprintf("%s.completed", asyncCallMeterName) + if result.completed, err = result.meter.Int64Counter( + counterName, + metric.WithUnit("1"), + metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil { + errs = handleInitCounterError(errs, err, counterName) + result.completed = noop.Int64Counter{} + } + + return result, errs +} + +func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) { + m.acquired.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...)) +} + +func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) { + attrs := extractAsyncCallAttrs(verb, origin, scheduledAt) + + failureMode, ok := maybeFailureMode.Get() + attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok)) + if ok { + attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode)) + } + + m.executed.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) { + m.completed.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...)) +} + +func extractAsyncCallAndMaybeErrAttrs(verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) []attribute.KeyValue { + attrs := extractAsyncCallAttrs(verb, origin, scheduledAt) + return append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil)) +} + +func extractAsyncCallAttrs(verb schema.RefKey, origin string, scheduledAt time.Time) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(observability.ModuleNameAttribute, verb.Module), + attribute.String(asyncCallVerbRefAttr, verb.String()), + attribute.String(asyncCallOriginAttr, origin), + attribute.Int64(asyncCallTimeSinceScheduledAtAttr, time.Since(scheduledAt).Milliseconds()), + } +} diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go index da8165527b..ffc3830ca7 100644 --- a/backend/controller/observability/observability.go +++ b/backend/controller/observability/observability.go @@ -6,14 +6,17 @@ import ( ) var ( - FSM *FSMMetrics - PubSub *PubSubMetrics + AsyncCalls *AsyncCallMetrics + FSM *FSMMetrics + PubSub *PubSubMetrics ) func init() { var errs error var err error + AsyncCalls, err = initAsyncCallMetrics() + errs = errors.Join(errs, err) FSM, err = initFSMMetrics() errs = errors.Join(errs, err) PubSub, err = initPubSubMetrics() From ac9483677071659b7b44638cba0a263e1027eaf0 Mon Sep 17 00:00:00 2001 From: Denise Li Date: Tue, 30 Jul 2024 20:38:47 -0400 Subject: [PATCH 2/3] comment --- backend/controller/controller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index dc0f834731..ff872e8045 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -1377,7 +1377,6 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) } }) if err != nil { - // failed observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err) return 0, fmt.Errorf("failed to complete async call: %w", err) } From 90dc7096d22dcd26472844fdd549911ac3126e8c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 31 Jul 2024 00:48:08 +0000 Subject: [PATCH 3/3] chore(autofmt): Automated formatting --- backend/controller/observability/async_calls.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/controller/observability/async_calls.go b/backend/controller/observability/async_calls.go index 0cc610cec3..f85d04d0b0 100644 --- a/backend/controller/observability/async_calls.go +++ b/backend/controller/observability/async_calls.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/alecthomas/types/optional" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -12,7 +13,6 @@ import ( "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/observability" - "github.com/alecthomas/types/optional" ) const (