Skip to content

Commit

Permalink
feat: deployment metrics instrumentation (#2224)
Browse files Browse the repository at this point in the history
Adds controller and runner deployment metrics.

* `ftl.deployments.controller.reconciliation.failures`
* `ftl.deployments.controller.reconciliations.active`
* `ftl.deployments.controller.replicas.added`
* `ftl.deployments.controller.replicas.removed`
* `ftl.deployments.runner.failures`
* `ftl.deployments.runner.active`

Sample output captured here

---

```
ScopeMetrics #4
ScopeMetrics SchemaURL:
InstrumentationScope ftl.deployments.controller

Metric #0
Descriptor:
     -> Name: ftl.deployments.controller.reconciliations.active
     -> Description: the number of active deployment reconciliation tasks
     -> Unit:
     -> DataType: Sum
     -> IsMonotonic: false
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.deployment.key: Str(dpl-echo-4xty4b1iks6plwgj)
     -> ftl.module.name: Str(echo)
StartTimestamp: 2024-08-01 18:41:42.628275 +0000 UTC
Timestamp: 2024-08-01 18:42:07.631714 +0000 UTC
Value: 0

NumberDataPoints #1
Data point attributes:
     -> ftl.deployment.key: Str(dpl-time-4d23618ccc6mwce8)
     -> ftl.module.name: Str(time)
StartTimestamp: 2024-08-01 18:41:42.628275 +0000 UTC
Timestamp: 2024-08-01 18:42:07.631714 +0000 UTC
Value: 0

Metric #1
Descriptor:
     -> Name: ftl.deployments.controller.replicas.added
     -> Description: the number of runner replicas added (or removed) by the deployment reconciliation tasks
     -> Unit:
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.deployment.key: Str(dpl-echo-4xty4b1iks6plwgj)
     -> ftl.module.name: Str(echo)
StartTimestamp: 2024-08-01 18:41:42.628278 +0000 UTC
Timestamp: 2024-08-01 18:42:07.631714 +0000 UTC
Value: 1

NumberDataPoints #1
Data point attributes:
     -> ftl.deployment.key: Str(dpl-time-4d23618ccc6mwce8)
     -> ftl.module.name: Str(time)
StartTimestamp: 2024-08-01 18:41:42.628278 +0000 UTC
Timestamp: 2024-08-01 18:42:07.631714 +0000 UTC
Value: 1
```

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jonathanj-square and github-actions[bot] authored Aug 2, 2024
1 parent 63446b8 commit a5bb4b9
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 13 deletions.
11 changes: 11 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,9 @@ func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, erro
if require > 0 {
deploymentLogger.Debugf("Need %d more runners for %s", require, reconcile.Deployment)
wg.Go(func(ctx context.Context) error {
observability.Deployment.ReconciliationStart(ctx, reconcile.Module, reconcile.Deployment.String())
defer observability.Deployment.ReconciliationComplete(ctx, reconcile.Module, reconcile.Deployment.String())

if err := s.deploy(ctx, deployment); err != nil {
lock.Lock()
failureCount := s.increaseReplicaFailures[deployment.Key.String()] + 1
Expand All @@ -1237,6 +1240,7 @@ func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, erro
} else {
deploymentLogger.Debugf("Failed to increase deployment replicas (%d): %s", failureCount, err)
}
observability.Deployment.ReconciliationFailure(ctx, reconcile.Module, reconcile.Deployment.String())
} else {
lock.Lock()
delete(s.increaseReplicaFailures, deployment.Key.String())
Expand All @@ -1246,22 +1250,29 @@ func (s *Service) reconcileDeployments(ctx context.Context) (time.Duration, erro
if reconcile.AssignedReplicas+1 == reconcile.RequiredReplicas {
deploymentLogger.Infof("Deployed %s", reconcile.Deployment)
}
observability.Deployment.ReplicasUpdated(ctx, reconcile.Module, reconcile.Deployment.String(), require)
}
return nil
})
} else if require < 0 {
observability.Deployment.ReconciliationStart(ctx, reconcile.Module, reconcile.Deployment.String())
defer observability.Deployment.ReconciliationComplete(ctx, reconcile.Module, reconcile.Deployment.String())

deploymentLogger.Debugf("Need %d less runners for %s", -require, reconcile.Deployment)
wg.Go(func(ctx context.Context) error {
ok, err := s.terminateRandomRunner(ctx, deployment.Key)
if err != nil {
deploymentLogger.Warnf("Failed to terminate runner: %s", err)
observability.Deployment.ReconciliationFailure(ctx, reconcile.Module, reconcile.Deployment.String())
} else if ok {
deploymentLogger.Debugf("Reconciled %s to %d/%d replicas", reconcile.Deployment, reconcile.AssignedReplicas-1, reconcile.RequiredReplicas)
if reconcile.AssignedReplicas-1 == reconcile.RequiredReplicas {
deploymentLogger.Infof("Stopped %s", reconcile.Deployment)
}
observability.Deployment.ReplicasUpdated(ctx, reconcile.Module, reconcile.Deployment.String(), require)
} else {
deploymentLogger.Warnf("Failed to terminate runner: no runners found")
observability.Deployment.ReconciliationFailure(ctx, reconcile.Module, reconcile.Deployment.String())
}
return nil
})
Expand Down
97 changes: 97 additions & 0 deletions backend/controller/observability/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package observability

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/TBD54566975/ftl/internal/observability"
)

// deploymentMeterName is the name of the OpenTelemetry meter used for the
// controller's deployment metrics. It is also the prefix of every instrument
// name registered by initDeploymentMetrics.
const deploymentMeterName = "ftl.deployments.controller"

// DeploymentMetrics bundles the instruments used to report on the
// controller's deployment reconciliation work.
type DeploymentMetrics struct {
	// reconciliationFailures counts failed reconciliation tasks.
	reconciliationFailures metric.Int64Counter
	// reconciliationsActive goes up when a reconciliation task starts and
	// down when it completes.
	reconciliationsActive metric.Int64UpDownCounter
	// replicasAdded and replicasRemoved count the runner replicas added and
	// removed by reconciliation tasks, respectively.
	replicasAdded, replicasRemoved metric.Int64Counter
}

// initDeploymentMetrics registers the controller deployment instruments on
// the deploymentMeterName meter.
//
// The returned *DeploymentMetrics is always non-nil. Any instrument that
// cannot be created is replaced by the value returned from the matching
// handleInt64*Error helper, and the failure is accumulated into the returned
// error.
func initDeploymentMetrics() (*DeploymentMetrics, error) {
	var (
		errs error
		err  error
	)

	metrics := &DeploymentMetrics{}
	meter := otel.Meter(deploymentMeterName)

	name := fmt.Sprintf("%s.reconciliation.failures", deploymentMeterName)
	metrics.reconciliationFailures, err = meter.Int64Counter(
		name,
		metric.WithDescription("the number of failed runner deployment reconciliation tasks"),
	)
	if err != nil {
		metrics.reconciliationFailures, errs = handleInt64CounterError(name, err, errs)
	}

	name = fmt.Sprintf("%s.reconciliations.active", deploymentMeterName)
	metrics.reconciliationsActive, err = meter.Int64UpDownCounter(
		name,
		metric.WithDescription("the number of active deployment reconciliation tasks"),
	)
	if err != nil {
		metrics.reconciliationsActive, errs = handleInt64UpDownCounterError(name, err, errs)
	}

	name = fmt.Sprintf("%s.replicas.added", deploymentMeterName)
	metrics.replicasAdded, err = meter.Int64Counter(
		name,
		metric.WithDescription("the number of runner replicas added by the deployment reconciliation tasks"),
	)
	if err != nil {
		metrics.replicasAdded, errs = handleInt64CounterError(name, err, errs)
	}

	name = fmt.Sprintf("%s.replicas.removed", deploymentMeterName)
	metrics.replicasRemoved, err = meter.Int64Counter(
		name,
		metric.WithDescription("the number of runner replicas removed by the deployment reconciliation tasks"),
	)
	if err != nil {
		metrics.replicasRemoved, errs = handleInt64CounterError(name, err, errs)
	}

	return metrics, errs
}

// ReconciliationFailure adds one to the reconciliation failure counter,
// attributed to the given module name and deployment key.
func (m *DeploymentMetrics) ReconciliationFailure(ctx context.Context, module string, key string) {
	attrs := metric.WithAttributes(
		attribute.String(observability.ModuleNameAttribute, module),
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)
	m.reconciliationFailures.Add(ctx, 1, attrs)
}

// ReconciliationStart adds one to the active-reconciliations counter,
// attributed to the given module name and deployment key.
// ReconciliationComplete applies the matching decrement.
func (m *DeploymentMetrics) ReconciliationStart(ctx context.Context, module string, key string) {
	attrs := metric.WithAttributes(
		attribute.String(observability.ModuleNameAttribute, module),
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)
	m.reconciliationsActive.Add(ctx, 1, attrs)
}

// ReconciliationComplete subtracts one from the active-reconciliations
// counter, attributed to the given module name and deployment key. It is the
// counterpart of ReconciliationStart.
func (m *DeploymentMetrics) ReconciliationComplete(ctx context.Context, module string, key string) {
	attrs := metric.WithAttributes(
		attribute.String(observability.ModuleNameAttribute, module),
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)
	m.reconciliationsActive.Add(ctx, -1, attrs)
}

// ReplicasUpdated records a change in replica count for a deployment.
//
// A positive delta is added to the replicas-added counter; a negative delta
// has its magnitude added to the replicas-removed counter. A zero delta
// records nothing. Both counters are attributed to the given module name and
// deployment key.
func (m *DeploymentMetrics) ReplicasUpdated(ctx context.Context, module string, key string, delta int) {
	if delta == 0 {
		return
	}

	attrs := metric.WithAttributes(
		attribute.String(observability.ModuleNameAttribute, module),
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)

	switch {
	case delta > 0:
		m.replicasAdded.Add(ctx, int64(delta), attrs)
	default:
		// The counters only ever increase, so removals are recorded as a
		// positive magnitude on their own counter.
		m.replicasRemoved.Add(ctx, int64(-delta), attrs)
	}
}
16 changes: 16 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

// Package-level metric handles, assigned by this package's init function.
var (
	// AsyncCalls is assigned from initAsyncCallMetrics.
	AsyncCalls *AsyncCallMetrics
	// Deployment is assigned from initDeploymentMetrics.
	Deployment *DeploymentMetrics
	// FSM is assigned from initFSMMetrics.
	FSM *FSMMetrics
	// PubSub is assigned from initPubSubMetrics.
	PubSub *PubSubMetrics
)
Expand All @@ -18,6 +22,8 @@ func init() {

AsyncCalls, err = initAsyncCallMetrics()
errs = errors.Join(errs, err)
Deployment, err = initDeploymentMetrics()
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
Expand All @@ -28,6 +34,16 @@ func init() {
}
}

// handleInt64CounterError is the fallback used when creating the Int64Counter
// named counter fails with err. It returns a no-op counter together with errs
// extended by a wrapped copy of err.
//
//nolint:unparam
func handleInt64CounterError(counter string, err error, errs error) (metric.Int64Counter, error) {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return noop.Int64Counter{}, errors.Join(errs, wrapped)
}

// handleInt64UpDownCounterError is the fallback used when creating the
// Int64UpDownCounter named counter fails with err. It returns a no-op up/down
// counter together with errs extended by a wrapped copy of err.
//
//nolint:unparam
func handleInt64UpDownCounterError(counter string, err error, errs error) (metric.Int64UpDownCounter, error) {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return noop.Int64UpDownCounter{}, errors.Join(errs, wrapped)
}

// timeSinceMS returns the time elapsed since start, in whole milliseconds.
func timeSinceMS(start time.Time) int64 {
	elapsed := time.Since(start)
	return elapsed.Milliseconds()
}
65 changes: 65 additions & 0 deletions backend/runner/observability/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package observability

import (
"context"
"fmt"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/TBD54566975/ftl/internal/observability"
)

// deploymentMeterName is the name of the OpenTelemetry meter used for the
// runner's deployment metrics. It is also the prefix of every instrument name
// registered by initDeploymentMetrics.
const deploymentMeterName = "ftl.deployments.runner"

// DeploymentMetrics bundles the instruments used to report on deployments
// from the runner side.
type DeploymentMetrics struct {
	// failure counts deployment failures.
	failure metric.Int64Counter
	// active goes up in Started and down in Completed.
	active metric.Int64UpDownCounter
}

// initDeploymentMetrics registers the runner deployment instruments on the
// deploymentMeterName meter.
//
// The returned *DeploymentMetrics is always non-nil. Any instrument that
// cannot be created is replaced by the value returned from the matching
// handleInt64*Error helper, and the failure is accumulated into the returned
// error.
func initDeploymentMetrics() (*DeploymentMetrics, error) {
	var (
		errs error
		err  error
	)

	metrics := &DeploymentMetrics{}
	meter := otel.Meter(deploymentMeterName)

	name := fmt.Sprintf("%s.failures", deploymentMeterName)
	metrics.failure, err = meter.Int64Counter(
		name,
		metric.WithDescription("the number of deployment failures"),
	)
	if err != nil {
		metrics.failure, errs = handleInt64CounterError(name, err, errs)
	}

	name = fmt.Sprintf("%s.active", deploymentMeterName)
	metrics.active, err = meter.Int64UpDownCounter(
		name,
		metric.WithDescription("the number of active deployments"),
	)
	if err != nil {
		metrics.active, errs = handleInt64UpDownCounterError(name, err, errs)
	}

	return metrics, errs
}

// Failure adds one to the deployment failure counter. The deployment key
// attribute is set to the value held in key, or to "unknown" when key holds
// no value.
func (m *DeploymentMetrics) Failure(ctx context.Context, key optional.Option[string]) {
	deploymentKey := key.Default("unknown")
	attrs := metric.WithAttributes(
		attribute.String(observability.RunnerDeploymentKeyAttribute, deploymentKey),
	)
	m.failure.Add(ctx, 1, attrs)
}

// Started adds one to the active-deployments counter, attributed to the given
// deployment key. Completed applies the matching decrement.
func (m *DeploymentMetrics) Started(ctx context.Context, key string) {
	attrs := metric.WithAttributes(
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)
	m.active.Add(ctx, 1, attrs)
}

// Completed subtracts one from the active-deployments counter, attributed to
// the given deployment key. It is the counterpart of Started.
func (m *DeploymentMetrics) Completed(ctx context.Context, key string) {
	attrs := metric.WithAttributes(
		attribute.String(observability.RunnerDeploymentKeyAttribute, key),
	)
	m.active.Add(ctx, -1, attrs)
}
23 changes: 21 additions & 2 deletions backend/runner/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package observability

import (
"errors"
"fmt"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
)

var (
Runner *RunnerMetrics
Runner *RunnerMetrics
Deployment *DeploymentMetrics
)

func init() {
var errs error
var err error

Runner, err = initRunnerMetrics()
errs = errors.Join(errs, err)
Deployment, err = initDeploymentMetrics()
errs = errors.Join(errs, err)

if err != nil {
if errs != nil {
panic(fmt.Errorf("could not initialize runner metrics: %w", err))
}
}

// handleInt64CounterError is the fallback used when creating the Int64Counter
// named counter fails with err. It returns a no-op counter together with errs
// extended by a wrapped copy of err.
//
//nolint:unparam
func handleInt64CounterError(counter string, err error, errs error) (metric.Int64Counter, error) {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return noop.Int64Counter{}, errors.Join(errs, wrapped)
}

// handleInt64UpDownCounterError is the fallback used when creating the
// Int64UpDownCounter named counter fails with err. It returns a no-op up/down
// counter together with errs extended by a wrapped copy of err.
//
//nolint:unparam
func handleInt64UpDownCounterError(counter string, err error, errs error) (metric.Int64UpDownCounter, error) {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return noop.Int64UpDownCounter{}, errors.Join(errs, wrapped)
}
13 changes: 3 additions & 10 deletions backend/runner/observability/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package observability

import (
"context"
"errors"
"fmt"
"strings"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/observability"
Expand Down Expand Up @@ -39,21 +37,21 @@ func initRunnerMetrics() (*RunnerMetrics, error) {
if result.startupFailures, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of runner startup failures")); err != nil {
result.startupFailures, errs = handleInitErrors(counter, err, errs)
result.startupFailures, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.registration.heartbeats", runnerMeterName)
if result.registrationHeartbeats, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of successful runner (re-)registrations")); err != nil {
result.registrationHeartbeats, errs = handleInitErrors(counter, err, errs)
result.registrationHeartbeats, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.registration.failures", runnerMeterName)
if result.registrationFailures, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of failures encountered while attempting to register a runner")); err != nil {
result.registrationFailures, errs = handleInitErrors(counter, err, errs)
result.registrationFailures, errs = handleInt64CounterError(counter, err, errs)
}

return result, errs
Expand All @@ -77,11 +75,6 @@ func (m *RunnerMetrics) StartupFailed(ctx context.Context) {
m.startupFailures.Add(ctx, 1)
}

// handleInitErrors is the fallback used when creating the Int64Counter named
// counter fails with err. It returns a no-op counter together with errs
// extended by a wrapped copy of err.
//
//nolint:unparam
func handleInitErrors(counter string, err error, errs error) (metric.Int64Counter, error) {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return noop.Int64Counter{}, errors.Join(errs, wrapped)
}

// runnerStateToString returns the lower-cased string form of state with any
// leading "RUNNER_" prefix removed.
func runnerStateToString(state ftlv1.RunnerState) string {
	name := strings.TrimPrefix(state.String(), "RUNNER_")
	return strings.ToLower(name)
}
Loading

0 comments on commit a5bb4b9

Please sign in to comment.