Skip to content

Commit

Permalink
feat: support async calls in the database layer (#1371)
Browse files Browse the repository at this point in the history
The initial use case is for FSM support, but this will be the framework
we use for receiving PubSub events, callbacks, possibly cron jobs, etc.
Basically any form of asynchronous call.
  • Loading branch information
alecthomas authored May 1, 2024
1 parent 5990462 commit fdd90eb
Show file tree
Hide file tree
Showing 15 changed files with 506 additions and 38 deletions.
10 changes: 7 additions & 3 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ clean:

# Live rebuild the ftl binary whenever source changes.
live-rebuild:
watchexec -e go -- just build ftl
watchexec -e go -e sql -f sqlc.yaml -- "just build-sqlc && just build ftl"

# Run "ftl dev" with live-reloading whenever source changes.
dev *args:
watchexec -r -e go -e sql -f sqlc.yaml -- "just build-sqlc && ftl dev {{args}}"

# Build everything
build-all: build-frontend build-generate build-kt-runtime build-protos build-sqlc build-zips
Expand All @@ -56,8 +60,8 @@ init-db:
dbmate --migrations-dir backend/controller/sql/schema up

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc: init-db
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- sqlc generate
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
53 changes: 53 additions & 0 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package dal

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/schema"
)

// SendFSMEvent sends an event to an executing instance of an FSM.
//
// If the instance doesn't exist a new one will be created.
//
// [name] is the name of the state machine to execute, [executionKey] is the
// unique identifier for this execution of the FSM.
//
// Returns ErrConflict if the state machine is already executing.
//
// Note: this does not actually call the FSM, it just enqueues an async call for
// future execution.
//
// Note: no validation of the FSM is performed.
func (d *DAL) SendFSMEvent(ctx context.Context, name, executionKey, destinationState string, verb schema.Ref, request json.RawMessage) error {
_, err := d.db.SendFSMEvent(ctx, sql.SendFSMEventParams{
Key: executionKey,
Name: name,
State: destinationState,
Verb: verb.String(),
Request: request,
})
return translatePGError(err)
}

// AcquireAsyncCall acquires a pending async call to execute.
//
// Returns ErrNotFound if there are no async calls to acquire.
func (d *DAL) AcquireAsyncCall(ctx context.Context) (*Lease, error) {
ttl := time.Second * 5
row, err := d.db.AcquireAsyncCall(ctx, ttl)
if err != nil {
err = translatePGError(err)
// We get a NULL constraint violation if there are no async calls to acquire, so translate it to ErrNotFound.
if errors.Is(err, ErrConstraint) {
return nil, fmt.Errorf("no pending async calls: %w", ErrNotFound)
}
return nil, err
}
return d.newLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl), nil
}
34 changes: 34 additions & 0 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package dal

import (
"context"
"testing"

"github.com/alecthomas/assert/v2"

"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
)

func TestSendFSMEvent(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
dal, err := New(ctx, conn)
assert.NoError(t, err)

_, err = dal.AcquireAsyncCall(ctx)
assert.IsError(t, err, ErrNotFound)

err = dal.SendFSMEvent(ctx, "test", "test", "state", schema.Ref{Module: "module", Name: "verb"}, []byte(`{}`))
assert.NoError(t, err)

lease, err := dal.AcquireAsyncCall(ctx)
assert.NoError(t, err)
t.Cleanup(func() {
err := lease.Release()
assert.NoError(t, err)
})

assert.HasPrefix(t, lease.String(), "/system/async_call/1:")
}
14 changes: 12 additions & 2 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
ErrConflict = errors.New("conflict")
// ErrNotFound is returned by select methods in the DAL when no results are found.
ErrNotFound = errors.New("not found")
// ErrConstraint is returned by select methods in the DAL when a constraint is violated.
ErrConstraint = errors.New("constraint violation")
)

type IngressRoute struct {
Expand Down Expand Up @@ -1148,10 +1150,18 @@ func translatePGError(err error) error {
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
if pgErr.Code == pgerrcode.ForeignKeyViolation {
switch pgErr.Code {
case pgerrcode.ForeignKeyViolation:
return fmt.Errorf("%s: %w", strings.TrimSuffix(strings.TrimPrefix(pgErr.ConstraintName, pgErr.TableName+"_"), "_id_fkey"), ErrNotFound)
} else if pgErr.Code == pgerrcode.UniqueViolation {
case pgerrcode.UniqueViolation:
return ErrConflict
case pgerrcode.IntegrityConstraintViolation,
pgerrcode.RestrictViolation,
pgerrcode.NotNullViolation,
pgerrcode.CheckViolation,
pgerrcode.ExclusionViolation:
return fmt.Errorf("%s: %w", pgErr.Message, ErrConstraint)
default:
}
} else if isNotFound(err) {
return ErrNotFound
Expand Down
12 changes: 7 additions & 5 deletions backend/controller/dal/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ var _ leases.Leaser = (*DAL)(nil)

// Lease represents a lease that is held by a controller.
type Lease struct {
idempotencyKey uuid.UUID
context any
key leases.Key
idempotencyKey uuid.UUID
db *sql.DB
ttl time.Duration
errch chan error
Expand Down Expand Up @@ -72,21 +71,24 @@ func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duratio
if ttl < time.Second*5 {
return nil, fmt.Errorf("lease TTL must be at least 5 seconds")
}
idempotencyKey, err := d.db.NewLease(ctx, key, time.Now().Add(ttl))
idempotencyKey, err := d.db.NewLease(ctx, key, ttl)
if err != nil {
return nil, translatePGError(err)
}
return d.newLease(ctx, key, idempotencyKey, ttl), nil
}

func (d *DAL) newLease(ctx context.Context, key leases.Key, idempotencyKey uuid.UUID, ttl time.Duration) *Lease {
lease := &Lease{
idempotencyKey: idempotencyKey,
context: nil,
key: key,
db: d.db,
ttl: ttl,
release: make(chan bool),
errch: make(chan error, 1),
}
go lease.renew(ctx)
return lease, nil
return lease
}

// ExpireLeases expires (deletes) all leases that have expired.
Expand Down
7 changes: 5 additions & 2 deletions backend/controller/leases/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@ func (l *Key) Value() (driver.Value, error) {
}

func ParseLeaseKey(s string) (Key, error) {
parts := strings.Split(s, ".")
if !strings.HasPrefix(s, "/system/") && !strings.HasPrefix(s, "/module/") {
return nil, fmt.Errorf("invalid lease key: %q", s)
}
parts := strings.Split(s, "/")
for i, part := range parts {
var err error
parts[i], err = url.PathUnescape(part)
if err != nil {
return nil, err
}
}
return Key(parts), nil
return Key(parts[1:]), nil
}
158 changes: 158 additions & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit fdd90eb

Please sign in to comment.