diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index e64074c6d99c..fe0af49fc5c6 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -8,11 +8,7 @@ from common_library.pydantic_validators import validate_numeric_string_as_timedelta from fastapi import FastAPI -from models_library.basic_types import ( - LogLevel, - PortInt, - VersionTag, -) +from models_library.basic_types import LogLevel, PortInt, VersionTag from models_library.clusters import ( DEFAULT_CLUSTER_ID, Cluster, @@ -26,6 +22,7 @@ AnyUrl, Field, NonNegativeInt, + PositiveInt, field_validator, ) from servicelib.logging_utils_filtering import LoggerName, MessageSubstring @@ -77,6 +74,10 @@ class ComputationalBackendSettings(BaseCustomSettings): COMPUTATIONAL_BACKEND_ENABLED: bool = Field( default=True, ) + COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: PositiveInt = Field( + default=50, + description="defines how many pipelines the application can schedule concurrently", + ) COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = Field( default=True, ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py index 38b575751427..397b68db0c9a 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_worker.py @@ -1,3 +1,4 @@ +import asyncio import contextlib import functools import logging @@ -10,6 +11,7 @@ from servicelib.redis import CouldNotAcquireLockError from servicelib.redis_utils import exclusive +from ...core.settings import get_application_settings from ...models.comp_runs import Iteration from ..rabbitmq import get_rabbitmq_client from ._constants import MODULE_NAME_WORKER @@ -62,11 +64,19 @@ async def _handle_apply_distributed_schedule(app: FastAPI, data: bytes) -> bool: async def setup_worker(app: FastAPI) -> None: + app_settings = get_application_settings(app) rabbitmq_client = get_rabbitmq_client(app) - await rabbitmq_client.subscribe( - SchedulePipelineRabbitMessage.get_channel_name(), - functools.partial(_handle_apply_distributed_schedule, app), - exclusive_queue=False, + app.state.scheduler_worker_consumers = await asyncio.gather( + *( + rabbitmq_client.subscribe( + SchedulePipelineRabbitMessage.get_channel_name(), + functools.partial(_handle_apply_distributed_schedule, app), + exclusive_queue=False, + ) + for _ in range( + app_settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY + ) + ) ) app.state.scheduler_worker = create_scheduler(app) @@ -74,5 +84,11 @@ async def setup_worker(app: FastAPI) -> None: async def shutdown_worker(app: FastAPI) -> None: assert app.state.scheduler_worker # nosec - # TODO: we might need to cancel stuff here. not sure yet what - # unsubscribing is maybe not a good idea if we want to keep the data in the queue + rabbitmq_client = get_rabbitmq_client(app) + await asyncio.gather( + *( + rabbitmq_client.unsubscribe_consumer(*consumer) + for consumer in app.state.scheduler_worker_consumers + ), + return_exceptions=False, + ) diff --git a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py index 9c21d769809e..e9bc9e831a80 100644 --- a/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py +++ b/services/director-v2/tests/unit/with_dbs/comp_scheduler/test_worker.py @@ -8,7 +8,7 @@ # pylint: disable=too-many-statements import asyncio -from typing import Awaitable, Callable +from collections.abc import Awaitable, Callable from unittest import mock import pytest @@ -16,6 +16,8 @@ from fastapi import FastAPI from models_library.clusters import DEFAULT_CLUSTER_ID from pytest_mock import MockerFixture +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict from simcore_service_director_v2.models.comp_runs import RunMetadataDict from simcore_service_director_v2.modules.comp_scheduler._manager import run_new_pipeline from simcore_service_director_v2.modules.comp_scheduler._models import ( @@ -83,10 +85,23 @@ async def mocked_scheduler_api(mocker: MockerFixture) -> mock.Mock: ) +@pytest.fixture +def with_scheduling_concurrency( + mock_env: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, scheduling_concurrency: int +) -> EnvVarsDict: + return mock_env | setenvs_from_dict( + monkeypatch, + {"COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY": f"{scheduling_concurrency}"}, + ) + + +@pytest.mark.parametrize("scheduling_concurrency", [1, 50, 100]) @pytest.mark.parametrize( "queue_name", [SchedulePipelineRabbitMessage.get_channel_name()] ) async def test_worker_scheduling_parallelism( + scheduling_concurrency: int, + with_scheduling_concurrency: EnvVarsDict, with_disabled_auto_scheduling: mock.Mock, mocked_scheduler_api: mock.Mock, initialized_app: FastAPI, @@ -113,9 +128,8 @@ async def _project_pipeline_creation_workflow() -> None: use_on_demand_clusters=False, ) - num_concurrent_calls = 10 await asyncio.gather( - *(_project_pipeline_creation_workflow() for _ in range(num_concurrent_calls)) + *(_project_pipeline_creation_workflow() for _ in range(scheduling_concurrency)) ) mocked_scheduler_api.assert_called() - assert mocked_scheduler_api.call_count == num_concurrent_calls + assert mocked_scheduler_api.call_count == scheduling_concurrency