Skip to content

Commit

Permalink
fix: lock rows and skip them during concurrent poll (#498)
Browse files Browse the repository at this point in the history
  • Loading branch information
thevaibhav-dixit authored Mar 18, 2024
1 parent cce5bc1 commit ee97ad5
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
62 changes: 62 additions & 0 deletions migrations/20240318080232_lock_and_skip_rows_in_mq_poll.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
AND mq_msgs.attempt_at <= NOW()
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;

IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;
63 changes: 63 additions & 0 deletions migrations/20240318080232_lock_and_skip_rows_in_mq_poll.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@

-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
FOR UPDATE SKIP LOCKED
) AS msgs ON TRUE
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
AND mq_msgs.attempt_at <= NOW()
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;

IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;

0 comments on commit ee97ad5

Please sign in to comment.