From b591a9dcc77dd5e98f643e7d03bf7cda6be06949 Mon Sep 17 00:00:00 2001
From: sanderegg <35365065+sanderegg@users.noreply.github.com>
Date: Thu, 14 Nov 2024 17:18:56 +0100
Subject: [PATCH] only stop if there are tasks to stop

---
 .../modules/comp_scheduler/_base_scheduler.py | 42 +++++++++----------
 1 file changed, 20 insertions(+), 22 deletions(-)

diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py
index 91db4f4fcca8..e6d8e6da4917 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py
@@ -84,9 +84,9 @@ _Current = CompTaskAtDB
 
 _MAX_WAITING_FOR_CLUSTER_TIMEOUT_IN_MIN: Final[int] = 10
 _SCHEDULER_INTERVAL: Final[datetime.timedelta] = datetime.timedelta(seconds=5)
-_TASK_NAME_TEMPLATE: Final[
-    str
-] = "computational-scheduler-{user_id}:{project_id}:{iteration}"
+_TASK_NAME_TEMPLATE: Final[str] = (
+    "computational-scheduler-{user_id}:{project_id}:{iteration}"
+)
 
 PipelineSchedulingTask: TypeAlias = asyncio.Task
 PipelineSchedulingWakeUpEvent: TypeAlias = asyncio.Event
@@ -219,9 +219,9 @@ async def run_new_pipeline(
         task, wake_up_event = self._start_scheduling(
             user_id, project_id, new_run.iteration
         )
-        self._scheduled_pipelines[
-            (user_id, project_id, new_run.iteration)
-        ] = ScheduledPipelineParams(scheduler_task=task, scheduler_waker=wake_up_event)
+        self._scheduled_pipelines[(user_id, project_id, new_run.iteration)] = (
+            ScheduledPipelineParams(scheduler_task=task, scheduler_waker=wake_up_event)
+        )
         await publish_project_log(
             self.rabbitmq_client,
             user_id,
@@ -653,20 +653,17 @@ async def _start_tasks(
         scheduled_tasks: dict[NodeID, CompTaskAtDB],
         comp_run: CompRunsAtDB,
         wake_up_callback: Callable[[], None],
-    ) -> None:
-        ...
+    ) -> None: ...
 
     @abstractmethod
     async def _get_tasks_status(
         self, user_id: UserID, tasks: list[CompTaskAtDB], comp_run: CompRunsAtDB
-    ) -> list[RunningState]:
-        ...
+    ) -> list[RunningState]: ...
 
     @abstractmethod
     async def _stop_tasks(
         self, user_id: UserID, tasks: list[CompTaskAtDB], comp_run: CompRunsAtDB
-    ) -> None:
-        ...
+    ) -> None: ...
 
     @abstractmethod
     async def _process_completed_tasks(
@@ -675,8 +672,7 @@ async def _process_completed_tasks(
         tasks: list[CompTaskAtDB],
         iteration: Iteration,
         comp_run: CompRunsAtDB,
-    ) -> None:
-        ...
+    ) -> None: ...
 
     @staticmethod
     def _build_exclusive_lock_key(*args, **kwargs) -> str:
@@ -816,8 +812,10 @@ async def _schedule_tasks_to_stop(
             project_id
         )
         # stop any remaining running task, these are already submitted
-        tasks_to_stop = [t for t in comp_tasks.values() if t.state in PROCESSING_STATES]
-        await self._stop_tasks(user_id, tasks_to_stop, comp_run)
+        if tasks_to_stop := [
+            t for t in comp_tasks.values() if t.state in PROCESSING_STATES
+        ]:
+            await self._stop_tasks(user_id, tasks_to_stop, comp_run)
 
     async def _schedule_tasks_to_start(  # noqa: C901
         self,
@@ -877,9 +875,9 @@ async def _schedule_tasks_to_start(  # noqa: C901
                 RunningState.WAITING_FOR_CLUSTER,
             )
             for task in tasks_ready_to_start:
-                comp_tasks[
-                    NodeIDStr(f"{task}")
-                ].state = RunningState.WAITING_FOR_CLUSTER
+                comp_tasks[NodeIDStr(f"{task}")].state = (
+                    RunningState.WAITING_FOR_CLUSTER
+                )
 
         except ComputationalBackendOnDemandNotReadyError as exc:
             _logger.info(
@@ -901,9 +899,9 @@ async def _schedule_tasks_to_start(  # noqa: C901
                 RunningState.WAITING_FOR_CLUSTER,
             )
             for task in tasks_ready_to_start:
-                comp_tasks[
-                    NodeIDStr(f"{task}")
-                ].state = RunningState.WAITING_FOR_CLUSTER
+                comp_tasks[NodeIDStr(f"{task}")].state = (
+                    RunningState.WAITING_FOR_CLUSTER
+                )
         except ClustersKeeperNotAvailableError:
             _logger.exception("Unexpected error while starting tasks:")
             await publish_project_log(
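
Note on the functional change in _schedule_tasks_to_stop: _stop_tasks is now awaited only when the comprehension over comp_tasks yields at least one task in a processing state, so a pipeline with nothing submitted no longer triggers a stop call against the computational backend. Below is a minimal sketch of the walrus-operator guard, using simplified stand-in types (Task, PROCESSING_STATES, stop_tasks and schedule_tasks_to_stop are illustrative placeholders, not the service's actual models or methods):

    import asyncio
    from dataclasses import dataclass

    # stand-in for the service's PROCESSING_STATES constant
    PROCESSING_STATES = {"PENDING", "STARTED"}

    @dataclass
    class Task:  # stand-in for CompTaskAtDB
        node_id: str
        state: str

    async def stop_tasks(tasks: list[Task]) -> None:
        # placeholder for BaseCompScheduler._stop_tasks
        print(f"stopping {len(tasks)} task(s)")

    async def schedule_tasks_to_stop(comp_tasks: dict[str, Task]) -> None:
        # the walrus operator binds the filtered list and tests its truthiness in one step;
        # an empty list is falsy, so stop_tasks is skipped when nothing is running
        if tasks_to_stop := [t for t in comp_tasks.values() if t.state in PROCESSING_STATES]:
            await stop_tasks(tasks_to_stop)

    # a completed task is not in a processing state, so no stop call is issued
    asyncio.run(schedule_tasks_to_stop({"n1": Task("n1", "SUCCESS")}))
    # a started task is, so stop_tasks runs once with one task
    asyncio.run(schedule_tasks_to_stop({"n2": Task("n2", "STARTED")}))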