Skip to content

Commit

Permalink
feat: add retries for FSMs (#1583)
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e authored May 28, 2024
1 parent 52a8f12 commit 6752939
Show file tree
Hide file tree
Showing 19 changed files with 583 additions and 100 deletions.
22 changes: 17 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S
defer instance.Release() //nolint:errcheck

// Populated if we find a matching transition.
var destinationVerb *schema.Ref
var destinationRef *schema.Ref
var destinationVerb *schema.Verb

var candidates []string

Expand All @@ -752,7 +753,8 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S
return false, nil
}

destinationVerb = ref
destinationRef = ref
destinationVerb = verb
return true, nil
}

Expand Down Expand Up @@ -780,15 +782,20 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S
}
}

if destinationVerb == nil {
if destinationRef == nil {
if len(candidates) > 0 {
return nil, connect.NewError(connect.CodeFailedPrecondition,
fmt.Errorf("no transition found from state %s for type %s, candidates are %s", instance.CurrentState, eventType, strings.Join(candidates, ", ")))
}
return nil, connect.NewError(connect.CodeFailedPrecondition, fmt.Errorf("no transition found from state %s for type %s", instance.CurrentState, eventType))
}

err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationVerb.ToRefKey(), msg.Body)
retryParams, err := schema.RetryParamsForFSMTransition(fsm, destinationVerb)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}

err = tx.StartFSMTransition(ctx, instance.FSM, instance.Key, destinationRef.ToRefKey(), msg.Body, retryParams)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start fsm transition: %w", err))
}
Expand Down Expand Up @@ -1188,9 +1195,14 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
callResult = either.LeftOf[string](resp.Msg.GetBody())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
failed := resp.Msg.GetError() != nil
if failed && call.RemainingAttempts > 0 {
// Will retry, do not propagate failure yet.
return nil
}
switch origin := call.Origin.(type) {
case dal.AsyncOriginFSM:
return s.onAsyncFSMCallCompletion(ctx, tx, origin, resp.Msg.GetError() != nil)
return s.onAsyncFSMCallCompletion(ctx, tx, origin, failed)

default:
panic(fmt.Errorf("unsupported async call origin: %v", call.Origin))
Expand Down
50 changes: 37 additions & 13 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/alecthomas/participle/v2"
"github.com/alecthomas/types/either"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
)

Expand Down Expand Up @@ -55,11 +56,16 @@ func ParseAsyncOrigin(origin string) (AsyncOrigin, error) {
}

type AsyncCall struct {
*Lease // May be nil
ID int64
Origin AsyncOrigin
Verb schema.RefKey
Request json.RawMessage
*Lease // May be nil
ID int64
Origin AsyncOrigin
Verb schema.RefKey
Request json.RawMessage
ScheduledAt time.Time

RemainingAttempts int32
Backoff time.Duration
MaxBackoff time.Duration
}

// AcquireAsyncCall acquires a pending async call to execute.
Expand Down Expand Up @@ -87,11 +93,15 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
return nil, fmt.Errorf("failed to parse origin key %q: %w", row.Origin, err)
}
return &AsyncCall{
ID: row.AsyncCallID,
Verb: row.Verb,
Origin: origin,
Request: row.Request,
Lease: d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl),
ID: row.AsyncCallID,
Verb: row.Verb,
Origin: origin,
Request: row.Request,
Lease: d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl),
ScheduledAt: row.ScheduledAt,
RemainingAttempts: row.RemainingAttempts,
Backoff: row.Backoff,
MaxBackoff: row.MaxBackoff,
}, nil
}

Expand All @@ -114,9 +124,23 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, result eit
}

case either.Right[[]byte, string]: // Failure message.
_, err = tx.db.FailAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return translatePGError(err)
if call.RemainingAttempts > 0 {
_, err = d.db.FailAsyncCallWithRetry(ctx, sql.FailAsyncCallWithRetryParams{
ID: call.ID,
Error: result.Get(),
RemainingAttempts: call.RemainingAttempts - 1,
Backoff: min(call.Backoff*2, call.MaxBackoff),
MaxBackoff: call.MaxBackoff,
ScheduledAt: time.Now().Add(call.Backoff),
})
if err != nil {
return translatePGError(err)
}
} else {
_, err = tx.db.FailAsyncCall(ctx, result.Get(), call.ID)
if err != nil {
return translatePGError(err)
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,17 @@ import (
// future execution.
//
// Note: no validation of the FSM is performed.
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage) (err error) {
func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executionKey string, destinationState schema.RefKey, request json.RawMessage, retryParams schema.RetryParams) (err error) {
// Create an async call for the event.
origin := AsyncOriginFSM{FSM: fsm, Key: executionKey}
asyncCallID, err := d.db.CreateAsyncCall(ctx, destinationState, origin.String(), request)
asyncCallID, err := d.db.CreateAsyncCall(ctx, sql.CreateAsyncCallParams{
Verb: destinationState,
Origin: origin.String(),
Request: request,
RemainingAttempts: int32(retryParams.Count),
Backoff: retryParams.MinBackoff,
MaxBackoff: retryParams.MaxBackoff,
})
if err != nil {
return fmt.Errorf("failed to create FSM async call: %w", translatePGError(err))
}
Expand Down
9 changes: 5 additions & 4 deletions backend/controller/dal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dal
import (
"context"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/either"
Expand All @@ -22,10 +23,10 @@ func TestSendFSMEvent(t *testing.T) {
assert.IsError(t, err, ErrNotFound)

ref := schema.RefKey{Module: "module", Name: "verb"}
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`))
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
assert.NoError(t, err)

err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`))
err = dal.StartFSMTransition(ctx, schema.RefKey{Module: "test", Name: "test"}, "invoiceID", ref, []byte(`{}`), schema.RetryParams{})
assert.IsError(t, err, ErrConflict)
assert.EqualError(t, err, "transition already executing: conflict")

Expand All @@ -46,12 +47,12 @@ func TestSendFSMEvent(t *testing.T) {
},
Request: []byte(`{}`),
}
assert.Equal(t, expectedCall, call, assert.Exclude[*Lease]())
assert.Equal(t, expectedCall, call, assert.Exclude[*Lease](), assert.Exclude[time.Time]())

err = dal.CompleteAsyncCall(ctx, call, either.LeftOf[string]([]byte(`{}`)), func(tx *Tx) error { return nil })
assert.NoError(t, err)

actual, err := dal.LoadAsyncCall(ctx, call.ID)
assert.NoError(t, err)
assert.Equal(t, call, actual, assert.Exclude[*Lease]())
assert.Equal(t, call, actual, assert.Exclude[*Lease](), assert.Exclude[time.Time]())
}
22 changes: 13 additions & 9 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 22 additions & 5 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ SELECT COUNT(*)
FROM expired;

-- name: CreateAsyncCall :one
INSERT INTO async_calls (verb, origin, request)
VALUES (@verb, @origin, @request)
INSERT INTO async_calls (verb, origin, request, remaining_attempts, backoff, max_backoff)
VALUES (@verb, @origin, @request, @remaining_attempts, @backoff::interval, @max_backoff::interval)
RETURNING id;

-- name: AcquireAsyncCall :one
Expand All @@ -495,7 +495,7 @@ RETURNING id;
WITH async_call AS (
SELECT id
FROM async_calls
WHERE state = 'pending'
WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc')
LIMIT 1
FOR UPDATE SKIP LOCKED
), lease AS (
Expand All @@ -512,7 +512,11 @@ RETURNING
(SELECT key FROM lease) AS lease_key,
origin,
verb,
request;
request,
scheduled_at,
remaining_attempts,
backoff,
max_backoff;

-- name: SucceedAsyncCall :one
UPDATE async_calls
Expand All @@ -530,6 +534,19 @@ SET
WHERE id = @id
RETURNING true;

-- name: FailAsyncCallWithRetry :one
WITH updated AS (
UPDATE async_calls
SET state = 'error'::async_call_state,
error = @error::TEXT
WHERE id = @id::BIGINT
RETURNING *
)
INSERT INTO async_calls (verb, origin, request, remaining_attempts, backoff, max_backoff, scheduled_at)
SELECT updated.verb, updated.origin, updated.request, @remaining_attempts, @backoff::interval, @max_backoff::interval, @scheduled_at::TIMESTAMPTZ
FROM updated
RETURNING true;

-- name: LoadAsyncCall :one
SELECT *
FROM async_calls
Expand Down Expand Up @@ -590,7 +607,7 @@ RETURNING true;
-- name: FailFSMInstance :one
UPDATE fsm_instances
SET
current_state = '',
current_state = NULL,
async_call_id = NULL,
status = 'failed'::fsm_status
WHERE
Expand Down
Loading

0 comments on commit 6752939

Please sign in to comment.