From 253141f1475f3986d60f09eb711a3bdf8e9d7c01 Mon Sep 17 00:00:00 2001 From: prha Date: Tue, 7 Jan 2025 17:05:11 -0800 Subject: [PATCH] hoists the run queue config from the run coordinator up to the instance --- .../dagster-graphql/dagster_graphql/schema/instance.py | 7 +++---- .../dagster/dagster/_core/instance/__init__.py | 9 ++++++++- .../run_coordinator/queued_run_coordinator_daemon.py | 4 +++- .../core_tests/instance_tests/test_instance.py | 9 ++++++--- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py index 60d594bc85792..a2515b92366da 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py @@ -283,10 +283,9 @@ def resolve_runQueuingSupported(self, _graphene_info: ResolveInfo): return isinstance(self._instance.run_coordinator, QueuedRunCoordinator) def resolve_runQueueConfig(self, _graphene_info: ResolveInfo): - from dagster._core.run_coordinator import QueuedRunCoordinator - - if isinstance(self._instance.run_coordinator, QueuedRunCoordinator): - return GrapheneRunQueueConfig(self._instance.run_coordinator.get_run_queue_config()) + run_queue_config = self._instance.get_run_queue_config() + if run_queue_config: + return GrapheneRunQueueConfig(run_queue_config) else: return None diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 496aa136cb211..1d288f347f0b6 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -143,6 +143,7 @@ ) from dagster._core.remote_representation.external import RemoteSchedule from dagster._core.run_coordinator import RunCoordinator + from dagster._core.run_coordinator.queued_run_coordinator import RunQueueConfig from dagster._core.scheduler import Scheduler, SchedulerDebugInfo from dagster._core.scheduler.instigation import ( InstigatorState, @@ -782,7 +783,13 @@ def run_coordinator(self) -> "RunCoordinator": self._run_coordinator.register_instance(self) return self._run_coordinator - # run launcher + def get_run_queue_config(self) -> Optional["RunQueueConfig"]: + from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator + + if not isinstance(self.run_coordinator, QueuedRunCoordinator): + return None + + return self.run_coordinator.get_run_queue_config() @property def run_launcher(self) -> "RunLauncher": diff --git a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py index dc223044f7f42..3d90c60f42aee 100644 --- a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py +++ b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py @@ -79,10 +79,12 @@ def run_iteration( fixed_iteration_time: Optional[float] = None, # used for tests ) -> DaemonIterator: run_coordinator = workspace_process_context.instance.run_coordinator + run_queue_config = workspace_process_context.instance.get_run_queue_config() if not isinstance(run_coordinator, QueuedRunCoordinator): check.failed(f"Expected QueuedRunCoordinator, got {run_coordinator}") - run_queue_config = run_coordinator.get_run_queue_config() + if not run_queue_config: + check.failed("Got invalid run queue config") instance = workspace_process_context.instance runs_to_dequeue = self._get_runs_to_dequeue( diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py index 7a42e6d43c055..aa6b7d1ea865d 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py @@ -175,7 +175,8 @@ def test_run_queue_key(): with instance_for_test(overrides={"run_queue": config}) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules @@ -189,7 +190,8 @@ def test_run_queue_key(): } ) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules @@ -227,7 +229,8 @@ def test_run_coordinator_key(): overrides={"run_queue": {"max_concurrent_runs": 50, "tag_concurrency_limits": tag_rules}} ) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules