Skip to content

Commit

Permalink
refactor: add a reusable DAL helper (#2492)
Browse files Browse the repository at this point in the history
This removes a bunch of duplicated boilerplate for all the DAL
implementations.
  • Loading branch information
alecthomas authored Aug 24, 2024
1 parent 723c84f commit ed98645
Show file tree
Hide file tree
Showing 28 changed files with 533 additions and 530 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ linters:
- tagalign
- nolintlint
- protogetter
- thelper

linters-settings:
exhaustive:
Expand Down
10 changes: 5 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.S

// schedules an event for a FSM instance within a db transaction
// body may already be encrypted, which is denoted by the encrypted flag
func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.Tx, instance *dal.FSMInstance, fsm *schema.FSM, eventType schema.Type, body []byte, encrypted bool) error {
func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *dal.FSMInstance, fsm *schema.FSM, eventType schema.Type, body []byte, encrypted bool) error {
// Populated if we find a matching transition.
var destinationRef *schema.Ref
var destinationVerb *schema.Verb
Expand Down Expand Up @@ -1484,7 +1484,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
}

queueDepth := call.QueueDepth
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx, isFinalResult bool) error {
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.DAL, isFinalResult bool) error {
return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult)
})
if err != nil {
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
catchResult = either.LeftOf[string](resp.Msg.GetBody())
}
queueDepth := call.QueueDepth
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, catchResult, func(tx *dal.Tx, isFinalResult bool) error {
didScheduleAnotherCall, err := s.dal.CompleteAsyncCall(ctx, call, catchResult, func(tx *dal.DAL, isFinalResult bool) error {
// Exposes the original error to external components such as PubSub and FSM
return s.finaliseAsyncCall(ctx, tx, call, originalResult, isFinalResult)
})
Expand Down Expand Up @@ -1602,7 +1602,7 @@ func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
}
}

func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.AsyncCall, callResult either.Either[[]byte, string], isFinalResult bool) error {
func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.DAL, call *dal.AsyncCall, callResult either.Either[[]byte, string], isFinalResult bool) error {
_, failed := callResult.(either.Right[[]byte, string])

// Allow for handling of completion based on origin
Expand All @@ -1628,7 +1628,7 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A
return nil
}

func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.Tx, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error {
func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, origin dal.AsyncOriginFSM, failed bool, isFinalResult bool) error {
logger := log.FromContext(ctx).Scope(origin.FSM.String())

// retrieve the next fsm event and delete it
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *Service) OnJobCompletion(ctx context.Context, key model.CronJobKey, fai
}

