Skip to content

Commit

Permalink
RedisBroker: Track non-idempotent jobs as running, too
Browse files Browse the repository at this point in the history
Push running-job markers down into the Redis for all jobs, and move
the logic for re-enqueueing jobs from dead brokers into the script
itself.  Non-idempotent jobs running on a dead broker are still NOT
re-enqueued.

This should cause non-idempotent jobs to no longer run "invisibly"
on a Redis broker, as well as causing dead brokers to signal any
non-idempotent jobs that were running on them as failed.

Fixes: Issue #18
  • Loading branch information
nisimond committed Mar 23, 2024
1 parent b236f46 commit dcf8254
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ local jobs_json = redis.call('hvals', running_jobs_key)

for _, job_json in ipairs(jobs_json) do
local job = cjson.decode(job_json)
if job["retries"] < job["max_retries"] then
if job["max_retries"] > 0 and job["retries"] < job["max_retries"] then
job["retries"] = job["retries"] + 1
-- Set job status to queued:
-- A major difference between retrying a job failing in a worker and
Expand Down
12 changes: 5 additions & 7 deletions spinach/brokers/redis_scripts/get_jobs_from_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ repeat
job["status"] = job_status_running
local job_json = cjson.encode(job)

if job["max_retries"] > 0 then
-- job is idempotent, must track if it's running
redis.call('hset', running_jobs_key, job["id"], job_json)
-- If tracking concurrency, bump the current value.
if max_concurrency ~= -1 then
redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
end
-- track the running job
redis.call('hset', running_jobs_key, job["id"], job_json)
-- If tracking concurrency, bump the current value.
if max_concurrency ~= -1 then
redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
end

jobs[i] = job_json
Expand Down
15 changes: 12 additions & 3 deletions tests/test_redis_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ def test_running_job(broker):
broker.enqueue_jobs([job])
assert broker._r.hget(running_jobs_key, str(job.id)) is None
broker.get_jobs_from_queue('foo_queue', 1)
assert broker._r.hget(running_jobs_key, str(job.id)) is None
job.status = JobStatus.RUNNING
assert (
Job.deserialize(broker._r.hget(running_jobs_key, str(job.id)).decode())
== job
)
# Try to remove it, even if it doesn't exist in running
broker.remove_job_from_running(job)

Expand Down Expand Up @@ -281,8 +285,13 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
)
assert current == b'1'

# Mark broker as dead, should re-enqueue only the idempotent jobs.
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, [])
# Mark broker as dead, should re-enqueue only the idempotent jobs. The
# non-idempotent job will report as failed.
num_requeued, failed = broker_2.enqueue_jobs_from_dead_broker(broker._id)
assert num_requeued == 2
jobs = [Job.deserialize(job.decode()) for job in failed]
job_1.status = JobStatus.RUNNING
assert [job_1] == jobs

# Check that the current_concurrency was decremented for job_3.
current = broker._r.hget(
Expand Down

0 comments on commit dcf8254

Please sign in to comment.