From 18579ef240e90ad0b0274ccda795a515e5453fbd Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 16:36:35 +0100 Subject: [PATCH 01/23] ruff --- .../src/simcore_service_director_v2/api/routes/computations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 72bdf37e6c7..06f5c7e3795 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -288,7 +288,7 @@ async def _try_start_pipeline( ) # NOTE: in case of a burst of calls to that endpoint, we might end up in a weird state. @run_sequentially_in_context(target_args=["computation.project_id"]) -async def create_computation( # noqa: PLR0913 # pylint:disable=too-many-positional-arguments +async def create_computation( # noqa: PLR0913 computation: ComputationCreate, request: Request, project_repo: Annotated[ From 6e7193ece0acfe5d7e05e4fcc1cfaa41c8108be7 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 16:36:49 +0100 Subject: [PATCH 02/23] ruff --- services/director-v2/tests/unit/_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/_helpers.py b/services/director-v2/tests/unit/_helpers.py index 60eea950f12..ad1eafc90ae 100644 --- a/services/director-v2/tests/unit/_helpers.py +++ b/services/director-v2/tests/unit/_helpers.py @@ -30,7 +30,7 @@ class RunningProject(PublishedProject): async def trigger_comp_scheduler(scheduler: BaseCompScheduler) -> None: # trigger the scheduler - scheduler._wake_up_scheduler_now() # pylint: disable=protected-access + scheduler._wake_up_scheduler_now() # pylint: disable=protected-access # noqa: SLF001 # let the scheduler be actually triggered await asyncio.sleep(1) From e8618a0699fe73a2d6e60f16071cc5565b286070 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 16:55:15 +0100 Subject: [PATCH 03/23] reverted tools in dv-0 --- services/director/requirements/_tools.in | 7 ------ services/director/requirements/_tools.txt | 29 +++++++++++++++++++---- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/services/director/requirements/_tools.in b/services/director/requirements/_tools.in index 05f1ab1646f..e69de29bb2d 100644 --- a/services/director/requirements/_tools.in +++ b/services/director/requirements/_tools.in @@ -1,7 +0,0 @@ ---constraint _base.txt ---constraint _test.txt - -watchdog[watchmedo] -black~=20.8b0 -pip-tools -bump2version diff --git a/services/director/requirements/_tools.txt b/services/director/requirements/_tools.txt index 821e63f1a10..38ed7220aed 100644 --- a/services/director/requirements/_tools.txt +++ b/services/director/requirements/_tools.txt @@ -1,3 +1,9 @@ +# +# This file is autogenerated by pip-compile with python 3.6 +# To update, run: +# +# pip-compile --output-file=requirements/_tools.txt --strip-extras requirements/_tools.in +# appdirs==1.4.4 # via black black==20.8b1 @@ -8,14 +14,22 @@ click==8.0.3 # via # black # pip-tools +dataclasses==0.7 + # via + # -c requirements/_base.txt + # -c requirements/_test.txt + # black +importlib-metadata==2.0.0 + # via + # -c requirements/_test.txt + # click + # pep517 mypy-extensions==0.4.3 # via black pathspec==0.9.0 # via black pep517==0.12.0 # via pip-tools -pip==24.2 - # via pip-tools pip-tools==6.4.0 # via -r requirements/_tools.in pyyaml==5.4 @@ -25,8 +39,6 @@ pyyaml==5.4 # watchdog regex==2022.1.18 # via black -setuptools==75.2.0 - # via pip-tools toml==0.10.2 # via # -c requirements/_test.txt @@ -43,3 +55,12 @@ watchdog==2.1.6 # via -r requirements/_tools.in wheel==0.37.1 # via pip-tools +zipp==3.4.0 + # via + # -c requirements/_test.txt + # importlib-metadata + # pep517 + +# The following packages are considered to be unsafe in a requirements file: +# pip +# setuptools From dd60056ecb0264d62e896ce6ba9c2984c1bca9d1 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:12:39 +0100 Subject: [PATCH 04/23] use service lib background task --- .../modules/comp_scheduler/background_task.py | 73 +++++++------------ 1 file changed, 25 insertions(+), 48 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py index b6fc18efbce..3aac539be13 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py @@ -1,69 +1,46 @@ -import asyncio +import datetime import logging -from asyncio import CancelledError -from contextlib import suppress -from typing import Any, Callable, Coroutine +from collections.abc import Callable, Coroutine +from typing import Any, Final from fastapi import FastAPI +from servicelib.background_task import start_periodic_task, stop_periodic_task +from servicelib.logging_utils import log_context +from servicelib.redis import RedisClientsManager +from servicelib.redis_utils import exclusive +from settings_library.redis import RedisDatabase from . import factory logger = logging.getLogger(__name__) -_DEFAULT_TIMEOUT_S: int = 5 - - -async def scheduler_task(app: FastAPI) -> None: - scheduler = app.state.scheduler - while app.state.comp_scheduler_running: - try: - logger.debug("Computational scheduler task running...") - await scheduler.schedule_all_pipelines() - with suppress(asyncio.TimeoutError): - await asyncio.wait_for( - scheduler.wake_up_event.wait(), timeout=_DEFAULT_TIMEOUT_S - ) - except CancelledError: - logger.info("Computational scheduler task cancelled") - raise - except Exception: # pylint: disable=broad-except - if not app.state.comp_scheduler_running: - logger.warning("Forced to stop computational scheduler") - break - logger.exception( - "Unexpected error in computational scheduler task, restarting scheduler now..." - ) - # wait a bit before restarting the task - await asyncio.sleep(_DEFAULT_TIMEOUT_S) +_DEFAULT_TIMEOUT_S: Final[datetime.timedelta] = datetime.timedelta(seconds=5) +_TASK_NAME: Final[str] = "computational services scheduler" def on_app_startup(app: FastAPI) -> Callable[[], Coroutine[Any, Any, None]]: async def start_scheduler() -> None: - # FIXME: added this variable to overcome the state in which the - # task cancelation is ignored and the exceptions enter in a loop - # that never stops the background task. This flag is an additional - # mechanism to enforce stopping the background task - app.state.comp_scheduler_running = True - app.state.scheduler = await factory.create_from_db(app) - app.state.scheduler_task = asyncio.create_task( - scheduler_task(app), name="computational services scheduler" - ) - logger.info("Computational services Scheduler started") + with log_context( + logger, level=logging.INFO, msg="starting computational scheduler" + ): + redis_clients_manager: RedisClientsManager = app.state.redis_clients_manager + lock_key = f"{app.title}:computational_scheduler" + app.state.scheduler = scheduler = await factory.create_from_db(app) + app.state.computational_scheduler_task = start_periodic_task( + exclusive( + redis_clients_manager.client(RedisDatabase.LOCKS), + lock_key=lock_key, + )(scheduler.schedule_all_pipelines), + interval=_DEFAULT_TIMEOUT_S, + task_name=_TASK_NAME, + ) return start_scheduler def on_app_shutdown(app: FastAPI) -> Callable[[], Coroutine[Any, Any, None]]: async def stop_scheduler() -> None: - logger.info("Computational services Scheduler stopping...") - task = app.state.scheduler_task - with suppress(CancelledError): - app.state.comp_scheduler_running = False - task.cancel() - await task - app.state.scheduler = None - app.state.scheduler_task = None - logger.info("Computational services Scheduler stopped") + await stop_periodic_task(app.state.computational_scheduler_task) return stop_scheduler From 527ca47d1478058bff58a666fd332358586f6b37 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:21:21 +0100 Subject: [PATCH 05/23] renamed logger and --- .../modules/comp_scheduler/background_task.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py index 3aac539be13..16653cb8ae8 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py @@ -14,7 +14,9 @@ logger = logging.getLogger(__name__) -_DEFAULT_TIMEOUT_S: Final[datetime.timedelta] = datetime.timedelta(seconds=5) +_COMPUTATIONAL_SCHEDULER_INTERVAL: Final[datetime.timedelta] = datetime.timedelta( + seconds=5 +) _TASK_NAME: Final[str] = "computational services scheduler" @@ -31,7 +33,7 @@ async def start_scheduler() -> None: redis_clients_manager.client(RedisDatabase.LOCKS), lock_key=lock_key, )(scheduler.schedule_all_pipelines), - interval=_DEFAULT_TIMEOUT_S, + interval=_COMPUTATIONAL_SCHEDULER_INTERVAL, task_name=_TASK_NAME, ) From f617a4d752db031b31f73c71358250fbb46053fd Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:21:30 +0100 Subject: [PATCH 06/23] logger --- .../modules/comp_scheduler/background_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py index 16653cb8ae8..a0b950be845 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py @@ -12,7 +12,7 @@ from . import factory -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) _COMPUTATIONAL_SCHEDULER_INTERVAL: Final[datetime.timedelta] = datetime.timedelta( seconds=5 @@ -23,7 +23,7 @@ def on_app_startup(app: FastAPI) -> Callable[[], Coroutine[Any, Any, None]]: async def start_scheduler() -> None: with log_context( - logger, level=logging.INFO, msg="starting computational scheduler" + _logger, level=logging.INFO, msg="starting computational scheduler" ): redis_clients_manager: RedisClientsManager = app.state.redis_clients_manager lock_key = f"{app.title}:computational_scheduler" From 2f41a1fd986198a14413cb332114918085bdaaea Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 17:37:55 +0100 Subject: [PATCH 07/23] renaming modules --- .../api/dependencies/scheduler.py | 2 +- .../api/routes/computations.py | 2 +- .../modules/comp_scheduler/__init__.py | 16 ++++++++++++++-- .../{base_scheduler.py => _base_scheduler.py} | 0 .../{dask_scheduler.py => _dask_scheduler.py} | 18 ++++++++++-------- .../{factory.py => _scheduler_factory.py} | 16 ++++++++-------- .../{background_task.py => _task.py} | 11 ++++------- services/director-v2/tests/unit/_helpers.py | 2 +- ...st_modules_comp_scheduler_dask_scheduler.py | 8 ++++---- 9 files changed, 43 insertions(+), 32 deletions(-) rename services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/{base_scheduler.py => _base_scheduler.py} (100%) rename services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/{dask_scheduler.py => _dask_scheduler.py} (96%) rename services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/{factory.py => _scheduler_factory.py} (79%) rename services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/{background_task.py => _task.py} (86%) diff --git a/services/director-v2/src/simcore_service_director_v2/api/dependencies/scheduler.py b/services/director-v2/src/simcore_service_director_v2/api/dependencies/scheduler.py index cd661759bc5..a0903608789 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/dependencies/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/api/dependencies/scheduler.py @@ -1,7 +1,7 @@ from fastapi import Depends, FastAPI, Request from ...core.settings import ComputationalBackendSettings -from ...modules.comp_scheduler.base_scheduler import BaseCompScheduler +from ...modules.comp_scheduler import BaseCompScheduler from . import get_app diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 06f5c7e3795..722535605e3 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -63,7 +63,7 @@ from ...models.comp_runs import CompRunsAtDB, ProjectMetadataDict, RunMetadataDict from ...models.comp_tasks import CompTaskAtDB from ...modules.catalog import CatalogClient -from ...modules.comp_scheduler.base_scheduler import BaseCompScheduler +from ...modules.comp_scheduler import BaseCompScheduler from ...modules.db.repositories.clusters import ClustersRepository from ...modules.db.repositories.comp_pipelines import CompPipelinesRepository from ...modules.db.repositories.comp_runs import CompRunsRepository diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/__init__.py index 1f9222007c5..1eb6c3dab10 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/__init__.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/__init__.py @@ -1,3 +1,15 @@ -from .background_task import setup +from fastapi import FastAPI -__all__: tuple[str, ...] = ("setup",) +from ._base_scheduler import BaseCompScheduler +from ._task import on_app_shutdown, on_app_startup + + +def setup(app: FastAPI): + app.add_event_handler("startup", on_app_startup(app)) + app.add_event_handler("shutdown", on_app_shutdown(app)) + + +__all__: tuple[str, ...] = ( + "setup", + "BaseCompScheduler", +) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py similarity index 100% rename from services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/base_scheduler.py rename to services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_dask_scheduler.py similarity index 96% rename from services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py rename to services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_dask_scheduler.py index 3890ee1f7ad..51fb3b1a3fb 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_dask_scheduler.py @@ -30,10 +30,6 @@ from ...models.comp_runs import RunMetadataDict from ...models.comp_tasks import CompTaskAtDB from ...models.dask_subsystem import DaskClientTaskState -from ...modules.dask_client import DaskClient, PublishedComputationTask -from ...modules.dask_clients_pool import DaskClientsPool -from ...modules.db.repositories.clusters import ClustersRepository -from ...modules.db.repositories.comp_runs import CompRunsRepository from ...utils.comp_scheduler import Iteration, get_resource_tracking_run_id from ...utils.dask import ( clean_task_output_and_log_files_if_invalid, @@ -48,8 +44,12 @@ publish_service_stopped_metrics, ) from ..clusters_keeper import get_or_create_on_demand_cluster +from ..dask_client import DaskClient, PublishedComputationTask +from ..dask_clients_pool import DaskClientsPool +from ..db.repositories.clusters import ClustersRepository +from ..db.repositories.comp_runs import CompRunsRepository from ..db.repositories.comp_tasks import CompTasksRepository -from .base_scheduler import BaseCompScheduler, ScheduledPipelineParams +from ._base_scheduler import BaseCompScheduler, ScheduledPipelineParams _logger = logging.getLogger(__name__) @@ -158,9 +158,11 @@ async def _get_tasks_status( for dask_task_state, task in zip(tasks_statuses, tasks, strict=True): if dask_task_state is DaskClientTaskState.PENDING_OR_STARTED: running_states += [ - RunningState.STARTED - if task.progress is not None - else RunningState.PENDING + ( + RunningState.STARTED + if task.progress is not None + else RunningState.PENDING + ) ] else: running_states += [ diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py similarity index 79% rename from services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py rename to services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py index 756fda1cb77..458950e9798 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py @@ -2,16 +2,16 @@ from fastapi import FastAPI from models_library.clusters import DEFAULT_CLUSTER_ID -from simcore_service_director_v2.core.settings import AppSettings from ...core.errors import ConfigurationError +from ...core.settings import AppSettings from ...models.comp_runs import CompRunsAtDB -from ...modules.dask_clients_pool import DaskClientsPool -from ...modules.rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client from ...utils.comp_scheduler import SCHEDULED_STATES +from ..dask_clients_pool import DaskClientsPool from ..db.repositories.comp_runs import CompRunsRepository -from .base_scheduler import BaseCompScheduler, ScheduledPipelineParams -from .dask_scheduler import DaskScheduler +from ..rabbitmq import get_rabbitmq_client, get_rabbitmq_rpc_client +from ._base_scheduler import BaseCompScheduler, ScheduledPipelineParams +from ._dask_scheduler import DaskScheduler logger = logging.getLogger(__name__) @@ -43,9 +43,9 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler: db_engine=db_engine, scheduled_pipelines={ (r.user_id, r.project_uuid, r.iteration): ScheduledPipelineParams( - cluster_id=r.cluster_id - if r.cluster_id is not None - else DEFAULT_CLUSTER_ID, + cluster_id=( + r.cluster_id if r.cluster_id is not None else DEFAULT_CLUSTER_ID + ), run_metadata=r.metadata, mark_for_cancellation=False, use_on_demand_clusters=r.use_on_demand_clusters, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py similarity index 86% rename from services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py rename to services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py index a0b950be845..1b068c55357 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/background_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py @@ -10,7 +10,7 @@ from servicelib.redis_utils import exclusive from settings_library.redis import RedisDatabase -from . import factory +from . import _scheduler_factory _logger = logging.getLogger(__name__) @@ -27,7 +27,9 @@ async def start_scheduler() -> None: ): redis_clients_manager: RedisClientsManager = app.state.redis_clients_manager lock_key = f"{app.title}:computational_scheduler" - app.state.scheduler = scheduler = await factory.create_from_db(app) + app.state.scheduler = scheduler = await _scheduler_factory.create_from_db( + app + ) app.state.computational_scheduler_task = start_periodic_task( exclusive( redis_clients_manager.client(RedisDatabase.LOCKS), @@ -45,8 +47,3 @@ async def stop_scheduler() -> None: await stop_periodic_task(app.state.computational_scheduler_task) return stop_scheduler - - -def setup(app: FastAPI): - app.add_event_handler("startup", on_app_startup(app)) - app.add_event_handler("shutdown", on_app_shutdown(app)) diff --git a/services/director-v2/tests/unit/_helpers.py b/services/director-v2/tests/unit/_helpers.py index ad1eafc90ae..2654c63a3e1 100644 --- a/services/director-v2/tests/unit/_helpers.py +++ b/services/director-v2/tests/unit/_helpers.py @@ -11,7 +11,7 @@ from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB from simcore_service_director_v2.models.comp_runs import CompRunsAtDB from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB -from simcore_service_director_v2.modules.comp_scheduler.base_scheduler import ( +from simcore_service_director_v2.modules.comp_scheduler._base_scheduler import ( BaseCompScheduler, ) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index d15ab46a498..11ee8b5e490 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -65,11 +65,11 @@ from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image from simcore_service_director_v2.models.dask_subsystem import DaskClientTaskState -from simcore_service_director_v2.modules.comp_scheduler import background_task -from simcore_service_director_v2.modules.comp_scheduler.base_scheduler import ( +from simcore_service_director_v2.modules.comp_scheduler import _task +from simcore_service_director_v2.modules.comp_scheduler._base_scheduler import ( BaseCompScheduler, ) -from simcore_service_director_v2.modules.comp_scheduler.dask_scheduler import ( +from simcore_service_director_v2.modules.comp_scheduler._dask_scheduler import ( DaskScheduler, ) from simcore_service_director_v2.modules.dask_client import ( @@ -219,7 +219,7 @@ def mocked_clean_task_output_fct(mocker: MockerFixture) -> mock.MagicMock: @pytest.fixture def with_disabled_scheduler_task(mocker: MockerFixture) -> None: """disables the scheduler task, note that it needs to be triggered manually then""" - mocker.patch.object(background_task, "scheduler_task") + mocker.patch.object(_task, "scheduler_task") @pytest.fixture From ec41289d7f48f4c1df5b0ffd1af520e304cd9d77 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:32:57 +0100 Subject: [PATCH 08/23] revert pylint --- .../src/simcore_service_director_v2/api/routes/computations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 722535605e3..b7f47b186e7 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -288,7 +288,7 @@ async def _try_start_pipeline( ) # NOTE: in case of a burst of calls to that endpoint, we might end up in a weird state. @run_sequentially_in_context(target_args=["computation.project_id"]) -async def create_computation( # noqa: PLR0913 +async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positional-arguments computation: ComputationCreate, request: Request, project_repo: Annotated[ From 91c6c489766c24f3d8e401644627ee202fa0b521 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:37:07 +0100 Subject: [PATCH 09/23] fix disabled periodic task --- .../test_modules_comp_scheduler_dask_scheduler.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 11ee8b5e490..d60dc26828e 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -219,7 +219,16 @@ def mocked_clean_task_output_fct(mocker: MockerFixture) -> mock.MagicMock: @pytest.fixture def with_disabled_scheduler_task(mocker: MockerFixture) -> None: """disables the scheduler task, note that it needs to be triggered manually then""" - mocker.patch.object(_task, "scheduler_task") + mocker.patch.object(_task, "computational_scheduler_task") + mocker.patch( + "simcore_service_director_v2.modules.comp_scheduler._task.start_periodic_task", + autospec=True, + ) + + mocker.patch( + "simcore_service_director_v2.modules.comp_scheduler._task.stop_periodic_task", + autospec=True, + ) @pytest.fixture From 681aa770fcfb8073ed623b2218386e2ddebc7919 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:40:06 +0100 Subject: [PATCH 10/23] typo --- .../unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index d60dc26828e..76c6f07aacd 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -65,7 +65,6 @@ from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image from simcore_service_director_v2.models.dask_subsystem import DaskClientTaskState -from simcore_service_director_v2.modules.comp_scheduler import _task from simcore_service_director_v2.modules.comp_scheduler._base_scheduler import ( BaseCompScheduler, ) @@ -219,7 +218,6 @@ def mocked_clean_task_output_fct(mocker: MockerFixture) -> mock.MagicMock: @pytest.fixture def with_disabled_scheduler_task(mocker: MockerFixture) -> None: """disables the scheduler task, note that it needs to be triggered manually then""" - mocker.patch.object(_task, "computational_scheduler_task") mocker.patch( "simcore_service_director_v2.modules.comp_scheduler._task.start_periodic_task", autospec=True, From 5e8e9ebc947dd00ed4117eba5c9bd1c5c967a1a4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:41:49 +0100 Subject: [PATCH 11/23] ensure redis client is created --- .../src/simcore_service_director_v2/core/application.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index 9972a42cce5..a3f9d2ca6db 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -174,8 +174,9 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: if dynamic_scheduler_enabled or computational_backend_enabled: rabbitmq.setup(app) - if dynamic_scheduler_enabled: + if dynamic_scheduler_enabled or computational_backend_enabled: redis.setup(app) + if dynamic_scheduler_enabled: dynamic_sidecar.setup(app) socketio.setup(app) notifier.setup(app) From dbb136a6aee6e7e7a8dce85cad1c413d55e686fb Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:42:14 +0100 Subject: [PATCH 12/23] add redis --- .../unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 76c6f07aacd..51f37adafce 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -83,7 +83,7 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -pytest_simcore_core_services_selection = ["postgres", "rabbit"] +pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"] pytest_simcore_ops_services_selection = [ "adminer", ] From ef8c5d8cc838b4035b29ca4da9ee672b4e94d71e Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:43:22 +0100 Subject: [PATCH 13/23] add some noqa --- .../with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 51f37adafce..22fe4c730b1 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -102,7 +102,9 @@ def _assert_dask_client_correctly_initialized( ) mocked_dask_client.register_handlers.assert_called_once_with( TaskHandlers( - cast(DaskScheduler, scheduler)._task_progress_change_handler, + cast( + DaskScheduler, scheduler + )._task_progress_change_handler, # noqa: SLF001 cast(DaskScheduler, scheduler)._task_log_change_handler, # noqa: SLF001 ) ) @@ -236,7 +238,7 @@ async def minimal_app(async_client: httpx.AsyncClient) -> FastAPI: # a new thread on which it creates a new loop # causing issues downstream with coroutines not # being created on the same loop - return async_client._transport.app # type: ignore + return async_client._transport.app # type: ignore # noqa: SLF001 @pytest.fixture From 1a3e8fdc5e71a05279a359cf4b6e7d5e41dcfd23 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:44:59 +0100 Subject: [PATCH 14/23] missing fixture --- .../unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 22fe4c730b1..16c6f5ea553 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -46,6 +46,7 @@ from pytest_simcore.helpers.typing_env import EnvVarsDict from servicelib.rabbitmq import RabbitMQClient from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from simcore_postgres_database.models.comp_runs import comp_runs from simcore_postgres_database.models.comp_tasks import NodeClass, comp_tasks from simcore_service_director_v2.core.application import init_app @@ -164,6 +165,7 @@ def minimal_dask_scheduler_config( postgres_host_config: dict[str, str], monkeypatch: pytest.MonkeyPatch, rabbit_service: RabbitSettings, + redis_service: RedisSettings, faker: Faker, ) -> None: """set a minimal configuration for testing the dask connection only""" From 1d0fad4a447c209c13388a5fcf78593088eb5428 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:47:36 +0100 Subject: [PATCH 15/23] fix --- .../unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 16c6f5ea553..5e7c39e8913 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -258,7 +258,7 @@ async def test_scheduler_gracefully_starts_and_stops( minimal_app: FastAPI, ): # check it started correctly - assert minimal_app.state.scheduler_task is not None + assert minimal_app.state.computational_scheduler_task is not None @pytest.mark.parametrize( From d7eab7da7fa910490fedc9e100c8ef4bf8bfce12 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:50:46 +0100 Subject: [PATCH 16/23] another typo --- .../test_modules_comp_scheduler_dask_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index 5e7c39e8913..ea862b6e078 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -205,7 +205,7 @@ def mocked_dask_client(mocker: MockerFixture) -> mock.MagicMock: @pytest.fixture def mocked_parse_output_data_fct(mocker: MockerFixture) -> mock.Mock: return mocker.patch( - "simcore_service_director_v2.modules.comp_scheduler.dask_scheduler.parse_output_data", + "simcore_service_director_v2.modules.comp_scheduler._dask_scheduler.parse_output_data", autospec=True, ) @@ -213,7 +213,7 @@ def mocked_parse_output_data_fct(mocker: MockerFixture) -> mock.Mock: @pytest.fixture def mocked_clean_task_output_fct(mocker: MockerFixture) -> mock.MagicMock: return mocker.patch( - "simcore_service_director_v2.modules.comp_scheduler.dask_scheduler.clean_task_output_and_log_files_if_invalid", + "simcore_service_director_v2.modules.comp_scheduler._dask_scheduler.clean_task_output_and_log_files_if_invalid", return_value=None, autospec=True, ) @@ -246,7 +246,7 @@ async def minimal_app(async_client: httpx.AsyncClient) -> FastAPI: @pytest.fixture def mocked_clean_task_output_and_log_files_if_invalid(mocker: MockerFixture) -> None: mocker.patch( - "simcore_service_director_v2.modules.comp_scheduler.dask_scheduler.clean_task_output_and_log_files_if_invalid", + "simcore_service_director_v2.modules.comp_scheduler._dask_scheduler.clean_task_output_and_log_files_if_invalid", autospec=True, ) @@ -1066,7 +1066,7 @@ async def test_handling_of_disconnected_dask_scheduler( ): # this will create a non connected backend issue that will trigger re-connection mocked_dask_client_send_task = mocker.patch( - "simcore_service_director_v2.modules.comp_scheduler.dask_scheduler.DaskClient.send_computation_tasks", + "simcore_service_director_v2.modules.comp_scheduler._dask_scheduler.DaskClient.send_computation_tasks", side_effect=backend_error, ) assert mocked_dask_client_send_task From 7a606accbcce5d39d7dcb0b1bdd780c93a17f3f9 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Fri, 1 Nov 2024 18:51:12 +0100 Subject: [PATCH 17/23] again --- .../unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index ea862b6e078..fbc90204f83 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -1371,7 +1371,7 @@ async def _return_1st_task_running(job_ids: list[str]) -> list[DaskClientTaskSta @pytest.fixture async def mocked_get_or_create_cluster(mocker: MockerFixture) -> mock.Mock: return mocker.patch( - "simcore_service_director_v2.modules.comp_scheduler.dask_scheduler.get_or_create_on_demand_cluster", + "simcore_service_director_v2.modules.comp_scheduler._dask_scheduler.get_or_create_on_demand_cluster", autospec=True, ) From a6e88a7fb233b8f0969c1d01c00286efdcb4bcf9 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:09:45 +0100 Subject: [PATCH 18/23] allows background task to have wake up events --- .../src/servicelib/background_task.py | 23 +++++- .../tests/test_background_task.py | 78 +++++++++++++++---- 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index e7a4c665c49..bb3537f33cc 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -25,15 +25,34 @@ class PeriodicTaskCancellationError(PydanticErrorMixin, Exception): msg_template: str = "Could not cancel task '{task_name}'" +class SleepUsingAsyncioEvent: + """Sleep strategy that waits on an event to be set.""" + + def __init__(self, event: "asyncio.Event") -> None: + self.event = event + + async def __call__(self, timeout: float | None) -> None: + with contextlib.suppress(TimeoutError): + await asyncio.wait_for(self.event.wait(), timeout=timeout) + self.event.clear() + + async def _periodic_scheduled_task( task: Callable[..., Awaitable[None]], *, interval: datetime.timedelta, task_name: str, + early_wake_up_event: asyncio.Event | None, **task_kwargs, ) -> None: # NOTE: This retries forever unless cancelled - async for attempt in AsyncRetrying(wait=wait_fixed(interval.total_seconds())): + nap = asyncio.sleep + if early_wake_up_event is not None: + nap = SleepUsingAsyncioEvent(early_wake_up_event) + async for attempt in AsyncRetrying( + sleep=nap, + wait=wait_fixed(interval.total_seconds()), + ): with attempt: with log_context( _logger, @@ -51,6 +70,7 @@ def start_periodic_task( interval: datetime.timedelta, task_name: str, wait_before_running: datetime.timedelta = datetime.timedelta(0), + early_wake_up_event: asyncio.Event | None = None, **kwargs, ) -> asyncio.Task: with log_context( @@ -64,6 +84,7 @@ def start_periodic_task( task, interval=interval, task_name=task_name, + early_wake_up_event=early_wake_up_event, **kwargs, ), name=task_name, diff --git a/packages/service-library/tests/test_background_task.py b/packages/service-library/tests/test_background_task.py index 59b2aaeff3f..8d69b5f5e2b 100644 --- a/packages/service-library/tests/test_background_task.py +++ b/packages/service-library/tests/test_background_task.py @@ -6,7 +6,8 @@ import asyncio import datetime -from typing import AsyncIterator, Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable +from typing import Final from unittest import mock import pytest @@ -18,13 +19,13 @@ stop_periodic_task, ) -_FAST_POLL_INTERVAL = 1 +_FAST_POLL_INTERVAL: Final[int] = 1 +_VERY_SLOW_POLL_INTERVAL: Final[int] = 100 @pytest.fixture def mock_background_task(mocker: MockerFixture) -> mock.AsyncMock: - mocked_task = mocker.AsyncMock(return_value=None) - return mocked_task + return mocker.AsyncMock(return_value=None) @pytest.fixture @@ -32,7 +33,12 @@ def task_interval() -> datetime.timedelta: return datetime.timedelta(seconds=_FAST_POLL_INTERVAL) -@pytest.fixture(params=[None, 1]) +@pytest.fixture +def very_long_task_interval() -> datetime.timedelta: + return datetime.timedelta(seconds=_VERY_SLOW_POLL_INTERVAL) + + +@pytest.fixture(params=[None, 1], ids=lambda x: f"stop-timeout={x}") def stop_task_timeout(request: pytest.FixtureRequest) -> float | None: return request.param @@ -40,16 +46,23 @@ def stop_task_timeout(request: pytest.FixtureRequest) -> float | None: @pytest.fixture async def create_background_task( faker: Faker, stop_task_timeout: float | None -) -> AsyncIterator[Callable[[datetime.timedelta, Callable], Awaitable[asyncio.Task]]]: +) -> AsyncIterator[ + Callable[ + [datetime.timedelta, Callable, asyncio.Event | None], Awaitable[asyncio.Task] + ] +]: created_tasks = [] async def _creator( - interval: datetime.timedelta, task: Callable[..., Awaitable] + interval: datetime.timedelta, + task: Callable[..., Awaitable], + early_wake_up_event: asyncio.Event | None, ) -> asyncio.Task: background_task = start_periodic_task( task, interval=interval, task_name=faker.pystr(), + early_wake_up_event=early_wake_up_event, ) assert background_task created_tasks.append(background_task) @@ -62,33 +75,69 @@ async def _creator( ) +@pytest.mark.parametrize( + "wake_up_event", [None, asyncio.Event], ids=lambda x: f"wake-up-event: {x}" +) async def test_background_task_created_and_deleted( mock_background_task: mock.AsyncMock, task_interval: datetime.timedelta, create_background_task: Callable[ - [datetime.timedelta, Callable], Awaitable[asyncio.Task] + [datetime.timedelta, Callable, asyncio.Event | None], Awaitable[asyncio.Task] ], + wake_up_event: Callable | None, ): - task = await create_background_task( + event = wake_up_event() if wake_up_event else None + _task = await create_background_task( task_interval, mock_background_task, + event, ) await asyncio.sleep(5 * task_interval.total_seconds()) mock_background_task.assert_called() - assert mock_background_task.call_count > 1 + assert mock_background_task.call_count > 2 + + +async def test_background_task_wakes_up_early( + mock_background_task: mock.AsyncMock, + very_long_task_interval: datetime.timedelta, + create_background_task: Callable[ + [datetime.timedelta, Callable, asyncio.Event | None], Awaitable[asyncio.Task] + ], +): + wake_up_event = asyncio.Event() + _task = await create_background_task( + very_long_task_interval, + mock_background_task, + wake_up_event, + ) + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + # now the task should have run only once + mock_background_task.assert_called_once() + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + mock_background_task.assert_called_once() + # this should wake up the task + wake_up_event.set() + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + mock_background_task.assert_called() + assert mock_background_task.call_count == 2 + # no change this now waits again a very long time + await asyncio.sleep(5 * _FAST_POLL_INTERVAL) + mock_background_task.assert_called() + assert mock_background_task.call_count == 2 async def test_background_task_raises_restarts( mock_background_task: mock.AsyncMock, task_interval: datetime.timedelta, create_background_task: Callable[ - [datetime.timedelta, Callable], Awaitable[asyncio.Task] + [datetime.timedelta, Callable, asyncio.Event | None], Awaitable[asyncio.Task] ], ): mock_background_task.side_effect = RuntimeError("pytest faked runtime error") - task = await create_background_task( + _task = await create_background_task( task_interval, mock_background_task, + None, ) await asyncio.sleep(5 * task_interval.total_seconds()) mock_background_task.assert_called() @@ -99,13 +148,14 @@ async def test_background_task_correctly_cancels( mock_background_task: mock.AsyncMock, task_interval: datetime.timedelta, create_background_task: Callable[ - [datetime.timedelta, Callable], Awaitable[asyncio.Task] + [datetime.timedelta, Callable, asyncio.Event | None], Awaitable[asyncio.Task] ], ): mock_background_task.side_effect = asyncio.CancelledError - task = await create_background_task( + _task = await create_background_task( task_interval, mock_background_task, + None, ) await asyncio.sleep(5 * task_interval.total_seconds()) # the task will be called once, and then stop From 1ee57023c811575a45c2979e1c55eaf81b8c1bc6 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:10:32 +0100 Subject: [PATCH 19/23] ensure wake up still works --- .../simcore_service_director_v2/modules/comp_scheduler/_task.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py index 1b068c55357..0e1c79ff8b6 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_task.py @@ -37,6 +37,7 @@ async def start_scheduler() -> None: )(scheduler.schedule_all_pipelines), interval=_COMPUTATIONAL_SCHEDULER_INTERVAL, task_name=_TASK_NAME, + early_wake_up_event=scheduler.wake_up_event, ) return start_scheduler From 8481395f00f6225288b4f208b0fff7ece328bfdc Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:13:07 +0100 Subject: [PATCH 20/23] simplify --- .../src/simcore_service_director_v2/core/application.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py index a3f9d2ca6db..f1c81f18f98 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/application.py +++ b/services/director-v2/src/simcore_service_director_v2/core/application.py @@ -173,9 +173,8 @@ def init_app(settings: AppSettings | None = None) -> FastAPI: ) if dynamic_scheduler_enabled or computational_backend_enabled: rabbitmq.setup(app) - - if dynamic_scheduler_enabled or computational_backend_enabled: redis.setup(app) + if dynamic_scheduler_enabled: dynamic_sidecar.setup(app) socketio.setup(app) From f76783397b7d575ba5bd297b5607f1075cc10ee4 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:33:08 +0100 Subject: [PATCH 21/23] redis dependency was missing --- .../tests/unit/with_dbs/test_api_route_computations.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py index 81034fbaee5..1135465ef61 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py @@ -53,6 +53,7 @@ from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from simcore_postgres_database.models.comp_pipeline import StateType from simcore_postgres_database.models.comp_tasks import NodeClass from simcore_postgres_database.utils_projects_nodes import ProjectNodesRepo @@ -65,7 +66,7 @@ ) from simcore_service_director_v2.utils.computations import to_node_class -pytest_simcore_core_services_selection = ["postgres", "rabbit"] +pytest_simcore_core_services_selection = ["postgres", "rabbit", "redis"] pytest_simcore_ops_services_selection = [ "adminer", ] @@ -84,6 +85,7 @@ def minimal_configuration( mock_env: EnvVarsDict, postgres_host_config: dict[str, str], rabbit_service: RabbitSettings, + redis_service: RedisSettings, monkeypatch: pytest.MonkeyPatch, mocked_rabbit_mq_client: None, faker: Faker, From 44f2848a1cb8159273cdb52b544800cbbe722187 Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 10:35:02 +0100 Subject: [PATCH 22/23] mypy --- .../service-library/src/servicelib/background_task.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index bb3537f33cc..5bd2f09ecb6 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -46,9 +46,11 @@ async def _periodic_scheduled_task( **task_kwargs, ) -> None: # NOTE: This retries forever unless cancelled - nap = asyncio.sleep - if early_wake_up_event is not None: - nap = SleepUsingAsyncioEvent(early_wake_up_event) + nap = ( + asyncio.sleep + if early_wake_up_event is None + else SleepUsingAsyncioEvent(early_wake_up_event) + ) async for attempt in AsyncRetrying( sleep=nap, wait=wait_fixed(interval.total_seconds()), From 58db47bb45fb0a13baf51f7d60ce5f9e6d5ed32b Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Mon, 4 Nov 2024 11:35:23 +0100 Subject: [PATCH 23/23] requires redis dependency --- .../director-v2/tests/integration/01/test_computation_api.py | 2 ++ .../02/test_dynamic_sidecar_nodeports_integration.py | 1 + 2 files changed, 3 insertions(+) diff --git a/services/director-v2/tests/integration/01/test_computation_api.py b/services/director-v2/tests/integration/01/test_computation_api.py index 110dbd5f89b..16a6311da1b 100644 --- a/services/director-v2/tests/integration/01/test_computation_api.py +++ b/services/director-v2/tests/integration/01/test_computation_api.py @@ -32,6 +32,7 @@ from pytest_simcore.helpers.postgres_tools import PostgresTestConfig from pytest_simcore.helpers.typing_env import EnvVarsDict from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from starlette import status from starlette.testclient import TestClient from yarl import URL @@ -87,6 +88,7 @@ def minimal_configuration( postgres_db: sa.engine.Engine, postgres_host_config: PostgresTestConfig, rabbit_service: RabbitSettings, + redis_service: RedisSettings, simcore_services_ready: None, storage_service: URL, ) -> None: diff --git a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py index 17d3fe4bcca..720e7d0c3e1 100644 --- a/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py +++ b/services/director-v2/tests/integration/02/test_dynamic_sidecar_nodeports_integration.py @@ -108,6 +108,7 @@ "migration", "postgres", "rabbit", + "redis", "storage", "redis", ]