Skip to content

Commit

Permalink
feat: plumb fsm send through from the runtime to async execution (#1542)
Browse files Browse the repository at this point in the history
Does not currently support use in tests. We'll implement that in a
followup.
  • Loading branch information
alecthomas authored May 27, 2024
1 parent 051269a commit 3c91886
Show file tree
Hide file tree
Showing 95 changed files with 2,453 additions and 1,214 deletions.
4 changes: 2 additions & 2 deletions Justfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
set positional-arguments
set shell := ["bash", "-c"]

WATCHEXEC_ARGS := "-e proto -e go -e sql -f sqlc.yaml"
WATCHEXEC_ARGS := "-d 1s -e proto -e go -e sql -f sqlc.yaml"
RELEASE := "build/release"
VERSION := `git describe --tags --always --dirty | sed -e 's/^v//'`
KT_RUNTIME_OUT := "kotlin-runtime/ftl-runtime/target/ftl-runtime-1.0-SNAPSHOT.jar"
Expand Down Expand Up @@ -38,7 +38,7 @@ dev *args:
watchexec -r {{WATCHEXEC_ARGS}} -- "just build-sqlc && ftl dev {{args}}"

# Build everything
build-all: build-frontend build-generate build-protos build-sqlc build-zips
build-all: build-protos build-frontend build-generate build-sqlc build-zips
@just build ftl ftl-controller ftl-runner ftl-initdb

# Run "go generate" on all packages
Expand Down
15 changes: 8 additions & 7 deletions backend/controller/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ func visitNode(sch *schema.Schema, n schema.Node, verbString *string) error {
return schema.Visit(n, func(n schema.Node, next func() error) error {
switch n := n.(type) {
case *schema.Ref:
decl := sch.Resolve(n)
if decl != nil {
*verbString += decl.String() + "\n\n"
err := visitNode(sch, decl, verbString)
if err != nil {
return err
}
decl, ok := sch.Resolve(n).Get()
if !ok {
return fmt.Errorf("failed to resolve %s", n)
}
*verbString += decl.String() + "\n\n"
err := visitNode(sch, decl, verbString)
if err != nil {
return err
}
default:
}
Expand Down
157 changes: 144 additions & 13 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/alecthomas/atomic"
"github.com/alecthomas/concurrency"
"github.com/alecthomas/kong"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
Expand Down Expand Up @@ -711,6 +712,89 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "")
}

func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (resp *connect.Response[ftlv1.SendFSMEventResponse], err error) {
msg := req.Msg
sch := s.schema.Load()
// Resolve the FSM.
fsm := &schema.FSM{}
if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("fsm not found: %w", err))
}

eventType := schema.TypeFromProto(msg.Event)

fsmKey := schema.RefFromProto(msg.Fsm).ToRefKey()

tx, err := s.dal.Begin(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err))
}
defer tx.CommitOrRollback(ctx, &err)

instance, err := tx.AcquireFSMInstance(ctx, fsmKey, msg.Instance)
if err != nil {
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("could not acquire fsm instance: %w", err))
}
defer instance.Release() //nolint:errcheck

// Populated if we find a matching transition.
var destinationVerb *schema.Ref

var candidates []string

updateCandidates := func(ref *schema.Ref) (brk bool, err error) {
verb := &schema.Verb{}
if err := sch.ResolveToType(ref, verb); err != nil {
return false, connect.NewError(connect.CodeNotFound, fmt.Errorf("fsm: destination verb %s not found: %w", ref, err))
}
candidates = append(candidates, verb.Name)
if !eventType.Equal(verb.Request) {
return false, nil
}

destinationVerb = ref
return true, nil
}

