Skip to content

Commit

Permalink
chore: remove FSM (#3198)
Browse files Browse the repository at this point in the history
This will be replaced by a workflow engine at a later date

Fixes: #3196
  • Loading branch information
stuartwdouglas authored Oct 28, 2024
1 parent 7a5b535 commit 87bc08a
Show file tree
Hide file tree
Showing 100 changed files with 2,450 additions and 7,844 deletions.
18 changes: 2 additions & 16 deletions backend/controller/async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var asyncOriginLexer = lexer.MustSimple([]lexer.SimpleRule{
})

var asyncOriginParser = participle.MustBuild[asyncOriginParseRoot](
participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginFSM{}, AsyncOriginPubSub{}),
participle.Union[AsyncOrigin](AsyncOriginCron{}, AsyncOriginPubSub{}),
participle.Lexer(asyncOriginLexer),
)

Expand All @@ -48,23 +48,9 @@ func (AsyncOriginCron) asyncOrigin() {}
func (a AsyncOriginCron) Origin() string { return "cron" }
func (a AsyncOriginCron) String() string { return fmt.Sprintf("cron:%s", a.CronJobKey) }

// AsyncOriginFSM represents the context for the originator of an FSM async call.
//
// It is in the form fsm:<module>.<name>:<key>
type AsyncOriginFSM struct {
FSM schema.RefKey `parser:"'fsm' ':' @@"`
Key string `parser:"':' @(~EOF)+"`
}

var _ AsyncOrigin = AsyncOriginFSM{}

func (AsyncOriginFSM) asyncOrigin() {}
func (a AsyncOriginFSM) Origin() string { return "fsm" }
func (a AsyncOriginFSM) String() string { return fmt.Sprintf("fsm:%s:%s", a.FSM, a.Key) }

// AsyncOriginPubSub represents the context for the originator of an PubSub async call.
//
// It is in the form fsm:<module>.<subscription_name>
// It is in the form sub:<module>.<subscription_name>
type AsyncOriginPubSub struct {
Subscription schema.RefKey `parser:"'sub' ':' @@"`
}
Expand Down
14 changes: 1 addition & 13 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *ConsoleService) GetModules(ctx context.Context, req *connect.Request[pb
case *schema.Config:
configs = append(configs, configFromDecl(decl))

case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.FSM, *schema.Topic, *schema.Subscription:
case *schema.Database, *schema.Enum, *schema.TypeAlias, *schema.Topic, *schema.Subscription:
}
}

Expand Down Expand Up @@ -172,7 +172,6 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema) (*pbconsole.Module
var data []*pbconsole.Data
var databases []*pbconsole.Database
var enums []*pbconsole.Enum
var fsms []*pbconsole.FSM
var topics []*pbconsole.Topic
var typealiases []*pbconsole.TypeAlias
var secrets []*pbconsole.Secret
Expand All @@ -193,9 +192,6 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema) (*pbconsole.Module
case *schema.Enum:
enums = append(enums, enumFromDecl(decl))

case *schema.FSM:
fsms = append(fsms, fsmFromDecl(decl))

case *schema.Topic:
topics = append(topics, topicFromDecl(decl))

Expand All @@ -222,7 +218,6 @@ func moduleFromDecls(decls []schema.Decl, sch *schema.Schema) (*pbconsole.Module
Data: data,
Databases: databases,
Enums: enums,
Fsms: fsms,
Topics: topics,
Typealiases: typealiases,
Secrets: secrets,
Expand Down Expand Up @@ -261,13 +256,6 @@ func enumFromDecl(decl *schema.Enum) *pbconsole.Enum {
}
}

func fsmFromDecl(decl *schema.FSM) *pbconsole.FSM {
return &pbconsole.FSM{
//nolint:forcetypeassert
Fsm: decl.ToProto().(*schemapb.FSM),
}
}

func topicFromDecl(decl *schema.Topic) *pbconsole.Topic {
return &pbconsole.Topic{
//nolint:forcetypeassert
Expand Down
239 changes: 2 additions & 237 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,143 +843,6 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), optional.None[model.RequestKey](), "")
}

func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (resp *connect.Response[ftlv1.SendFSMEventResponse], err error) {
msg := req.Msg

// Resolve the FSM.
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}

