Skip to content

Commit

Permalink
Raise a clearer exception when a concurrency slot is tried to claim w…
Browse files Browse the repository at this point in the history
…ith a priority outside of the valid range (#25172)

## Summary & Motivation
the column is stored as a postgres Integer, so trying to set a priority
that is too high will fail.

## How I Tested These Changes
BK

## Changelog
- [x] `NEW` Raise a clearer error message when a concurrency slot is
claimed with a priority that is too large.
  • Loading branch information
gibsondan authored Oct 21, 2024
1 parent 778418a commit 28f7302
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
STEP_UP_BASE = 1.1
MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 15

MAX_ALLOWED_PRIORITY = 2**31 - 1


class InstanceConcurrencyContext:
"""This class is used to manage instance-scoped concurrency for a given run. It wraps the
Expand Down Expand Up @@ -57,8 +59,11 @@ def __exit__(
to_clear.append(step_key)

for step_key in to_clear:
del self._pending_timeouts[step_key]
del self._pending_claim_counts[step_key]
if step_key in self._pending_timeouts:
del self._pending_timeouts[step_key]
if step_key in self._pending_claim_counts:
del self._pending_claim_counts[step_key]

self._pending_claims.remove(step_key)

self._context_guard = False
Expand Down Expand Up @@ -105,6 +110,12 @@ def claim(self, concurrency_key: str, step_key: str, step_priority: int = 0):
self._pending_claims.add(step_key)

priority = self._run_priority + step_priority

if abs(priority) > MAX_ALLOWED_PRIORITY:
raise Exception(
f"Tried to claim a concurrency slot with a priority {priority} that was not in the allowed range of a 32-bit signed integer."
)

claim_status = self._instance.event_log_storage.claim_concurrency_slot(
concurrency_key, self._run_id, step_key, priority
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,19 @@ def test_run_step_priority(concurrency_instance):
time.sleep(0.1)

assert low_context.claim("foo", "low_run_low_step", step_priority=-1) # -1001

with pytest.raises(
Exception,
match="Tried to claim a concurrency slot with a priority -2147483648 that was not in the allowed range of a 32-bit signed integer.",
):
low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000 - 1)

low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000)

with pytest.raises(
Exception,
match="Tried to claim a concurrency slot with a priority 2147483648 that was not in the allowed range of a 32-bit signed integer.",
):
regular_context.claim("foo", "too_high_step", step_priority=(2**31 - 1) + 1)

regular_context.claim("foo", "too_high_step", step_priority=-(2**31 - 1))

0 comments on commit 28f7302

Please sign in to comment.