Skip to content

Commit

Permalink
only stop if there are tasks to stop
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 15, 2024
1 parent bf896c9 commit 13af039
Showing 1 changed file with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@
_Current = CompTaskAtDB
_MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN: Final[int] = 10
_SCHEDULER_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)
_TASK_NAME_TEMPLATE: Final[
str
] = "computational-scheduler-{user_id}:{project_id}:{iteration}"
_TASK_NAME_TEMPLATE: Final[str] = (
"computational-scheduler-{user_id}:{project_id}:{iteration}"
)

PipelineSchedulingTask: TypeAlias = asyncio.Task
PipelineSchedulingWakeUpEvent: TypeAlias = asyncio.Event
Expand Down Expand Up @@ -219,9 +219,9 @@ async def run_new_pipeline(
task, wake_up_event = self._start_scheduling(
user_id, project_id, new_run.iteration
)
self._scheduled_pipelines[
(user_id, project_id, new_run.iteration)
] = ScheduledPipelineParams(scheduler_task=task, scheduler_waker=wake_up_event)
self._scheduled_pipelines[(user_id, project_id, new_run.iteration)] = (
ScheduledPipelineParams(scheduler_task=task, scheduler_waker=wake_up_event)
)
await publish_project_log(
self.rabbitmq_client,
user_id,
Expand Down Expand Up @@ -653,20 +653,17 @@ async def _start_tasks(
scheduled_tasks: dict[NodeID, CompTaskAtDB],
comp_run: CompRunsAtDB,
wake_up_callback: Callable[[], None],
) -> None:
...
) -> None: ...

@abstractmethod
async def _get_tasks_status(
self, user_id: UserID, tasks: list[CompTaskAtDB], comp_run: CompRunsAtDB
) -> list[RunningState]:
...
) -> list[RunningState]: ...

@abstractmethod
async def _stop_tasks(
self, user_id: UserID, tasks: list[CompTaskAtDB], comp_run: CompRunsAtDB
) -> None:
...
) -> None: ...

@abstractmethod
async def _process_completed_tasks(
Expand All @@ -675,8 +672,7 @@ async def _process_completed_tasks(
tasks: list[CompTaskAtDB],
iteration: Iteration,
comp_run: CompRunsAtDB,
) -> None:
...
) -> None: ...

@staticmethod
def _build_exclusive_lock_key(*args, **kwargs) -> str:
Expand Down Expand Up @@ -816,8 +812,10 @@ async def _schedule_tasks_to_stop(
project_id
)
# stop any remaining running task, these are already submitted
tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES]
await self._stop_tasks(user_id, tasks_to_stop, comp_run)
if tasks_to_stop := [
t for t in comp_tasks.values() if t.state in PROCESSING_STATES
]:
await self._stop_tasks(user_id, tasks_to_stop, comp_run)

async def _schedule_tasks_to_start( # noqa: C901
self,
Expand Down Expand Up @@ -877,9 +875,9 @@ async def _schedule_tasks_to_start( # noqa: C901
RunningState.WAITING_FOR_CLUSTER,
)
for task in tasks_ready_to_start:
comp_tasks[
NodeIDStr(f"{task}")
].state = RunningState.WAITING_FOR_CLUSTER
comp_tasks[NodeIDStr(f"{task}")].state = (
RunningState.WAITING_FOR_CLUSTER
)

except ComputationalBackendOnDemandNotReadyError as exc:
_logger.info(
Expand All @@ -901,9 +899,9 @@ async def _schedule_tasks_to_start( # noqa: C901
RunningState.WAITING_FOR_CLUSTER,
)
for task in tasks_ready_to_start:
comp_tasks[
NodeIDStr(f"{task}")
].state = RunningState.WAITING_FOR_CLUSTER
comp_tasks[NodeIDStr(f"{task}")].state = (
RunningState.WAITING_FOR_CLUSTER
)
except ClustersKeeperNotAvailableError:
_logger.exception("Unexpected error while starting tasks:")
await publish_project_log(
Expand Down

0 comments on commit 13af039

Please sign in to comment.