diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py index 715f15b6b41cb..b4b46f1dd4fbf 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py @@ -279,10 +279,9 @@ def resolve_runQueuingSupported(self, _graphene_info: ResolveInfo): return isinstance(self._instance.run_coordinator, QueuedRunCoordinator) def resolve_runQueueConfig(self, _graphene_info: ResolveInfo): - from dagster._core.run_coordinator import QueuedRunCoordinator - - if isinstance(self._instance.run_coordinator, QueuedRunCoordinator): - return GrapheneRunQueueConfig(self._instance.run_coordinator.get_run_queue_config()) + run_queue_config = self._instance.get_run_queue_config() + if run_queue_config: + return GrapheneRunQueueConfig(run_queue_config) else: return None diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 92c177d1d97ce..9d60503eb44f3 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -145,6 +145,7 @@ ) from dagster._core.remote_representation.external import RemoteSchedule from dagster._core.run_coordinator import RunCoordinator + from dagster._core.run_coordinator.queued_run_coordinator import RunQueueConfig from dagster._core.scheduler import Scheduler, SchedulerDebugInfo from dagster._core.scheduler.instigation import ( InstigatorState, @@ -788,7 +789,13 @@ def run_coordinator(self) -> "RunCoordinator": self._run_coordinator.register_instance(self) return self._run_coordinator - # run launcher + def get_run_queue_config(self) -> Optional["RunQueueConfig"]: + from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator + + if not isinstance(self.run_coordinator, QueuedRunCoordinator): + return None + + return self.run_coordinator.get_run_queue_config() @property def run_launcher(self) -> "RunLauncher": diff --git a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py index aa656482eb9e5..d55cff6c78576 100644 --- a/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py +++ b/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py @@ -78,10 +78,12 @@ def run_iteration( fixed_iteration_time: Optional[float] = None, # used for tests ) -> DaemonIterator: run_coordinator = workspace_process_context.instance.run_coordinator + run_queue_config = workspace_process_context.instance.get_run_queue_config() if not isinstance(run_coordinator, QueuedRunCoordinator): check.failed(f"Expected QueuedRunCoordinator, got {run_coordinator}") - run_queue_config = run_coordinator.get_run_queue_config() + if not run_queue_config: + check.failed("Got invalid run queue config") instance = workspace_process_context.instance runs_to_dequeue = self._get_runs_to_dequeue( diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py index 590a2c2fffa50..696cd57fa3170 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py @@ -174,7 +174,8 @@ def test_run_queue_key(): with instance_for_test(overrides={"run_queue": config}) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules @@ -188,7 +189,8 @@ def test_run_queue_key(): } ) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules @@ -226,7 +228,8 @@ def test_run_coordinator_key(): overrides={"run_queue": {"max_concurrent_runs": 50, "tag_concurrency_limits": tag_rules}} ) as instance: assert isinstance(instance.run_coordinator, QueuedRunCoordinator) - run_queue_config = instance.run_coordinator.get_run_queue_config() + run_queue_config = instance.get_run_queue_config() + assert run_queue_config assert run_queue_config.max_concurrent_runs == 50 assert run_queue_config.tag_concurrency_limits == tag_rules