diff --git a/backend/controller/cronjobs/internal/dal/dal.go b/backend/controller/cronjobs/internal/dal/dal.go index a72e4ae739..2df11f56e5 100644 --- a/backend/controller/cronjobs/internal/dal/dal.go +++ b/backend/controller/cronjobs/internal/dal/dal.go @@ -102,7 +102,11 @@ func (d *DAL) UpdateCronJobExecution(ctx context.Context, params UpdateCronJobEx } func (d *DAL) DeleteCronJobsForDeployment(ctx context.Context, key model.DeploymentKey) error { - err := d.db.DeleteCronJobsForDeployment(ctx, key) + err := d.db.DeleteCronAsyncCallsForDeployment(ctx, key) + if err != nil { + return fmt.Errorf("failed to delete cron job async calls for deployment %v: %w", key, libdal.TranslatePGError(err)) + } + err = d.db.DeleteCronJobsForDeployment(ctx, key) if err != nil { return fmt.Errorf("failed to delete cron jobs for deployment %v: %w", key, libdal.TranslatePGError(err)) } diff --git a/backend/controller/cronjobs/internal/sql/querier.go b/backend/controller/cronjobs/internal/sql/querier.go index 7890eadf23..0a0f9b2bd9 100644 --- a/backend/controller/cronjobs/internal/sql/querier.go +++ b/backend/controller/cronjobs/internal/sql/querier.go @@ -15,6 +15,7 @@ type Querier interface { AsyncCallQueueDepth(ctx context.Context) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error + DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) diff --git a/backend/controller/cronjobs/internal/sql/queries.sql b/backend/controller/cronjobs/internal/sql/queries.sql index cb1136326f..774039016a 100644 --- a/backend/controller/cronjobs/internal/sql/queries.sql +++ b/backend/controller/cronjobs/internal/sql/queries.sql @@ -52,4 +52,13 @@ SELECT EXISTS ( -- name: DeleteCronJobsForDeployment :exec DELETE FROM cron_jobs -WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1); \ No newline at end of file +WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1); + + +-- name: DeleteCronAsyncCallsForDeployment :exec +DELETE FROM async_calls +WHERE id IN ( + SELECT last_async_call_id + FROM cron_jobs + WHERE deployment_id = (SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1) +); \ No newline at end of file diff --git a/backend/controller/cronjobs/internal/sql/queries.sql.go b/backend/controller/cronjobs/internal/sql/queries.sql.go index 6d17718fc3..f3e514a6cd 100644 --- a/backend/controller/cronjobs/internal/sql/queries.sql.go +++ b/backend/controller/cronjobs/internal/sql/queries.sql.go @@ -47,6 +47,20 @@ func (q *Queries) CreateCronJob(ctx context.Context, arg CreateCronJobParams) er return err } +const deleteCronAsyncCallsForDeployment = `-- name: DeleteCronAsyncCallsForDeployment :exec +DELETE FROM async_calls +WHERE id IN ( + SELECT last_async_call_id + FROM cron_jobs + WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1) +) +` + +func (q *Queries) DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error { + _, err := q.db.ExecContext(ctx, deleteCronAsyncCallsForDeployment, deploymentKey) + return err +} + const deleteCronJobsForDeployment = `-- name: DeleteCronJobsForDeployment :exec DELETE FROM cron_jobs WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index 0873932cd1..50e0920405 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -27,6 +27,7 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema []byte, key model.DeploymentKey) error CreateIngressRoute(ctx context.Context, arg CreateIngressRouteParams) error CreateRequest(ctx context.Context, origin Origin, key model.RequestKey, sourceAddr string) error + DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error DeleteCronJobsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index a778919623..fc0b72ca8e 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -218,6 +218,20 @@ func (q *Queries) CreateRequest(ctx context.Context, origin Origin, key model.Re return err } +const deleteCronAsyncCallsForDeployment = `-- name: DeleteCronAsyncCallsForDeployment :exec +DELETE FROM async_calls +WHERE id IN ( + SELECT last_async_call_id + FROM cron_jobs + WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1) +) +` + +func (q *Queries) DeleteCronAsyncCallsForDeployment(ctx context.Context, deploymentKey model.DeploymentKey) error { + _, err := q.db.ExecContext(ctx, deleteCronAsyncCallsForDeployment, deploymentKey) + return err +} + const deleteCronJobsForDeployment = `-- name: DeleteCronJobsForDeployment :exec DELETE FROM cron_jobs WHERE deployment_id = (SELECT id FROM deployments WHERE key = $1::deployment_key LIMIT 1)