// scheduleCronJob schedules the next execution of a single cron job.
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.Tx, job model.CronJob) error {
func (s *Service) scheduleCronJob(ctx context.Context, tx *dal.DAL, job model.CronJob) error {
logger := log.FromContext(ctx).Scope("cron")
now := s.clock.Now().UTC()
pending, err := tx.IsCronJobPending(ctx, job.Key, now)
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestNewCronJobsForModule(t *testing.T) {
// Complete all calls
for _, call := range calls {
callResult := either.LeftOf[string]([]byte("{}"))
_, err = parentDAL.CompleteAsyncCall(ctx, call, callResult, func(tx *parentdal.Tx, isFinalResult bool) error {
_, err = parentDAL.CompleteAsyncCall(ctx, call, callResult, func(tx *parentdal.DAL, isFinalResult bool) error {
return nil
})
assert.NoError(t, err)
Expand Down
66 changes: 14 additions & 52 deletions backend/controller/cronjobs/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,62 +9,24 @@ import (

"github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
"github.com/TBD54566975/ftl/backend/controller/observability"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/dal"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/slices"
)

type DAL struct {
db sql.DBI
*dal.Handle[DAL]
db sql.Querier
}

func New(conn sql.ConnI) *DAL {
return &DAL{db: sql.NewDB(conn)}
}

type Tx struct {
*DAL
}

func (d *DAL) Begin(ctx context.Context) (*Tx, error) {
tx, err := d.db.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("failed to begin transaction: %w", dalerrs.TranslatePGError(err))
}
return &Tx{DAL: &DAL{db: tx}}, nil
}

func (t *Tx) CommitOrRollback(ctx context.Context, err *error) {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconceivable")
}
tx.CommitOrRollback(ctx, err)
}

func (t *Tx) Commit(ctx context.Context) error {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconcievable")
}
err := tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", dalerrs.TranslatePGError(err))
}
return nil
}

func (t *Tx) Rollback(ctx context.Context) error {
tx, ok := t.db.(*sql.Tx)
if !ok {
panic("inconcievable")
func New(conn dal.Connection) *DAL {
return &DAL{
db: sql.New(conn),
Handle: dal.New(conn, func(h *dal.Handle[DAL]) *DAL {
return &DAL{Handle: h, db: sql.New(h.Connection)}
}),
}
err := tx.Rollback(ctx)
if err != nil {
return fmt.Errorf("failed to rollback transaction: %w", dalerrs.TranslatePGError(err))
}
return nil
}

func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob {
Expand All @@ -83,7 +45,7 @@ func cronJobFromRow(c sql.CronJob, d sql.Deployment) model.CronJob {
func (d *DAL) CreateAsyncCall(ctx context.Context, params sql.CreateAsyncCallParams) (int64, error) {
id, err := d.db.CreateAsyncCall(ctx, params)
if err != nil {
return 0, fmt.Errorf("failed to create async call: %w", dalerrs.TranslatePGError(err))
return 0, fmt.Errorf("failed to create async call: %w", dal.TranslatePGError(err))
}
observability.AsyncCalls.Created(ctx, params.Verb, optional.None[schema.RefKey](), params.Origin, 0, err)
queueDepth, err := d.db.AsyncCallQueueDepth(ctx)
Expand All @@ -100,7 +62,7 @@ func (d *DAL) CreateAsyncCall(ctx context.Context, params sql.CreateAsyncCallPar
func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]model.CronJob, error) {
rows, err := d.db.GetUnscheduledCronJobs(ctx, startTime)
if err != nil {
return nil, fmt.Errorf("failed to get cron jobs: %w", dalerrs.TranslatePGError(err))
return nil, fmt.Errorf("failed to get cron jobs: %w", dal.TranslatePGError(err))
}
return slices.Map(rows, func(r sql.GetUnscheduledCronJobsRow) model.CronJob {
return cronJobFromRow(r.CronJob, r.Deployment)
Expand All @@ -111,7 +73,7 @@ func (d *DAL) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) (
func (d *DAL) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (model.CronJob, error) {
row, err := d.db.GetCronJobByKey(ctx, key)
if err != nil {
return model.CronJob{}, fmt.Errorf("failed to get cron job %q: %w", key, dalerrs.TranslatePGError(err))
return model.CronJob{}, fmt.Errorf("failed to get cron job %q: %w", key, dal.TranslatePGError(err))
}
return cronJobFromRow(row.CronJob, row.Deployment), nil
}
Expand All @@ -120,7 +82,7 @@ func (d *DAL) GetCronJobByKey(ctx context.Context, key model.CronJobKey) (model.
func (d *DAL) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) {
pending, err := d.db.IsCronJobPending(ctx, key, startTime)
if err != nil {
return false, fmt.Errorf("failed to check if cron job %q is pending: %w", key, dalerrs.TranslatePGError(err))
return false, fmt.Errorf("failed to check if cron job %q is pending: %w", key, dal.TranslatePGError(err))
}
return pending, nil
}
Expand All @@ -130,7 +92,7 @@ func (d *DAL) IsCronJobPending(ctx context.Context, key model.CronJobKey, startT
func (d *DAL) UpdateCronJobExecution(ctx context.Context, params sql.UpdateCronJobExecutionParams) error {
err := d.db.UpdateCronJobExecution(ctx, params)
if err != nil {
return fmt.Errorf("failed to update cron job %q: %w", params.Key, dalerrs.TranslatePGError(err))
return fmt.Errorf("failed to update cron job %q: %w", params.Key, dal.TranslatePGError(err))
}
return nil
}
94 changes: 0 additions & 94 deletions backend/controller/cronjobs/sql/conn.go

This file was deleted.

2 changes: 1 addition & 1 deletion backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
func (d *DAL) CompleteAsyncCall(ctx context.Context,
call *AsyncCall,
result either.Either[[]byte, string],
finalise func(tx *Tx, isFinalResult bool) error) (didScheduleAnotherCall bool, err error) {
finalise func(tx *DAL, isFinalResult bool) error) (didScheduleAnotherCall bool, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return false, dalerrs.TranslatePGError(err) //nolint:wrapcheck
Expand Down
Loading

0 comments on commit ed98645

Please sign in to comment.