tx, err := s.dal.Begin(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)

instance, err := tx.AcquireFSMInstance(ctx, fsmKey, msg.Instance)
if err != nil {
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("could not acquire fsm instance: %w", err))
}
defer instance.Release() //nolint:errcheck

err = s.sendFSMEventInTx(ctx, tx, instance, fsm, eventType, msg.Body, false)
if err != nil {
return nil, err
}
return connect.NewResponse(&ftlv1.SendFSMEventResponse{}), nil
}

// schedules an event for a FSM instance within a db transaction
// body may already be encrypted, which is denoted by the encrypted flag
func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *dal.FSMInstance, fsm *schema.FSM, eventType schema.Type, body []byte, encrypted bool) error {
// Populated if we find a matching transition.
var destinationRef *schema.Ref
var destinationVerb *schema.Verb

var candidates []string

sch := s.schemaState.Load().schema

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
if err := sch.ResolveToType(ref, verb); err != nil {
return false, connect.NewError(connect.CodeNotFound, fmt.Errorf("fsm: destination verb %s not found: %w", ref, err))
}
candidates = append(candidates, verb.Name)
if !eventType.Equal(verb.Request) {
return false, nil
}

destinationRef = ref
destinationVerb = verb
return true, nil
}

// Check start transitions
if !instance.CurrentState.Ok() {
for _, start := range fsm.Start {
if brk, err := updateCandidates(start); err != nil {
return err
} else if brk {
break
}
}
} else {
// Find the transition from the current state that matches the given event.
for _, transition := range fsm.Transitions {
instanceState, _ := instance.CurrentState.Get()
if transition.From.ToRefKey() != instanceState {
continue
}
if brk, err := updateCandidates(transition.To); err != nil {
return err
} else if brk {
break
}
}
}

if destinationRef == nil {
if len(candidates) > 0 {
return connect.NewError(connect.CodeFailedPrecondition,
fmt.Errorf("no transition found from state %s for type %s, candidates are %s", instance.CurrentState, eventType, strings.Join(candidates, ", ")))
}
return connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("no transition found from state %s for type %s", instance.CurrentState, eventType))
}

retryParams, err := schema.RetryParamsForFSMTransition(fsm, destinationVerb)
if err != nil {
return connect.NewError(connect.CodeInternal, err)
}

err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationRef.ToRefKey(), body, encrypted, retryParams)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not start fsm transition: %w", err))
}
return nil
}

func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (resp *connect.Response[ftlv1.SendFSMEventResponse], err error) {
tx, err := s.dal.Begin(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)
sch := s.schemaState.Load().schema
msg := req.Msg
fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg)
if err != nil {
return nil, connect.NewError(connect.CodeNotFound, err)
}

// Get the current state the instance is transitioning to.
_, currentDestinationState, err := tx.GetFSMStates(ctx, fsmKey, req.Msg.Instance)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("fsm instance not found: %w", err))
}
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not get fsm instance: %w", err))
}

// Check if the transition is valid from the current state.
nextState, ok := fsm.NextState(sch, currentDestinationState, eventType).Get()
if !ok {
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("invalid event %q for state %q", eventType, currentDestinationState))
}

// Set the next event.
err = tx.SetNextFSMEvent(ctx, fsmKey, msg.Instance, nextState.ToRefKey(), msg.Body, eventType)
if err != nil {
if errors.Is(err, libdal.ErrConflict) {
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("fsm instance already has its next state set: %w", err))
}
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not set next fsm event: %w", err))
}
return connect.NewResponse(&ftlv1.SendFSMEventResponse{}), nil
}

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
err := s.pubSub.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
Expand Down Expand Up @@ -1350,9 +1213,6 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
case async.AsyncOriginCron:
break

case async.AsyncOriginFSM:
break

case async.AsyncOriginPubSub:
go s.pubSub.AsyncCallDidCommit(originalCtx, origin)

