Skip to content

Commit

Permalink
feat: introduce cron job metrics (#2256)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jonathanj-square and github-actions[bot] authored Aug 7, 2024
1 parent 10f42d7 commit 005f605
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 7 deletions.
10 changes: 10 additions & 0 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -202,6 +203,7 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
requestJSON, err := json.Marshal(requestBody)
if err != nil {
logger.Errorf(err, "could not build body for cron job: %v", job.Key)
observability.Cron.JobFailedStart(ctx, job)
return
}

Expand All @@ -214,10 +216,17 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

observability.Cron.JobStarted(ctx, job)
_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)

// Record execution success/failure metric now and leave post job-execution-action observability to logging
if err != nil {
logger.Errorf(err, "failed to execute cron job %v", job.Key)
observability.Cron.JobFailed(ctx, job)
// Do not return, continue to end the job and schedule the next execution
} else {
observability.Cron.JobSuccess(ctx, job)
}

schedule, err := cron.Parse(job.Schedule)
Expand Down Expand Up @@ -274,6 +283,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
continue
}
logger.Warnf("Killed stale cron job %s", stale.Key)
observability.Cron.JobKilled(ctx, stale)
updatedJobs = append(updatedJobs, updated)
}

Expand Down
8 changes: 4 additions & 4 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func wrapErr(signalName string, err error) error {

func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, origin string, remainingAttempts int64, maybeErr error) {
attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
attrs = append(attrs, attribute.Int64(asyncCallRemainingAttemptsAttr, remainingAttempts))

m.created.Add(ctx, 1, metric.WithAttributes(attrs...))
Expand All @@ -103,15 +103,15 @@ func (m *AsyncCallMetrics) RecordQueueDepth(ctx context.Context, queueDepth int6

func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)

failureMode, ok := maybeFailureMode.Get()
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(!ok))
if ok {
attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode))
}
Expand All @@ -123,7 +123,7 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, or
msToComplete := timeSinceMS(scheduledAt)

attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, msToComplete)))
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime
}

failureMode, ok := maybeFailureMode.Get()
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(!ok))
if ok {
attrs = append(attrs, attribute.String(callFailureModeAttr, failureMode))
}
Expand Down
107 changes: 107 additions & 0 deletions backend/controller/observability/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package observability

import (
"context"
"fmt"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
cronMeterName = "ftl.cron"
cronJobFullNameAttribute = "ftl.cron.job.full_name"

cronJobKilledStatus = "killed"
cronJobFailedStartStatus = "failed_start"
)

type CronMetrics struct {
jobsActive metric.Int64UpDownCounter
jobsCompleted metric.Int64Counter
jobLatency metric.Int64Histogram
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.jobs.completed", cronMeterName)
if result.jobsCompleted, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of cron jobs completed; successful or otherwise")); err != nil {
result.jobsCompleted, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.jobs.active", cronMeterName)
if result.jobsActive, err = meter.Int64UpDownCounter(
counter,
metric.WithDescription("the number of actively executing cron jobs")); err != nil {
result.jobsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.job.latency", cronMeterName)
if result.jobLatency, err = meter.Int64Histogram(
counter,
metric.WithDescription("the latency between the scheduled execution time of a cron job"),
metric.WithUnit("ms")); err != nil {
result.jobLatency, errs = handleInt64HistogramCounterError(counter, err, errs)
}

return result, errs
}

func (m *CronMetrics) JobStarted(ctx context.Context, job model.CronJob) {
m.jobsActive.Add(ctx, 1, cronAttributes(job, optional.None[string]()))
}

func (m *CronMetrics) JobSuccess(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, observability.SuccessStatus)
}

func (m *CronMetrics) JobKilled(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, cronJobKilledStatus)
}

func (m *CronMetrics) JobFailedStart(ctx context.Context, job model.CronJob) {
completionAttributes := cronAttributes(job, optional.Some(cronJobFailedStartStatus))

elapsed := timeSinceMS(job.NextExecution)
m.jobLatency.Record(ctx, elapsed, completionAttributes)
m.jobsCompleted.Add(ctx, 1, completionAttributes)
}

func (m *CronMetrics) JobFailed(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, observability.FailureStatus)
}

func (m *CronMetrics) jobCompleted(ctx context.Context, job model.CronJob, status string) {
elapsed := timeSinceMS(job.NextExecution)

m.jobsActive.Add(ctx, -1, cronAttributes(job, optional.None[string]()))

completionAttributes := cronAttributes(job, optional.Some(status))
m.jobLatency.Record(ctx, elapsed, completionAttributes)
m.jobsCompleted.Add(ctx, 1, completionAttributes)
}

func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metric.MeasurementOption {
attributes := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, job.Key.Payload.Module),
attribute.String(cronJobFullNameAttribute, job.Key.String()),
attribute.String(observability.RunnerDeploymentKeyAttribute, job.DeploymentKey.String()),
}
if status, ok := maybeStatus.Get(); ok {
attributes = append(attributes, attribute.String(observability.OutcomeStatusNameAttribute, status))
}
return metric.WithAttributes(attributes...)
}
8 changes: 8 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
Deployment *DeploymentMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
)

func init() {
Expand All @@ -32,6 +33,8 @@ func init() {
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
Expand All @@ -48,6 +51,11 @@ func handleInt64UpDownCounterError(counter string, err error, errs error) (metri
return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

//nolint:unparam
func handleInt64HistogramCounterError(counter string, err error, errs error) (metric.Int64Histogram, error) {
return noop.Int64Histogram{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

func timeSinceMS(start time.Time) int64 {
return time.Since(start).Milliseconds()
}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller str
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
observability.SuccessOrFailureStatusAttr(maybeErr == nil),
}

m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
Expand Down
14 changes: 13 additions & 1 deletion internal/observability/attributes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package observability

import "go.opentelemetry.io/otel/attribute"

const (
ModuleNameAttribute = "ftl.module.name"
StatusSucceededAttribute = "ftl.status.succeeded"
OutcomeStatusNameAttribute = "ftl.outcome.status"
RunnerDeploymentKeyAttribute = "ftl.deployment.key"

SuccessStatus = "success"
FailureStatus = "failure"
)

func SuccessOrFailureStatusAttr(succeeded bool) attribute.KeyValue {
if succeeded {
return attribute.String(OutcomeStatusNameAttribute, SuccessStatus)
}
return attribute.String(OutcomeStatusNameAttribute, FailureStatus)
}

0 comments on commit 005f605

Please sign in to comment.