Skip to content

Commit

Permalink
Merge pull request #38 from 0xDEC0DE/issue/18
Browse files Browse the repository at this point in the history
RedisBroker: Track non-idempotent jobs as running, too
  • Loading branch information
bigjools authored Mar 27, 2024
2 parents b236f46 + e00eef5 commit 3e4df65
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ local jobs_json = redis.call('hvals', running_jobs_key)

for _, job_json in ipairs(jobs_json) do
local job = cjson.decode(job_json)
if job["retries"] < job["max_retries"] then
-- `max_retries == 0` jobs are non-idempotent, do not re-run them
if job["max_retries"] > 0 and job["retries"] < job["max_retries"] then
job["retries"] = job["retries"] + 1
-- Set job status to queued:
-- A major difference between retrying a job failing in a worker and
Expand All @@ -41,7 +42,8 @@ for _, job_json in ipairs(jobs_json) do
redis.call('rpush', queue, job_json)
num_enqueued_jobs = num_enqueued_jobs + 1
else
-- Keep track of jobs that exceeded the max_retries
-- Keep track of jobs that exceeded the max_retries (or were not
-- retryable)
failed_jobs[i] = job_json
i = i + 1
end
Expand Down
12 changes: 5 additions & 7 deletions spinach/brokers/redis_scripts/get_jobs_from_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ repeat
job["status"] = job_status_running
local job_json = cjson.encode(job)

if job["max_retries"] > 0 then
-- job is idempotent, must track if it's running
redis.call('hset', running_jobs_key, job["id"], job_json)
-- If tracking concurrency, bump the current value.
if max_concurrency ~= -1 then
redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
end
-- track the running job
redis.call('hset', running_jobs_key, job["id"], job_json)
-- If tracking concurrency, bump the current value.
if max_concurrency ~= -1 then
redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
end

jobs[i] = job_json
Expand Down
33 changes: 23 additions & 10 deletions tests/test_redis_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ def test_running_job(broker):
broker.enqueue_jobs([job])
assert broker._r.hget(running_jobs_key, str(job.id)) is None
broker.get_jobs_from_queue('foo_queue', 1)
assert broker._r.hget(running_jobs_key, str(job.id)) is None
job.status = JobStatus.RUNNING
assert (
Job.deserialize(broker._r.hget(running_jobs_key, str(job.id)).decode())
== job
)
# Try to remove it, even if it doesn't exist in running
broker.remove_job_from_running(job)

Expand Down Expand Up @@ -281,8 +285,13 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
)
assert current == b'1'

# Mark broker as dead, should re-enqueue only the idempotent jobs.
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, [])
# Mark broker as dead, should re-enqueue only the idempotent jobs. The
# non-idempotent job will report as failed.
num_requeued, failed = broker_2.enqueue_jobs_from_dead_broker(broker._id)
assert num_requeued == 2
jobs = [Job.deserialize(job.decode()) for job in failed]
job_1.status = JobStatus.RUNNING
assert [job_1] == jobs

# Check that the current_concurrency was decremented for job_3.
current = broker._r.hget(
Expand Down Expand Up @@ -348,27 +357,31 @@ def test_detect_dead_broker(broker, broker_2):


def test_dead_jobs_exceeding_max_retries_are_marked_failed(broker, broker_2):
# Idempotent job
job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
job_1.retries = 1
# Non-idempotent job
job_2 = Job('bar_task', 'foo_queue', datetime.now(timezone.utc), 0)
# Register the first broker
broker.enqueue_jobs([job_1])
broker.enqueue_jobs([job_2])
broker.get_jobs_from_queue('foo_queue', 100)
broker.move_future_jobs()
broker_2.enqueue_jobs_from_dead_broker = Mock(
return_value=(0, [job_1.serialize()])
)
# Set the 2nd broker to detect dead brokers after 2 seconds of inactivity
broker_2.broker_dead_threshold_seconds = 2
time.sleep(2.1)

signal_called = False
signal_called_count = 0

@signals.job_failed.connect
def check_job(namespace, job, err, **kwargs):
nonlocal signal_called
signal_called = True
nonlocal signal_called_count
signal_called_count += 1
assert job.status == JobStatus.FAILED

# Signals are sent for both jobs
assert 0 == broker_2.move_future_jobs()
assert True is signal_called
assert signal_called_count == 2


def test_not_detect_deregistered_broker_as_dead(broker, broker_2):
Expand Down

0 comments on commit 3e4df65

Please sign in to comment.