Expand Down Expand Up @@ -1468,7 +1328,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
queueDepth := call.QueueDepth
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, catchResult, func(tx *dal.DAL, isFinalResult bool) error {
// Exposes the original error to external components such as PubSub and FSM
// Exposes the original error to external components such as PubSub
return s.finaliseAsyncCall(ctx, tx, call, originalResult, isFinalResult)
})
if err != nil {
Expand Down Expand Up @@ -1517,24 +1377,10 @@ func (s *Service) reapAsyncCalls(ctx context.Context) (nextInterval time.Duratio
}

func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
switch origin := call.Origin.(type) {
switch call.Origin.(type) {
case async.AsyncOriginCron:
return &ftlv1.Metadata{}

case async.AsyncOriginFSM:
return &ftlv1.Metadata{
Values: []*ftlv1.Metadata_Pair{
{
Key: "fsmName",
Value: origin.FSM.Name,
},
{
Key: "fsmInstance",
Value: origin.Key,
},
},
}

case async.AsyncOriginPubSub:
return &ftlv1.Metadata{}

Expand All @@ -1553,11 +1399,6 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal.
return fmt.Errorf("failed to finalize cron async call: %w", err)
}

case async.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
}

case async.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx.Connection, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize pubsub async call: %w", err)
Expand All @@ -1569,82 +1410,6 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal.
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin async.AsyncOriginFSM, failed bool, isFinalResult bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String()).Module(origin.FSM.Module)

// retrieve the next fsm event and delete it
next, err := tx.PopNextFSMEvent(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to get next FSM event: %w", origin, err)
}
if !isFinalResult {
// Will retry, so we only want next fsm to be removed
return nil
}

instance, err := tx.AcquireFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: could not acquire lock on FSM instance: %w", origin, err)
}
defer instance.Release() //nolint:errcheck

if failed {
logger.Warnf("FSM %s failed async call", origin.FSM)
err := tx.FailFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to fail FSM instance: %w", origin, err)
}
return nil
}

sch := s.schemaState.Load().schema

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
if err != nil {
return fmt.Errorf("%s: could not resolve FSM: %w", origin, err)
}

destinationState, _ := instance.DestinationState.Get()
// If we're heading to a terminal state we can just succeed the FSM.
for _, terminal := range fsm.TerminalStates() {
if terminal.ToRefKey() == destinationState {
logger.Debugf("FSM reached terminal state %s", destinationState)
err := tx.SucceedFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("%s: failed to succeed FSM instance: %w", origin, err)
}
return nil
}

}

instance, err = tx.FinishFSMTransition(ctx, instance)
if err != nil {
return fmt.Errorf("%s: failed to complete FSM transition: %w", origin, err)
}

// If there's a next event enqueued, we immediately start it.
if next, ok := next.Get(); ok {
return s.sendFSMEventInTx(ctx, tx, instance, fsm, next.RequestType, next.Request, true)
}
return nil
}

func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) {
sch := s.schemaState.Load().schema

fsm = &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
return nil, nil, schema.RefKey{}, fmt.Errorf("fsm not found: %w", err)
}

eventType = schema.TypeFromProto(msg.Event)

fsmKey = schema.RefFromProto(msg.Fsm).ToRefKey()
return fsm, eventType, fsmKey, nil
}

func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error) {
err := s.dbleaser.ExpireLeases(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (d *DAL) CreateAsyncCall(ctx context.Context, params CreateAsyncCallParams)
observability.AsyncCalls.Created(ctx, params.Verb, optional.None[schema.RefKey](), params.Origin, 0, err)
queueDepth, err := d.db.AsyncCallQueueDepth(ctx)
if err == nil {
// Don't error out of an FSM transition just over a queue depth retrieval
// Don't error out of a transition just over a queue depth retrieval
// error because this is only used for an observability gauge.
observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth)
}
Expand Down
1 change: 0 additions & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func TestParser(t *testing.T) {
Payload: model.CronJobPayload{Module: "initial", Verb: "verb0Cron"},
Suffix: []byte("\xfd7\xe6*\xfcƹ\xe9.\x9c"),
}}},
{"FSM", `fsm:module.name:key`, async.AsyncOriginFSM{FSM: schema.RefKey{Module: "module", Name: "name"}, Key: "key"}},
{"PubSub", `sub:module.topic`, async.AsyncOriginPubSub{Subscription: schema.RefKey{Module: "module", Name: "topic"}}},
}
for _, tt := range tests {
Expand Down
Loading

0 comments on commit 87bc08a

Please sign in to comment.