From 28f7302eb5a3c5c7f82fea9e409bfa59873295dd Mon Sep 17 00:00:00 2001 From: gibsondan Date: Mon, 21 Oct 2024 09:51:19 -0500 Subject: [PATCH] Raise a clearer exception when a concurrency slot is tried to claim with a priority outside of the valid range (#25172) ## Summary & Motivation the column is stored as a postgres Integer, so trying to set a priority that is too high will fail. ## How I Tested These Changes BK ## Changelog - [x] `NEW` Raise a clearer error message when a concurrency slot is claimed with a priority that is too large. --- .../plan/instance_concurrency_context.py | 15 +++++++++++++-- .../test_instance_concurrency_context.py | 16 ++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py index 3bc7f15982beb..2835582c3b647 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py +++ b/python_modules/dagster/dagster/_core/execution/plan/instance_concurrency_context.py @@ -13,6 +13,8 @@ STEP_UP_BASE = 1.1 MAX_CONCURRENCY_CLAIM_BLOCKED_INTERVAL = 15 +MAX_ALLOWED_PRIORITY = 2**31 - 1 + class InstanceConcurrencyContext: """This class is used to manage instance-scoped concurrency for a given run. It wraps the @@ -57,8 +59,11 @@ def __exit__( to_clear.append(step_key) for step_key in to_clear: - del self._pending_timeouts[step_key] - del self._pending_claim_counts[step_key] + if step_key in self._pending_timeouts: + del self._pending_timeouts[step_key] + if step_key in self._pending_claim_counts: + del self._pending_claim_counts[step_key] + self._pending_claims.remove(step_key) self._context_guard = False @@ -105,6 +110,12 @@ def claim(self, concurrency_key: str, step_key: str, step_priority: int = 0): self._pending_claims.add(step_key) priority = self._run_priority + step_priority + + if abs(priority) > MAX_ALLOWED_PRIORITY: + raise Exception( + f"Tried to claim a concurrency slot with a priority {priority} that was not in the allowed range of a 32-bit signed integer." + ) + claim_status = self._instance.event_log_storage.claim_concurrency_slot( concurrency_key, self._run_id, step_key, priority ) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py index fce54b40889bc..65a8bc01da17f 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_instance_concurrency_context.py @@ -292,3 +292,19 @@ def test_run_step_priority(concurrency_instance): time.sleep(0.1) assert low_context.claim("foo", "low_run_low_step", step_priority=-1) # -1001 + + with pytest.raises( + Exception, + match="Tried to claim a concurrency slot with a priority -2147483648 that was not in the allowed range of a 32-bit signed integer.", + ): + low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000 - 1) + + low_context.claim("foo", "too_low_step", step_priority=-(2**31 - 1) + 1000) + + with pytest.raises( + Exception, + match="Tried to claim a concurrency slot with a priority 2147483648 that was not in the allowed range of a 32-bit signed integer.", + ): + regular_context.claim("foo", "too_high_step", step_priority=(2**31 - 1) + 1) + + regular_context.claim("foo", "too_high_step", step_priority=-(2**31 - 1))