Skip to content

Commit

Permalink
feat: implement DAL functions for full async lifecycle (#1382)
Browse files Browse the repository at this point in the history
Creating an async call, acquiring a call for execution, and finalising
it either successfully or through error.

Also implemented the code in the controller to perform this execution
and finalisation.
  • Loading branch information
alecthomas authored May 2, 2024
1 parent 14e04e4 commit a833d39
Show file tree
Hide file tree
Showing 17 changed files with 299 additions and 51 deletions.
72 changes: 65 additions & 7 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,27 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

// Use min, max backoff if we are running in production, otherwise use develBackoff if available.
// Use min, max backoff if we are running in production, otherwise use
// (1s, 1s) or develBackoff if available.
maybeDevelBackoff := func(min, max time.Duration, develBackoff ...backoff.Backoff) backoff.Backoff {
if len(develBackoff) > 1 {
panic("too many devel backoffs")
}
if _, devel := runnerScaling.(*localscaling.LocalScaling); devel && len(develBackoff) == 1 {
return develBackoff[0]
if _, devel := runnerScaling.(*localscaling.LocalScaling); devel {
if len(develBackoff) == 1 {
return develBackoff[0]
}
return backoff.Backoff{Min: time.Second, Max: time.Second}
}
return makeBackoff(min, max)
}

// Parallel tasks.
svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.syncRoutes)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5, makeBackoff(time.Second*2, time.Second*2)), svc.heartbeatController)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5), svc.heartbeatController)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*5), svc.updateControllersList)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*10), svc.executeAsyncCalls)

// This should be a singleton task, but because this is the task that
// actually expires the leases used to run singleton tasks, it must be
// parallel.
Expand Down Expand Up @@ -649,7 +655,12 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque
return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "")
}

