Skip to content

Commit

Permalink
pqworkqueue: allow workers to fail on their own transaction without f…
Browse files Browse the repository at this point in the history
…ailing the overal task (and causing it to be re-run)
  • Loading branch information
ntbosscher committed Nov 11, 2024
1 parent 1ef8c3e commit 5f19126
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions pqworkqueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ func init() {
start_after timestamp not null,
started_at timestamp null,
completed_at timestamp null,
retain_until timestamp null
retain_until timestamp null,
commit_error text null
);`)

_, err = pqshared.Pool.Exec(context.Background(), `alter table pq_worker_queue
add column if not exists debounce_key text not null default '',
add column if not exists start_after timestamp not null default current_timestamp;`)
add column if not exists start_after timestamp not null default current_timestamp,
add column if not exists commit_error text null;`)
if err != nil {
log.Fatal("failed to setup worker table: ", err)
}
Expand Down Expand Up @@ -128,19 +130,32 @@ func (w *watcherInfo) startWork(queueName string) (mightBeMore bool) {
return err
}

exec := info.Callback
for _, item := range info.Middleware {
exec = item(exec)
var result []byte

err2 := model.WithTx(context.Background(), func(ctx context.Context, tx *sqlx.Tx) error {
exec := info.Callback
for _, item := range info.Middleware {
exec = item(exec)
}

result = exec(ctx, id, message)
return nil
})

commitErr := nulls.String{}
if err2 != nil {
Logger.Println("failed to process job", err2)
commitErr = nulls.NewString(err2.Error())
}

result := exec(ctx, id, message)
err = model.ExecContext(ctx, `
update pq_worker_queue set
result = $1,
completed_at = $2,
retain_until = $3
where id = $4
`, result, time.Now().UTC(), time.Now().UTC().Add(info.RetainResultsFor), id)
retain_until = $3,
commit_error = $4
where id = $5
`, result, time.Now().UTC(), time.Now().UTC().Add(info.RetainResultsFor), commitErr, id)

if err != nil {
Logger.Println("failed to store result:", err)
Expand Down

0 comments on commit 5f19126

Please sign in to comment.