diff --git a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py index 3bc7f15982beb..2835582c3b647 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py +++ b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py @@ -13,6 +13,8 @@ STEP_UP_BASE = 1.1 MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 15 +MAX_ALLOWED_PRIORITY = 2**31 - 1 + class InstanceConcurrencyContext: """This class is used to manage instance-scoped concurrency for a given run. It wraps the @@ -57,8 +59,11 @@ def __exit__( to_clear.append(step_key) for step_key in to_clear: - del self._pending_timeouts[step_key] - del self._pending_claim_counts[step_key] + if step_key in self._pending_timeouts: + del self._pending_timeouts[step_key] + if step_key in self._pending_claim_counts: + del self._pending_claim_counts[step_key] + self._pending_claims.remove(step_key) self._context_guard = False @@ -105,6 +110,12 @@ def claim(self, concurrency_key: str, step_key: str, step_priority: int = 0): self._pending_claims.add(step_key) priority = self._run_priority + step_priority + + if abs(priority) > MAX_ALLOWED_PRIORITY: + raise Exception( + f"Tried to claim a concurrency slot with a priority {priority} that was not in the allowed range of a 32-bit signed integer." + ) + claim_status = self._instance.event_log_storage.claim_concurrency_slot( concurrency_key, self._run_id, step_key, priority ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py index fce54b40889bc..65a8bc01da17f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py @@ -292,3 +292,19 @@ def test_run_step_priority(concurrency_instance): time.sleep(0.1) assert low_context.claim("foo", "low_run_low_step", step_priority=-1) # -1001 + + with pytest.raises( + Exception, + match="Tried to claim a concurrency slot with a priority -2147483648 that was not in the allowed range of a 32-bit signed integer.", + ): + low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000 - 1) + + low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000) + + with pytest.raises( + Exception, + match="Tried to claim a concurrency slot with a priority 2147483648 that was not in the allowed range of a 32-bit signed integer.", + ): + regular_context.claim("foo", "too_high_step", step_priority=(2**31 - 1) + 1) + + regular_context.claim("foo", "too_high_step", step_priority=-(2**31 - 1))