Skip to content

Commit

Permalink
added setting to control scheduling concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Nov 26, 2024
1 parent 1eb85e5 commit 865a757
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@

from common_library.pydantic_validators import validate_numeric_string_as_timedelta
from fastapi import FastAPI
from models_library.basic_types import (
LogLevel,
PortInt,
VersionTag,
)
from models_library.basic_types import LogLevel, PortInt, VersionTag
from models_library.clusters import (
DEFAULT_CLUSTER_ID,
Cluster,
Expand All @@ -26,6 +22,7 @@
AnyUrl,
Field,
NonNegativeInt,
PositiveInt,
field_validator,
)
from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
Expand Down Expand Up @@ -77,6 +74,10 @@ class ComputationalBackendSettings(BaseCustomSettings):
COMPUTATIONAL_BACKEND_ENABLED: bool = Field(
default=True,
)
COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY: PositiveInt = Field(
default=50,
description="defines how many pipelines the application can schedule concurrently",
)
COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED: bool = Field(
default=True,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import contextlib
import functools
import logging
Expand All @@ -10,6 +11,7 @@
from servicelib.redis import CouldNotAcquireLockError
from servicelib.redis_utils import exclusive

from ...core.settings import get_application_settings
from ...models.comp_runs import Iteration
from ..rabbitmq import get_rabbitmq_client
from ._constants import MODULE_NAME_WORKER
Expand Down Expand Up @@ -62,17 +64,31 @@ async def _handle_apply_distributed_schedule(app: FastAPI, data: bytes) -> bool:


async def setup_worker(app: FastAPI) -> None:
app_settings = get_application_settings(app)
rabbitmq_client = get_rabbitmq_client(app)
await rabbitmq_client.subscribe(
SchedulePipelineRabbitMessage.get_channel_name(),
functools.partial(_handle_apply_distributed_schedule, app),
exclusive_queue=False,
app.state.scheduler_worker_consumers = await asyncio.gather(
*(
rabbitmq_client.subscribe(
SchedulePipelineRabbitMessage.get_channel_name(),
functools.partial(_handle_apply_distributed_schedule, app),
exclusive_queue=False,
)
for _ in range(
app_settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND.COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY
)
)
)

app.state.scheduler_worker = create_scheduler(app)


async def shutdown_worker(app: FastAPI) -> None:
assert app.state.scheduler_worker # nosec
# TODO: we might need to cancel stuff here. not sure yet what
# unsubscribing is maybe not a good idea if we want to keep the data in the queue
rabbitmq_client = get_rabbitmq_client(app)
await asyncio.gather(
*(
rabbitmq_client.unsubscribe_consumer(*consumer)
for consumer in app.state.scheduler_worker_consumers
),
return_exceptions=False,
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@
# pylint: disable=too-many-statements

import asyncio
from typing import Awaitable, Callable
from collections.abc import Awaitable, Callable
from unittest import mock

import pytest
from _helpers import PublishedProject
from fastapi import FastAPI
from models_library.clusters import DEFAULT_CLUSTER_ID
from pytest_mock import MockerFixture
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from simcore_service_director_v2.models.comp_runs import RunMetadataDict
from simcore_service_director_v2.modules.comp_scheduler._manager import run_new_pipeline
from simcore_service_director_v2.modules.comp_scheduler._models import (
Expand Down Expand Up @@ -83,10 +85,23 @@ async def mocked_scheduler_api(mocker: MockerFixture) -> mock.Mock:
)


@pytest.fixture
def with_scheduling_concurrency(
mock_env: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, scheduling_concurrency: int
) -> EnvVarsDict:
return mock_env | setenvs_from_dict(
monkeypatch,
{"COMPUTATIONAL_BACKEND_SCHEDULING_CONCURRENCY": f"{scheduling_concurrency}"},
)


@pytest.mark.parametrize("scheduling_concurrency", [1, 50, 100])
@pytest.mark.parametrize(
"queue_name", [SchedulePipelineRabbitMessage.get_channel_name()]
)
async def test_worker_scheduling_parallelism(
scheduling_concurrency: int,
with_scheduling_concurrency: EnvVarsDict,
with_disabled_auto_scheduling: mock.Mock,
mocked_scheduler_api: mock.Mock,
initialized_app: FastAPI,
Expand All @@ -113,9 +128,8 @@ async def _project_pipeline_creation_workflow() -> None:
use_on_demand_clusters=False,
)

num_concurrent_calls = 10
await asyncio.gather(
*(_project_pipeline_creation_workflow() for _ in range(num_concurrent_calls))
*(_project_pipeline_creation_workflow() for _ in range(scheduling_concurrency))
)
mocked_scheduler_api.assert_called()
assert mocked_scheduler_api.call_count == num_concurrent_calls
assert mocked_scheduler_api.call_count == scheduling_concurrency

0 comments on commit 865a757

Please sign in to comment.