diff --git a/backend/controller/sql/querier.go b/backend/controller/sql/querier.go index 33293fc322..738ae45861 100644 --- a/backend/controller/sql/querier.go +++ b/backend/controller/sql/querier.go @@ -90,6 +90,7 @@ type Querier interface { GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) GetUnscheduledCronJobs(ctx context.Context, startTime time.Time) ([]GetUnscheduledCronJobsRow, error) + GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index dab7ab0350..1ba65230c0 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -1844,6 +1844,56 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim return items, nil } +const getZombieAsyncCalls = `-- name: GetZombieAsyncCalls :many +SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context +FROM async_calls +WHERE state = 'executing' + AND lease_id IS NULL +ORDER BY created_at ASC +LIMIT $1::INT +` + +func (q *Queries) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) { + rows, err := q.db.QueryContext(ctx, getZombieAsyncCalls, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []AsyncCall + for rows.Next() { + var i AsyncCall + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.LeaseID, + &i.Verb, + &i.State, + &i.Origin, + &i.ScheduledAt, + &i.Request, + &i.Response, + &i.Error, + &i.RemainingAttempts, + &i.Backoff, + &i.MaxBackoff, + &i.CatchVerb, + &i.Catching, + &i.ParentRequestKey, + &i.TraceContext, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const insertSubscriber = `-- name: InsertSubscriber :exec INSERT INTO topic_subscribers ( key,