// Check start transitions
if !instance.CurrentState.Ok() {
for _, start := range fsm.Start {
if brk, err := updateCandidates(start); err != nil {
return nil, err
} else if brk {
break
}
}
} else {
// Find the transition from the current state that matches the given event.
for _, transition := range fsm.Transitions {
instanceState, _ := instance.CurrentState.Get()
if transition.From.ToRefKey() != instanceState {
continue
}
if brk, err := updateCandidates(transition.To); err != nil {
return nil, err
} else if brk {
break
}
}
}

if destinationVerb == nil {
if len(candidates) > 0 {
return nil, connect.NewError(connect.CodeFailedPrecondition,
fmt.Errorf("no transition found from state %s for type %s, candidates are %s", instance.CurrentState, eventType, strings.Join(candidates, ", ")))
}
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("no transition found from state %s for type %s", instance.CurrentState, eventType))
}

err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationVerb.ToRefKey(), msg.Body)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start fsm transition: %w", err))
}
return connect.NewResponse(&ftlv1.SendFSMEventResponse{}), nil
}

func (s *Service) callWithRequest(
ctx context.Context,
req *connect.Request[ftlv1.CallRequest],
Expand Down Expand Up @@ -1077,44 +1161,91 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)

call, err := s.dal.AcquireAsyncCall(ctx)
if errors.Is(err, dal.ErrNotFound) {
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
return 0, err
}
defer call.Release() //nolint:errcheck

logger = logger.Scope(fmt.Sprintf("%s:%s:%s", call.Origin, call.OriginKey, call.Verb))
logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb))

logger.Tracef("Executing async call")
req := &ftlv1.CallRequest{ //nolint:forcetypeassert
Verb: call.Verb.ToProto().(*schemapb.Ref),
req := &ftlv1.CallRequest{
Verb: call.Verb.ToProto(),
Body: call.Request,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
if err != nil {
return 0, fmt.Errorf("async call failed: %w", err)
}
var callError optional.Option[string]
var callResult either.Either[[]byte, string]
if perr := resp.Msg.GetError(); perr != nil {
logger.Warnf("Async call failed: %s", perr.Message)
callError = optional.Some(perr.Message)
callResult = either.RightOf[[]byte](perr.Message)
} else {
logger.Debugf("Async call succeeded")
callResult = either.LeftOf[string](resp.Msg.GetBody())
}
err = s.dal.CompleteAsyncCall(ctx, call, resp.Msg.GetBody(), callError)
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, resp.Msg.GetError() != nil)

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
}
})
if err != nil {
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
switch call.Origin {
case dal.AsyncCallOriginFSM:
return time.Millisecond * 100, s.onAsyncFSMCallCompletion(ctx, call, resp.Msg)
return 0, nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String())

default:
panic(fmt.Sprintf("unexpected async call origin: %s", call.Origin))
instance, err := tx.AcquireFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("could not acquire lock on FSM instance: %w", err)
}
}
defer instance.Release() //nolint:errcheck

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, call *dal.AsyncCall, response *ftlv1.CallResponse) error {
if failed {
logger.Warnf("FSM %s failed async call", origin.FSM)
err := tx.FailFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("failed to fail FSM instance: %w", err)
}
return nil
}

sch := s.schema.Load()

fsm := &schema.FSM{}
err = sch.ResolveToType(origin.FSM.ToRef(), fsm)
if err != nil {
return fmt.Errorf("could not resolve FSM: %w", err)
}

destinationState, _ := instance.DestinationState.Get()
// If we're heading to a terminal state we can just succeed the FSM.
for _, terminal := range fsm.TerminalStates() {
if terminal.ToRefKey() == destinationState {
logger.Debugf("FSM reached terminal state %s", destinationState)
err := tx.SucceedFSMInstance(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("failed to succeed FSM instance: %w", err)
}
return nil
}

}

err = tx.FinishFSMTransition(ctx, origin.FSM, origin.Key)
if err != nil {
return fmt.Errorf("failed to complete FSM transition: %w", err)
}
return nil
}

Expand Down
Loading

0 comments on commit 3c91886

Please sign in to comment.