func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv1.CallRequest], key optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
func (s *Service) callWithRequest(
ctx context.Context,
req *connect.Request[ftlv1.CallRequest],
key optional.Option[model.RequestKey],
sourceAddress string,
) (*connect.Response[ftlv1.CallResponse], error) {
start := time.Now()
if req.Msg.Verb == nil {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("verb is required"))
Expand Down Expand Up @@ -709,15 +720,15 @@ func (s *Service) callWithRequest(ctx context.Context, req *connect.Request[ftlv
return nil, err
} else if !ok {
requestKey = model.NewRequestKey(model.OriginIngress, "grpc")
requestSource = req.Peer().Addr
sourceAddress = req.Peer().Addr
isNewRequestKey = true
} else {
requestKey = k
}
}
if isNewRequestKey {
headers.SetRequestKey(req.Header(), requestKey)
if err = s.dal.CreateRequest(ctx, requestKey, requestSource); err != nil {
if err = s.dal.CreateRequest(ctx, requestKey, sourceAddress); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1002,6 +1013,53 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
return time.Second, nil
}

func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
logger.Tracef("Acquiring async call")

call, err := s.dal.AcquireAsyncCall(ctx)
if errors.Is(err, dal.ErrNotFound) {
return time.Second * 2, nil
} else if err != nil {
return 0, err
}
defer call.Release() //nolint:errcheck

logger = logger.Scope(fmt.Sprintf("%s:%s:%s", call.Origin, call.OriginKey, call.Verb))

logger.Tracef("Executing async call")
req := &ftlv1.CallRequest{ //nolint:forcetypeassert
Verb: call.Verb.ToProto().(*schemapb.Ref),
Body: call.Request,
}
resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String())
if err != nil {
return 0, fmt.Errorf("async call failed: %w", err)
}
var callError optional.Option[string]
if perr := resp.Msg.GetError(); perr != nil {
logger.Warnf("Async call failed: %s", perr.Message)
callError = optional.Some(perr.Message)
} else {
logger.Debugf("Async call succeeded")
}
err = s.dal.CompleteAsyncCall(ctx, call, resp.Msg.GetBody(), callError)
if err != nil {
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
switch call.Origin {
case dal.AsyncCallOriginFSM:
return time.Second * 2, s.onAsyncFSMCallCompletion(ctx, call, resp.Msg)

default:
panic(fmt.Sprintf("unexpected async call origin: %s", call.Origin))
}
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, call *dal.AsyncCall, response *ftlv1.CallResponse) error {
return nil
}

func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error) {
err := s.dal.ExpireLeases(ctx)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
}
newJobs = append(newJobs, model.CronJob{
Key: model.NewCronJobKey(module.Name, verb.Verb.Name),
Verb: model.VerbRef{Module: module.Name, Name: verb.Verb.Name},
Verb: schema.Ref{Module: module.Name, Name: verb.Verb.Name},
Schedule: cronStr,
StartTime: start,
NextExecution: next,
Expand Down
11 changes: 6 additions & 5 deletions backend/controller/cronjobs/cronjobs_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"

db "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"
"github.com/jpillora/backoff"
)

type ExtendedDAL interface {
Expand Down Expand Up @@ -161,7 +162,7 @@ func newJobs(t *testing.T, moduleName string, cronPattern string, clock clock.Cl
assert.NoError(t, err)
newJobs = append(newJobs, model.CronJob{
Key: model.NewCronJobKey(moduleName, fmt.Sprintf("verb%d", i)),
Verb: model.VerbRef{Module: moduleName, Name: fmt.Sprintf("verb%d", i)},
Verb: schema.Ref{Module: moduleName, Name: fmt.Sprintf("verb%d", i)},
Schedule: pattern.String(),
StartTime: now,
NextExecution: next,
Expand Down
64 changes: 60 additions & 4 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"
"time"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
)
Expand All @@ -29,16 +31,35 @@ func (d *DAL) SendFSMEvent(ctx context.Context, name, executionKey, destinationS
Key: executionKey,
Name: name,
State: destinationState,
Verb: verb.String(),
Verb: verb,
Request: request,
})
return translatePGError(err)
}

// AsyncCallOrigin represents the kind of originator of the async call.
type AsyncCallOrigin sql.AsyncCallOrigin

const (
AsyncCallOriginFSM = AsyncCallOrigin(sql.AsyncCallOriginFsm)
AsyncCallOriginCron = AsyncCallOrigin(sql.AsyncCallOriginCron)
AsyncCallOriginPubSub = AsyncCallOrigin(sql.AsyncCallOriginPubsub)
)

type AsyncCall struct {
*Lease
ID int64
Origin AsyncCallOrigin
// A key identifying the origin, e.g. the key of the FSM, cron job reference, etc.
OriginKey string
Verb schema.Ref
Request json.RawMessage
}

// AcquireAsyncCall acquires a pending async call to execute.
//
// Returns ErrNotFound if there are no async calls to acquire.
func (d *DAL) AcquireAsyncCall(ctx context.Context) (*Lease, error) {
func (d *DAL) AcquireAsyncCall(ctx context.Context) (*AsyncCall, error) {
ttl := time.Second * 5
row, err := d.db.AcquireAsyncCall(ctx, ttl)
if err != nil {
Expand All @@ -47,7 +68,42 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (*Lease, error) {
if errors.Is(err, ErrConstraint) {
return nil, fmt.Errorf("no pending async calls: %w", ErrNotFound)
}
return nil, err
return nil, fmt.Errorf("failed to acquire async call: %w", err)
}
return &AsyncCall{
ID: row.AsyncCallID,
Verb: row.Verb,
Origin: AsyncCallOrigin(row.Origin),
OriginKey: row.OriginKey,
Request: row.Request,
Lease: d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl),
}, nil
}

// CompleteAsyncCall completes an async call.
//
// Either [response] or [responseError] must be provided, but not both.
func (d *DAL) CompleteAsyncCall(ctx context.Context, call *AsyncCall, response []byte, responseError optional.Option[string]) error {
if (response == nil) != responseError.Ok() {
return fmt.Errorf("must provide exactly one of response or error")
}
_, err := d.db.CompleteAsyncCall(ctx, response, responseError, call.ID)
if err != nil {
return translatePGError(err)
}
return nil
}

func (d *DAL) LoadAsyncCall(ctx context.Context, id int64) (*AsyncCall, error) {
row, err := d.db.LoadAsyncCall(ctx, id)
if err != nil {
return nil, translatePGError(err)
}
return d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl), nil
return &AsyncCall{
ID: row.ID,
Verb: row.Verb,
Origin: AsyncCallOrigin(row.Origin),
OriginKey: row.OriginKey,
Request: row.Request,
}, nil
}
29 changes: 25 additions & 4 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/schema"
Expand All @@ -20,15 +21,35 @@ func TestSendFSMEvent(t *testing.T) {
_, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, ErrNotFound)

err = dal.SendFSMEvent(ctx, "test", "test", "state", schema.Ref{Module: "module", Name: "verb"}, []byte(`{}`))
ref := schema.Ref{Module: "module", Name: "verb"}
err = dal.SendFSMEvent(ctx, "test", "invoiceID", "state", ref, []byte(`{}`))
assert.NoError(t, err)

lease, err := dal.AcquireAsyncCall(ctx)
call, err := dal.AcquireAsyncCall(ctx)
assert.NoError(t, err)
t.Cleanup(func() {
err := lease.Release()
err := call.Lease.Release()
assert.NoError(t, err)
})

assert.HasPrefix(t, lease.String(), "/system/async_call/1:")
assert.HasPrefix(t, call.Lease.String(), "/system/async_call/1:")
expectedCall := &AsyncCall{
ID: 1,
Lease: call.Lease,
Origin: AsyncCallOriginFSM,
OriginKey: "invoiceID",
Verb: ref,
Request: []byte(`{}`),
}
assert.Equal(t, expectedCall, call)

err = dal.CompleteAsyncCall(ctx, call, nil, optional.None[string]())
assert.EqualError(t, err, "must provide exactly one of response or error")

err = dal.CompleteAsyncCall(ctx, call, []byte(`{}`), optional.None[string]())
assert.NoError(t, err)

actual, err := dal.LoadAsyncCall(ctx, call.ID)
assert.NoError(t, err)
assert.Equal(t, call, actual, assert.Exclude[*Lease]())
}
4 changes: 2 additions & 2 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func cronJobFromRow(row sql.GetCronJobsRow) model.CronJob {
return model.CronJob{
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Verb: model.VerbRef{Module: row.Module, Name: row.Verb},
Verb: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
StartTime: row.StartTime,
NextExecution: row.NextExecution,
Expand Down Expand Up @@ -949,7 +949,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []model.CronJob) (attempte
CronJob: model.CronJob{
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Verb: model.VerbRef{Module: row.Module, Name: row.Verb},
Verb: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
StartTime: row.StartTime,
NextExecution: row.NextExecution,
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/scheduledtask/scheduledtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (s *Scheduler) run(ctx context.Context) {
if errors.Is(err, leases.ErrConflict) {
logger.Scope(job.name).Tracef("Lease is held by another controller, will try again shortly.")
} else {
logger.Scope(job.name).Warnf("Failed to acquire lease: %s", err)
logger.Scope(job.name).Debugf("Failed to acquire lease: %s", err)
}
job.next = s.clock.Now().Add(job.retry.Duration())
continue
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a833d39

Please sign in to comment.