Skip to content

Commit

Permalink
feat: catch async calls after retries are exhausted (#2190)
Browse files Browse the repository at this point in the history
closes #2185
closes #2212

Retry directives can now define a catch verb:
```go
//ftl:retry 10 1s catch example.catchPayments
```

Catch verbs have a special request type:
```go
func CatchPayments(ctx context.Context, req builtin.CatchRequest[publisher.PubSubEvent]) error {
    // do something
    return nil
}
```
Behavior:
- FTL will keep attemping the catch verb until it does not return an
error. It will do this at the backoff rate that the retries progressed
to before catching.
- `builtin.CatchRequest[EventType]` provides the original request and a
string of the error that was returned on the last attempt by the
subscriber
- Once in a catch verb for a FSM transition, there is currently no way
to prevent the FSM from reaching a failed state

Behind the scenes:
Async calls use a new row per attempt. This PR continues that pattern:
- After the last attempt is completed, if there is a catching verb
defined then a new row is added with `catching = true`
- Originally I had gone down the road of making this a different state
but it got tricky with `AcquireAsyncCall` coordinating leases with the
`pending` and `executing` states, and would require 2 new states for the
catching equivalent of those.
- If the catch verb could not be called or fails, a new async call row
is inserted with the current backoff
- The next scheduled attempt to catch keeps the original verb's error,
not the new catch error
  • Loading branch information
matt2e authored Aug 8, 2024
1 parent c417145 commit cd2b6c1
Show file tree
Hide file tree
Showing 31 changed files with 1,029 additions and 431 deletions.
177 changes: 124 additions & 53 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
sha "crypto/sha256"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash"
Expand Down Expand Up @@ -1366,7 +1367,7 @@ func (s *Service) AsyncCallWasAdded(ctx context.Context) {
}()
}

func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) {
func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration, returnErr error) {
// There are multiple entry points into this function, but we want the controller to handle async calls one at a time.
s.asyncCallsLock.Lock()
defer s.asyncCallsLock.Unlock()
Expand All @@ -1379,91 +1380,161 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
return 0, err
}
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
defer call.Release() //nolint:errcheck

// Queue depth is queried at acquisition time, which means it includes the async
// call that was just executed, but it will be recorded at completion time, so
// decrement the value accordingly.
queueDepth := call.QueueDepth - 1
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, nil)

defer func() {
if returnErr == nil {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
break

case dal.AsyncOriginPubSub:
go s.pubSub.AsyncCallDidCommit(ctx, origin)

default:
break
}
}

call.Release() //nolint:errcheck
}()

logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb))

if call.Catching {
// Retries have been exhausted but catch verb has previously failed
// We need to try again to catch the async call
return 0, s.catchAsyncCall(ctx, logger, call)
}

logger.Tracef("Executing async call")
req := &ftlv1.CallRequest{
Verb: call.Verb.ToProto(),
Body: call.Request,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
var callResult either.Either[[]byte, string]
failed := false
if err != nil {
logger.Warnf("Async call could not be called: %v", err)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call could not be called"))
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, optional.Some("async call could not be called"))
callResult = either.RightOf[[]byte](err.Error())
failed = true
} else if perr := resp.Msg.GetError(); perr != nil {
logger.Warnf("Async call failed: %s", perr.Message)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call failed"))
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, optional.Some("async call failed"))
callResult = either.RightOf[[]byte](perr.Message)
failed = true
} else {
logger.Debugf("Async call succeeded")
callResult = either.LeftOf[string](resp.Msg.GetBody())
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.None[string]())
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, optional.None[string]())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
if failed && call.RemainingAttempts > 0 {
// If the call failed and there are attempts remaining, then
// CompleteAsyncCall would enqueue another call, so increment
// queue depth accordingly.
queueDepth++

// Will retry, do not propagate failure yet.
return nil
}
// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)
queueDepth := call.QueueDepth
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx, isFinalResult bool) error {
return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult)
})
if err != nil {
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, queueDepth, err)
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
if !didScheduleAnotherCall {
// Queue depth is queried at acquisition time, which means it includes the async
// call that was just executed so we need to decrement
queueDepth = call.QueueDepth - 1
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, queueDepth, nil)
return 0, nil
}

func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *dal.AsyncCall) error {
catchVerb, ok := call.CatchVerb.Get()
if !ok {
logger.Warnf("Async call %s could not catch, missing catch verb", call.Verb)
return fmt.Errorf("async call %s could not catch, missing catch verb", call.Verb)
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

case dal.AsyncOriginPubSub:
return s.pubSub.OnCallCompletion(ctx, tx, origin, failed)
originalError := call.Error.Default("unknown error")
originalResult := either.RightOf[[]byte](originalError)

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
}
request := map[string]any{
"request": call.Request,
"error": originalError,
}
body, err := json.Marshal(request)
if err != nil {
logger.Warnf("Async call %s could not marshal body while catching", call.Verb)
return fmt.Errorf("async call %s could not marshal body while catching", call.Verb)
}

req := &ftlv1.CallRequest{
Verb: catchVerb.ToProto(),
Body: body,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
var catchResult either.Either[[]byte, string]
if err != nil {
// Could not call catch verb
logger.Warnf("Async call %s could not call catch verb %s: %s", call.Verb, catchVerb, err)
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, optional.Some("async call could not be called"))
catchResult = either.RightOf[[]byte](err.Error())
} else if perr := resp.Msg.GetError(); perr != nil {
// Catch verb failed
logger.Warnf("Async call %s had an error while catching (%s): %s", call.Verb, catchVerb, perr.Message)
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, optional.Some("async call failed"))
catchResult = either.RightOf[[]byte](perr.Message)
} else {
observability.AsyncCalls.Executed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, optional.None[string]())
catchResult = either.LeftOf[string](resp.Msg.GetBody())
}
queueDepth := call.QueueDepth
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, catchResult, func(tx *dal.Tx, isFinalResult bool) error {
// Exposes the original error to external components such as PubSub and FSM
return s.finaliseAsyncCall(ctx, tx, call, originalResult, isFinalResult)
})
if err != nil {
if failed && call.RemainingAttempts > 0 {
// If the call failed and we had more remaining attempts, then we
// would have incremented queue depth to account for the retry,
// but if CompleteAsyncCall failed, then we failed to enqueue that
// retry, so queue depth needs to be decremented back accordingly.
queueDepth--
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, queueDepth, err)
return 0, fmt.Errorf("failed to complete async call: %w", err)
logger.Errorf(err, "Async call %s could not complete after catching (%s)", call.Verb, catchVerb)
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, queueDepth, err)
return fmt.Errorf("async call %s could not complete after catching (%s): %w", call.Verb, catchVerb, err)
}
if !didScheduleAnotherCall {
// Queue depth is queried at acquisition time, which means it includes the async
// call that was just executed so we need to decrement
queueDepth = call.QueueDepth - 1
}
logger.Debugf("Caught async call %s with %s", call.Verb, catchVerb)
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, true, queueDepth, nil)
return nil
}

func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.AsyncCall, callResult either.Either[[]byte, string], isFinalResult bool) error {
if !isFinalResult {
// Will retry, do not propagate yet.
return nil
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, queueDepth, nil)
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
break

case dal.AsyncOriginPubSub:
s.pubSub.AsyncCallDidCommit(ctx, origin)
_, failed := callResult.(either.Right[[]byte, string])

default:
break
// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
}
}()

return 0, nil
case dal.AsyncOriginPubSub:
if err := s.pubSub.OnCallCompletion(ctx, tx, origin, failed); err != nil {
return fmt.Errorf("failed to finalize pubsub async call: %w", err)
}

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
}
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool) error {
Expand Down
3 changes: 3 additions & 0 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 47 additions & 7 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/alecthomas/participle/v2"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
Expand Down Expand Up @@ -74,13 +75,17 @@ type AsyncCall struct {
ID int64
Origin AsyncOrigin
Verb schema.RefKey
CatchVerb optional.Option[schema.RefKey]
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64

Error optional.Option[string]

RemainingAttempts int32
Backoff time.Duration
MaxBackoff time.Duration
Catching bool
}

// AcquireAsyncCall acquires a pending async call to execute.
Expand Down Expand Up @@ -118,32 +123,40 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
ID: row.AsyncCallID,
Verb: row.Verb,
Origin: origin,
CatchVerb: row.CatchVerb,
Request: decryptedRequest,
Lease: lease,
ScheduledAt: row.ScheduledAt,
QueueDepth: row.QueueDepth,
RemainingAttempts: row.RemainingAttempts,
Error: row.Error,
Backoff: row.Backoff,
MaxBackoff: row.MaxBackoff,
Catching: row.Catching,
}, nil
}

// CompleteAsyncCall completes an async call.
//
// "result" is either a []byte representing the successful response, or a string
// representing a failure message.
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result either.Either[[]byte, string], finalise func(tx *Tx) error) (err error) {
func (d *DAL) CompleteAsyncCall(ctx context.Context,
call *AsyncCall,
result either.Either[[]byte, string],
finalise func(tx *Tx, isFinalResult bool) error) (didScheduleAnotherCall bool, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return dalerrs.TranslatePGError(err)
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
defer tx.CommitOrRollback(ctx, &err)

isFinalResult := true
didScheduleAnotherCall = false
switch result := result.(type) {
case either.Left[[]byte, string]: // Successful response.
_, err = tx.db.SucceedAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return dalerrs.TranslatePGError(err)
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}

case either.Right[[]byte, string]: // Failure message.
Expand All @@ -157,17 +170,44 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result eit
ScheduledAt: time.Now().Add(call.Backoff),
})
if err != nil {
return dalerrs.TranslatePGError(err)
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
isFinalResult = false
didScheduleAnotherCall = true
} else if call.RemainingAttempts == 0 && call.CatchVerb.Ok() {
// original error is the last error that occurred before we started to catch
originalError := call.Error.Default(result.Get())
// scheduledAt should be immediate if this is our first catch attempt, otherwise we should use backoff
scheduledAt := time.Now()
if call.Catching {
scheduledAt = scheduledAt.Add(call.Backoff)
}
_, err = d.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{
ID: call.ID,
Error: result.Get(),
RemainingAttempts: 0,
Backoff: call.Backoff, // maintain backoff
MaxBackoff: call.MaxBackoff,
ScheduledAt: scheduledAt,
Catching: true,
OriginalError: optional.Some(originalError),
})
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
isFinalResult = false
didScheduleAnotherCall = true
} else {
_, err = tx.db.FailAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return dalerrs.TranslatePGError(err)
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
}
}
}

return finalise(tx)
if err := finalise(tx, isFinalResult); err != nil {
return false, err
}
return didScheduleAnotherCall, nil
}

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
RemainingAttempts: int32(retryParams.Count),
Backoff: retryParams.MinBackoff,
MaxBackoff: retryParams.MaxBackoff,
CatchVerb: retryParams.Catch,
})
observability.AsyncCalls.Created(ctx, destinationState, origin.String(), int64(retryParams.Count), err)
observability.AsyncCalls.Created(ctx, destinationState, retryParams.Catch, origin.String(), int64(retryParams.Count), err)
if err != nil {
return fmt.Errorf("failed to create FSM async call: %w", dalerrs.TranslatePGError(err))
}
Expand Down
Loading

0 comments on commit cd2b6c1

Please sign in to comment.