From f58ec6aee447bfce589a2b85bb442a4d3f9e300f Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 12 Sep 2024 13:12:44 +1000 Subject: [PATCH] fix: leaky transactions on error If we are using the 'defer tx.CommitOrRollback(ctx, &err)' pattern this will not roll back on error if the err variable has been shadowed in a code block, as the outer err is not updated. fixes #2655 --- backend/controller/controller.go | 2 +- backend/controller/dal/async_calls.go | 4 ++-- backend/controller/dal/dal.go | 4 ++-- backend/controller/dal/pubsub.go | 10 ++++++---- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 0a5ae5ace6..fa305017e8 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -1451,7 +1451,7 @@ func (s *Service) reapAsyncCalls(ctx context.Context) (nextInterval time.Duratio } for _, call := range calls { callResult := either.RightOf[[]byte]("async call lease expired") - _, err := tx.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.DAL, isFinalResult bool) error { + _, err = tx.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.DAL, isFinalResult bool) error { return s.finaliseAsyncCall(ctx, tx, call, callResult, isFinalResult) }) if err != nil { diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 909e03713c..8ced9dcb3a 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -192,7 +192,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, switch result := result.(type) { case either.Left[[]byte, string]: // Successful response. var encryptedResult encryption.EncryptedAsyncColumn - err := tx.encrypt(result.Get(), &encryptedResult) + err = tx.encrypt(result.Get(), &encryptedResult) if err != nil { return false, fmt.Errorf("failed to encrypt async call result: %w", err) } @@ -246,7 +246,7 @@ func (d *DAL) CompleteAsyncCall(ctx context.Context, } } } - if err := finalise(tx, isFinalResult); err != nil { + if err = finalise(tx, isFinalResult); err != nil { return false, err } return didScheduleAnotherCall, nil diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 943de4d4ea..d66bc38eb4 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -430,7 +430,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem if !ok { continue } - err := tx.db.UpsertTopic(ctx, dalsql.UpsertTopicParams{ + err = tx.db.UpsertTopic(ctx, dalsql.UpsertTopicParams{ Topic: model.NewTopicKey(moduleSchema.Name, t.Name), Module: moduleSchema.Name, Name: t.Name, @@ -489,7 +489,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem for _, job := range cronJobs { // Start time must be calculated by the caller rather than generated by db // This ensures that nextExecution is after start time, otherwise the job will never be triggered - err := tx.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{ + err = tx.db.CreateCronJob(ctx, dalsql.CreateCronJobParams{ Key: job.Key, DeploymentKey: deploymentKey, ModuleName: job.Verb.Module, diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 389ee35092..dcc7f17729 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -93,7 +93,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t successful := 0 for _, subscription := range subs { - nextCursor, err := tx.db.GetNextEventForSubscription(ctx, sqltypes.Duration(eventConsumptionDelay), subscription.Topic, subscription.Cursor) + var nextCursor sql2.GetNextEventForSubscriptionRow + nextCursor, err = tx.db.GetNextEventForSubscription(ctx, sqltypes.Duration(eventConsumptionDelay), subscription.Topic, subscription.Cursor) if err != nil { observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) return 0, fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err)) @@ -107,8 +108,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t logger.Tracef("Skipping subscription %s because event is too new", subscription.Key) continue } - - subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key) + var subscriber sql2.GetRandomSubscriberRow + subscriber, err = tx.db.GetRandomSubscriber(ctx, subscription.Key) if err != nil { logger.Tracef("no subscriber for subscription %s", subscription.Key) continue @@ -152,7 +153,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t if successful > 0 { // If no async calls were successfully created, then there is no need to // potentially increment the queue depth gauge. - queueDepth, err := tx.db.AsyncCallQueueDepth(ctx) + var queueDepth int64 + queueDepth, err = tx.db.AsyncCallQueueDepth(ctx) if err == nil { // Don't error out of progressing subscriptions just over a queue depth // retrieval error because this is only used for an observability gauge.