Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: async call otel metrics #2209

Merged
merged 3 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
Expand Down Expand Up @@ -1327,8 +1328,10 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
return 0, err
}
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
defer call.Release() //nolint:errcheck

logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb))
Expand All @@ -1343,15 +1346,18 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
failed := false
if err != nil {
logger.Warnf("Async call could not be called: %v", err)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call could not be called"))
callResult = either.RightOf[[]byte](err.Error())
failed = true
} else if perr := resp.Msg.GetError(); perr != nil {
logger.Warnf("Async call failed: %s", perr.Message)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call failed"))
callResult = either.RightOf[[]byte](perr.Message)
failed = true
} else {
logger.Debugf("Async call succeeded")
callResult = either.LeftOf[string](resp.Msg.GetBody())
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.None[string]())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
if failed && call.RemainingAttempts > 0 {
Expand All @@ -1371,8 +1377,10 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
}
})
if err != nil {
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
Expand Down
102 changes: 102 additions & 0 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package observability

import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

// Names used when registering and annotating the async call counters.
const (
// asyncCallMeterName is the name passed to otel.Meter and the prefix of
// every counter name registered by initAsyncCallMetrics.
asyncCallMeterName = "ftl.async_call"
// asyncCallOriginAttr keys the origin string supplied by the caller.
asyncCallOriginAttr = "ftl.async_call.origin"
// asyncCallVerbRefAttr keys the string form of the verb reference.
asyncCallVerbRefAttr = "ftl.async_call.verb.ref"
// asyncCallTimeSinceScheduledAtAttr keys the milliseconds elapsed between
// the call's scheduled time and the moment the metric is recorded.
asyncCallTimeSinceScheduledAtAttr = "ftl.async_call.time_since_scheduled_at_ms"
// asyncCallExecFailureModeAttr keys the failure-mode string; it is only
// attached by Executed when a failure mode is provided.
asyncCallExecFailureModeAttr = "ftl.async_call.execution.failure_mode"
)

// AsyncCallMetrics holds the meter and the counters that record each stage of
// an async call: acquisition, execution and completion.
type AsyncCallMetrics struct {
// meter is the otel meter the counters below are created from.
meter metric.Meter
// acquired counts attempts to acquire an async call.
acquired metric.Int64Counter
// executed counts attempts to execute an async call.
executed metric.Int64Counter
// completed counts attempts to complete an async call.
completed metric.Int64Counter
}

// initAsyncCallMetrics creates the async call meter and registers its three
// counters (acquired, executed, completed).
//
// A counter that fails to register is replaced by a no-op counter so the
// returned metrics value is always safe to use; the registration errors are
// folded together by handleInitCounterError and returned alongside it.
func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
	var errs error
	metrics := &AsyncCallMetrics{meter: otel.Meter(asyncCallMeterName)}

	// register creates a single dimensionless counter, falling back to a
	// no-op counter (and recording the error) when creation fails.
	register := func(name, description string) metric.Int64Counter {
		counter, err := metrics.meter.Int64Counter(
			name,
			metric.WithUnit("1"),
			metric.WithDescription(description))
		if err != nil {
			errs = handleInitCounterError(errs, err, name)
			return noop.Int64Counter{}
		}
		return counter
	}

	metrics.acquired = register(
		fmt.Sprintf("%s.acquired", asyncCallMeterName),
		"the number of times that the controller tries acquiring an async call")
	metrics.executed = register(
		fmt.Sprintf("%s.executed", asyncCallMeterName),
		"the number of times that the controller tries executing an async call")
	metrics.completed = register(
		fmt.Sprintf("%s.completed", asyncCallMeterName),
		"the number of times that the controller tries completing an async call")

	return metrics, errs
}

// Acquired increments the acquired counter by one, tagged with the call's
// verb, origin and elapsed time since scheduledAt, plus a success flag that
// is true when maybeErr is nil.
func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
	attrs := extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)
	m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...))
}

// Executed increments the executed counter by one, tagged with the call's
// verb, origin and elapsed time since scheduledAt.
//
// The success flag is true when maybeFailureMode is empty; otherwise the
// flag is false and the failure mode string is attached as an extra attribute.
func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) {
	base := extractAsyncCallAttrs(verb, origin, scheduledAt)

	mode, hasFailure := maybeFailureMode.Get()
	tagged := append(base, attribute.Bool(observability.StatusSucceededAttribute, !hasFailure))
	if hasFailure {
		tagged = append(tagged, attribute.String(asyncCallExecFailureModeAttr, mode))
	}

	m.executed.Add(ctx, 1, metric.WithAttributes(tagged...))
}

// Completed increments the completed counter by one, tagged with the call's
// verb, origin and elapsed time since scheduledAt, plus a success flag that
// is true when maybeErr is nil.
func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
	attrs := extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)
	m.completed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

// extractAsyncCallAndMaybeErrAttrs returns the common async call attributes
// followed by a success flag that is true exactly when maybeErr is nil.
func extractAsyncCallAndMaybeErrAttrs(verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) []attribute.KeyValue {
	common := extractAsyncCallAttrs(verb, origin, scheduledAt)
	succeeded := attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil)
	return append(common, succeeded)
}

// extractAsyncCallAttrs builds the attributes shared by every async call
// metric: the verb's module, the verb reference, the origin, and the number
// of milliseconds elapsed since scheduledAt at the time of the call.
//
// NOTE(review): the elapsed time is emitted as an attribute value, so most
// recordings will carry a distinct attribute set — confirm this cardinality
// is acceptable for the metrics backend in use.
func extractAsyncCallAttrs(verb schema.RefKey, origin string, scheduledAt time.Time) []attribute.KeyValue {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, verb.Module)
	verbAttr := attribute.String(asyncCallVerbRefAttr, verb.String())
	originAttr := attribute.String(asyncCallOriginAttr, origin)
	elapsedAttr := attribute.Int64(asyncCallTimeSinceScheduledAtAttr, time.Since(scheduledAt).Milliseconds())

	return []attribute.KeyValue{moduleAttr, verbAttr, originAttr, elapsedAttr}
}
7 changes: 5 additions & 2 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
)

var (
FSM *FSMMetrics
PubSub *PubSubMetrics
AsyncCalls *AsyncCallMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
)

func init() {
var errs error
var err error

AsyncCalls, err = initAsyncCallMetrics()
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
Expand Down
Loading