Skip to content

Commit

Permalink
refactor: add user-accessible transactions to the DAL (#1536)
Browse files Browse the repository at this point in the history
Up until now all transactions were hidden behind DAL methods, but in
order to cleanly separate the core of async calls from the type-specific
post-execution callbacks, we'll need to pass an open DAL transaction to
the callback.

eg. when an FSM transition async call completes there will be a callback
that will update the executing FSM instance with the destination state.
  • Loading branch information
alecthomas authored May 19, 2024
1 parent 21df777 commit 8264835
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 7 deletions.
57 changes: 55 additions & 2 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,66 @@ func New(ctx context.Context, pool *pgxpool.Pool) (*DAL, error) {
}

type DAL struct {
db *sql.DB
db sql.DBI

// DeploymentChanges is a Topic that receives changes to the deployments table.
DeploymentChanges *pubsub.Topic[DeploymentNotification]
// RouteChanges is a Topic that receives changes to the routing table.
}

// Tx is DAL within a transaction.
type Tx struct {
*DAL
}

// CommitOrRollback can be used in a defer statement to commit or rollback a
// transaction depending on whether the enclosing function returned an error.
//
// func myFunc() (err error) {
// tx, err := dal.Begin(ctx)
// if err != nil { return err }
// defer tx.CommitOrRollback(ctx, &err)
// ...
// }
func (t *Tx) CommitOrRollback(ctx context.Context, err *error) {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconcievable")
}
tx.CommitOrRollback(ctx, err)
}

func (t *Tx) Commit(ctx context.Context) error {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconcievable")
}
return tx.Commit(ctx)
}

func (t *Tx) Rollback(ctx context.Context) error {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconcievable")
}
return tx.Rollback(ctx)
}

func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
db, ok := d.db.(*sql.DB)
if !ok {
return nil, fmt.Errorf("can't nest transactions")
}
stx, err := db.Begin(ctx)
if err != nil {
return nil, translatePGError(err)
}
return &Tx{&DAL{
db: stx,
DeploymentChanges: d.DeploymentChanges,
}}, nil
}

func (d *DAL) GetActiveControllers(ctx context.Context) ([]Controller, error) {
controllers, err := d.db.GetActiveControllers(ctx)
if err != nil {
Expand Down Expand Up @@ -1120,7 +1173,7 @@ func sha256esToBytes(digests []sha256.SHA256) [][]byte {

type artefactReader struct {
id int64
db *sql.DB
db sql.DBI
offset int32
}

Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var _ leases.Leaser = (*DAL)(nil)
type Lease struct {
key leases.Key
idempotencyKey uuid.UUID
db *sql.DB
db sql.DBI
ttl time.Duration
errch chan error
release chan bool
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/dal/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/TBD54566975/ftl/internal/log"
)

func leaseExists(t *testing.T, conn sql.DBI, idempotencyKey uuid.UUID, key leases.Key) bool {
func leaseExists(t *testing.T, conn sql.ConnI, idempotencyKey uuid.UUID, key leases.Key) bool {
t.Helper()
var count int
err := translatePGError(conn.
Expand Down
18 changes: 15 additions & 3 deletions backend/controller/sql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ import (
)

type DBI interface {
Querier
Conn() ConnI
Begin(ctx context.Context) (*Tx, error)
}

type ConnI interface {
DBTX
Begin(ctx context.Context) (pgx.Tx, error)
}

type DB struct {
conn DBI
conn ConnI
*Queries
}

func NewDB(conn DBI) *DB {
func NewDB(conn ConnI) *DB {
return &DB{conn: conn, Queries: New(conn)}
}

func (d *DB) Conn() DBI { return d.conn }
func (d *DB) Conn() ConnI { return d.conn }

func (d *DB) Begin(ctx context.Context) (*Tx, error) {
tx, err := d.conn.Begin(ctx)
Expand All @@ -35,6 +41,12 @@ type Tx struct {
*Queries
}

func (t *Tx) Conn() ConnI { return t.tx }

func (t *Tx) Begin(ctx context.Context) (*Tx, error) {
panic("recursive transactions are not supported")
}

func (t *Tx) Commit(ctx context.Context) error {
return t.tx.Commit(ctx)
}
Expand Down
2 changes: 2 additions & 0 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8264835

Please sign in to comment.