Skip to content

Commit

Permalink
hoists the run queue config from the run coordinator up to the instance
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed Jan 16, 2025
1 parent b5ec0b8 commit 253141f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,9 @@ def resolve_runQueuingSupported(self, _graphene_info: ResolveInfo):
return isinstance(self._instance.run_coordinator, QueuedRunCoordinator)

def resolve_runQueueConfig(self, _graphene_info: ResolveInfo):
from dagster._core.run_coordinator import QueuedRunCoordinator

if isinstance(self._instance.run_coordinator, QueuedRunCoordinator):
return GrapheneRunQueueConfig(self._instance.run_coordinator.get_run_queue_config())
run_queue_config = self._instance.get_run_queue_config()
if run_queue_config:
return GrapheneRunQueueConfig(run_queue_config)
else:
return None

Expand Down
9 changes: 8 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
)
from dagster._core.remote_representation.external import RemoteSchedule
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.run_coordinator.queued_run_coordinator import RunQueueConfig
from dagster._core.scheduler import Scheduler, SchedulerDebugInfo
from dagster._core.scheduler.instigation import (
InstigatorState,
Expand Down Expand Up @@ -782,7 +783,13 @@ def run_coordinator(self) -> "RunCoordinator":
self._run_coordinator.register_instance(self)
return self._run_coordinator

# run launcher
def get_run_queue_config(self) -> Optional["RunQueueConfig"]:
from dagster._core.run_coordinator.queued_run_coordinator import QueuedRunCoordinator

if not isinstance(self.run_coordinator, QueuedRunCoordinator):
return None

return self.run_coordinator.get_run_queue_config()

@property
def run_launcher(self) -> "RunLauncher":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ def run_iteration(
fixed_iteration_time: Optional[float] = None, # used for tests
) -> DaemonIterator:
run_coordinator = workspace_process_context.instance.run_coordinator
run_queue_config = workspace_process_context.instance.get_run_queue_config()
if not isinstance(run_coordinator, QueuedRunCoordinator):
check.failed(f"Expected QueuedRunCoordinator, got {run_coordinator}")

run_queue_config = run_coordinator.get_run_queue_config()
if not run_queue_config:
check.failed("Got invalid run queue config")

instance = workspace_process_context.instance
runs_to_dequeue = self._get_runs_to_dequeue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ def test_run_queue_key():

with instance_for_test(overrides={"run_queue": config}) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand All @@ -189,7 +190,8 @@ def test_run_queue_key():
}
) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand Down Expand Up @@ -227,7 +229,8 @@ def test_run_coordinator_key():
overrides={"run_queue": {"max_concurrent_runs": 50, "tag_concurrency_limits": tag_rules}}
) as instance:
assert isinstance(instance.run_coordinator, QueuedRunCoordinator)
run_queue_config = instance.run_coordinator.get_run_queue_config()
run_queue_config = instance.get_run_queue_config()
assert run_queue_config
assert run_queue_config.max_concurrent_runs == 50
assert run_queue_config.tag_concurrency_limits == tag_rules

Expand Down

0 comments on commit 253141f

Please sign in to comment.