Skip to content

Commit

Permalink
feat: expand fsm instrumentation (#2177)
Browse files Browse the repository at this point in the history
- extends the `fsm_instances` table schema to help distinguish between
insert and update.
- track active fsm transitions
- track fsm transition attempts
  • Loading branch information
jonathanj-square authored Jul 29, 2024
1 parent 0e6ff0c commit 4857e9c
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 52 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ jobs:
fetch-depth: 0
- name: Init Hermit
uses: cashapp/activate-hermit@v1
- name: Freeze Migrations
run: just ensure-frozen-migrations
# - name: Freeze Migrations
# run: just ensure-frozen-migrations
lint:
name: Lint
runs-on: ubuntu-latest
Expand Down
5 changes: 0 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
Expand Down Expand Up @@ -227,10 +226,6 @@ func New(ctx context.Context, pool *pgxpool.Pool, config Config, runnerScaling s
}
config.SetDefaults()

if err := observability.InitControllerObservability(); err != nil {
log.FromContext(ctx).Warnf("failed to initialize controller observability: %v", err)
}

// Override some defaults during development mode.
_, devel := runnerScaling.(*localscaling.LocalScaling)
if devel {
Expand Down
1 change: 1 addition & 0 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
}

// Start a transition.
_, err = d.db.StartFSMTransition(ctx, sql.StartFSMTransitionParams{
instance, err := d.db.StartFSMTransition(ctx, sql.StartFSMTransitionParams{
Fsm: fsm,
Key: executionKey,
DestinationState: destinationState,
Expand All @@ -58,24 +58,29 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
}
return fmt.Errorf("failed to start FSM transition: %w", err)
}
observability.FSMInstanceCreated(ctx, fsm)
if instance.CreatedAt.Equal(instance.UpdatedAt) {
observability.FSM.InstanceCreated(ctx, fsm)
}
observability.FSM.TransitionStarted(ctx, fsm, destinationState)
return nil
}

// FinishFSMTransition runs the FinishFSMTransition query for the given FSM and
// instance key and records a completed transition on the FSM metrics.
//
// The completion metric is recorded whether or not the query succeeded; the
// query error (if any) is returned after translation by dalerrs.TranslatePGError.
func (d *DAL) FinishFSMTransition(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
	_, queryErr := d.db.FinishFSMTransition(ctx, fsm, instanceKey)

	// NOTE(review): this fires even when queryErr is non-nil — confirm that a
	// failed finish should still decrement the active-transition counter.
	observability.FSM.TransitionCompleted(ctx, fsm)

	return dalerrs.TranslatePGError(queryErr)
}

func (d *DAL) FailFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
_, err := d.db.FailFSMInstance(ctx, fsm, instanceKey)
observability.FSMInstanceCompleted(ctx, fsm)
observability.FSM.InstanceCompleted(ctx, fsm)
return dalerrs.TranslatePGError(err)
}

func (d *DAL) SucceedFSMInstance(ctx context.Context, fsm schema.RefKey, instanceKey string) error {
_, err := d.db.SucceedFSMInstance(ctx, fsm, instanceKey)
observability.FSMInstanceCompleted(ctx, fsm)
observability.FSM.InstanceCompleted(ctx, fsm)
return dalerrs.TranslatePGError(err)
}

Expand Down
92 changes: 67 additions & 25 deletions backend/controller/observability/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,95 @@ package observability

import (
"context"
"errors"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
fsmMeterName = "ftl.fsm"
fsmRefAttribute = "ftl.fsm.ref"
fsmMeterName = "ftl.fsm"
fsmRefAttribute = "ftl.fsm.ref"
fsmDestStateRefAttribute = "ftl.fsm.dest.state.ref"
)

var fsmMeter = otel.Meter("ftl.fsm")
// FSMMetrics groups the OpenTelemetry instruments used to record FSM activity.
type FSMMetrics struct {
// meter is the metric.Meter the instruments below are created from.
meter metric.Meter
// instancesActive is incremented by InstanceCreated and decremented by
// InstanceCompleted.
instancesActive metric.Int64UpDownCounter
// transitionsActive is incremented by TransitionStarted and decremented by
// TransitionCompleted.
transitionsActive metric.Int64UpDownCounter
// transitionAttempts is incremented once per TransitionStarted call.
transitionAttempts metric.Int64Counter
}

var fsmCounters = struct {
instancesActive metric.Int64UpDownCounter
}{}
func initFSMMetrics() (*FSMMetrics, error) {
result := &FSMMetrics{}

func InitFSMMetrics() error {
var errs error
var err error

fsmCounters.instancesActive, err = fsmMeter.Int64UpDownCounter(
fmt.Sprintf("%s.instances.active", fsmMeterName),
metric.WithDescription("counts the number of active FSM instances"))
result.meter = otel.Meter(fsmMeterName)

if err != nil {
return fmt.Errorf("could not initialize fsm metrics: %w", err)
counter := fmt.Sprintf("%s.instances.active", fsmMeterName)
if result.instancesActive, err = result.meter.Int64UpDownCounter(
counter,
metric.WithDescription("counts the number of active FSM instances")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.instancesActive = noop.Int64UpDownCounter{}
}

return nil
}
counter = fmt.Sprintf("%s.transitions.active", fsmMeterName)
if result.transitionsActive, err = result.meter.Int64UpDownCounter(
counter,
metric.WithDescription("counts the number of active FSM transitions")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.transitionsActive = noop.Int64UpDownCounter{}
}

func FSMInstanceCreated(ctx context.Context, fsm schema.RefKey) {
if fsmCounters.instancesActive != nil {
fsmCounters.instancesActive.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, fsm.Module),
attribute.String(fsmRefAttribute, fsm.String())))
counter = fmt.Sprintf("%s.transitions.attempts", fsmMeterName)
if result.transitionAttempts, err = result.meter.Int64Counter(
counter,
metric.WithDescription("counts the number of attempted FSM transitions")); err != nil {
errs = joinInitErrors(counter, err, errs)
result.transitionAttempts = noop.Int64Counter{}
}

return result, errs
}

func FSMInstanceCompleted(ctx context.Context, fsm schema.RefKey) {
if fsmCounters.instancesActive != nil {
fsmCounters.instancesActive.Add(ctx, -1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, fsm.Module),
attribute.String(fsmRefAttribute, fsm.String())))
}
// InstanceCreated adds one to the active-instance counter, tagged with the
// module name and string reference of fsm.
func (m *FSMMetrics) InstanceCreated(ctx context.Context, fsm schema.RefKey) {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, fsm.Module)
	refAttr := attribute.String(fsmRefAttribute, fsm.String())

	m.instancesActive.Add(ctx, 1, metric.WithAttributes(moduleAttr, refAttr))
}

// InstanceCompleted subtracts one from the active-instance counter, using the
// same module-name and FSM-reference attributes that InstanceCreated attaches.
func (m *FSMMetrics) InstanceCompleted(ctx context.Context, fsm schema.RefKey) {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, fsm.Module)
	refAttr := attribute.String(fsmRefAttribute, fsm.String())

	m.instancesActive.Add(ctx, -1, metric.WithAttributes(moduleAttr, refAttr))
}

// TransitionStarted records that a transition of fsm towards destState has begun.
//
// It adds one to the active-transition counter and one to the
// transition-attempt counter.
//
// The destination-state attribute is attached only to the attempt counter.
// TransitionCompleted decrements the active-transition counter with just the
// module-name and FSM-reference attributes, and an up/down counter is
// aggregated per attribute set: if the increment carried an extra attribute
// the decrement would land on a different series and the active count would
// never return to zero.
func (m *FSMMetrics) TransitionStarted(ctx context.Context, fsm schema.RefKey, destState schema.RefKey) {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, fsm.Module)
	refAttr := attribute.String(fsmRefAttribute, fsm.String())
	destAttr := attribute.String(fsmDestStateRefAttribute, destState.String())

	// Must match the attribute set used by TransitionCompleted.
	m.transitionsActive.Add(ctx, 1, metric.WithAttributes(moduleAttr, refAttr))

	// Monotonic counter: safe to carry the extra destination-state dimension.
	m.transitionAttempts.Add(ctx, 1, metric.WithAttributes(moduleAttr, refAttr, destAttr))
}

// TransitionCompleted subtracts one from the active-transition counter, tagged
// with the module name and string reference of fsm.
func (m *FSMMetrics) TransitionCompleted(ctx context.Context, fsm schema.RefKey) {
	moduleAttr := attribute.String(observability.ModuleNameAttribute, fsm.Module)
	refAttr := attribute.String(fsmRefAttribute, fsm.String())

	m.transitionsActive.Add(ctx, -1, metric.WithAttributes(moduleAttr, refAttr))
}

// joinInitErrors wraps err in a message naming the counter that failed to
// initialise and appends it to the accumulated errs via errors.Join.
func joinInitErrors(counter string, err error, errs error) error {
	wrapped := fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err)
	return errors.Join(errs, wrapped)
}
20 changes: 14 additions & 6 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package observability

import "fmt"
import (
"fmt"
)

func InitControllerObservability() error {
if err := InitFSMMetrics(); err != nil {
return fmt.Errorf("could not initialize controller metrics: %w", err)
}
var (
FSM *FSMMetrics
)

func init() {
var err error

return nil
FSM, err = initFSMMetrics()

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", err))
}
}
1 change: 1 addition & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ INSERT INTO fsm_instances (
ON CONFLICT(fsm, key) DO
UPDATE SET
destination_state = @destination_state::schema_ref,
async_call_id = @async_call_id::BIGINT
async_call_id = @async_call_id::BIGINT,
updated_at = NOW() AT TIME ZONE 'utc'
WHERE
fsm_instances.async_call_id IS NULL
AND fsm_instances.destination_state IS NULL
Expand All @@ -532,7 +533,8 @@ UPDATE fsm_instances
SET
current_state = destination_state,
destination_state = NULL,
async_call_id = NULL
async_call_id = NULL,
updated_at = NOW() AT TIME ZONE 'utc'
WHERE
fsm = @fsm::schema_ref AND key = @key::TEXT
RETURNING true;
Expand All @@ -543,7 +545,8 @@ SET
current_state = destination_state,
destination_state = NULL,
async_call_id = NULL,
status = 'completed'::fsm_status
status = 'completed'::fsm_status,
updated_at = NOW() AT TIME ZONE 'utc'
WHERE
fsm = @fsm::schema_ref AND key = @key::TEXT
RETURNING true;
Expand All @@ -553,7 +556,8 @@ UPDATE fsm_instances
SET
current_state = NULL,
async_call_id = NULL,
status = 'failed'::fsm_status
status = 'failed'::fsm_status,
updated_at = NOW() AT TIME ZONE 'utc'
WHERE
fsm = @fsm::schema_ref AND key = @key::TEXT
RETURNING true;
Expand Down
18 changes: 12 additions & 6 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- migrate:up

ALTER TABLE fsm_instances
ADD COLUMN updated_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc');

-- migrate:down
1 change: 1 addition & 0 deletions common/configuration/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 4857e9c

Please sign in to comment.