Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ Redirecting inputs retrieval via dynamic-scheduler ⚠️ #6908

Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Final

from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
Expand All @@ -11,6 +14,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
Expand Down Expand Up @@ -95,6 +99,25 @@ async def stop_dynamic_service(
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_inputs(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout_s: NonNegativeInt,
) -> RetrieveDataOutEnveloped:
    """Request the ``retrieve_inputs`` RPC method of the dynamic-scheduler.

    The request is issued in ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` and carries
    ``node_id``, ``port_keys`` and ``timeout_s`` as keyword arguments.

    Returns:
        The reply of the RPC call, expected to be a ``RetrieveDataOutEnveloped``.
    """
    method_name = _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs")
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        method_name,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_s,
    )
    # Sanity check on the reply type, same pattern as the sibling RPC wrappers.
    assert isinstance(reply, RetrieveDataOutEnveloped)  # nosec
    return reply


@log_decorator(_logger, level=logging.DEBUG)
async def update_projects_networks(
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
Expand Down
9 changes: 5 additions & 4 deletions services/director/src/simcore_service_director/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ async def _get_node_details(


async def get_services_details(
app: FastAPI, user_id: str | None, study_id: str | None
app: FastAPI, user_id: str | None, project_id: str | None
) -> list[dict]:
app_settings = get_application_settings(app)
async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager
Expand All @@ -1091,9 +1091,10 @@ async def get_services_details(
filters.append(
f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id
)
if study_id:
if project_id:
filters.append(
f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id
f"{_to_simcore_runtime_docker_label_key('project_id')}="
+ project_id
)
list_running_services = await client.services.list(
filters={"label": filters}
Expand All @@ -1104,7 +1105,7 @@ async def get_services_details(
for service in list_running_services
]
except aiodocker.DockerError as err:
msg = f"Error while accessing container for {user_id=}, {study_id=}"
msg = f"Error while accessing container for {user_id=}, {project_id=}"
raise GenericDockerError(err=msg) from err


Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ services:
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
static-webserver:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
Expand Down Expand Up @@ -58,6 +62,15 @@ async def stop_dynamic_service(
)


@router.expose()
async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """RPC-exposed entry point that delegates to ``scheduler_interface.retrieve_inputs``.

    ``node_id`` and ``port_keys`` are passed through unchanged and the
    delegate's result is returned as is.
    """
    retrieved = await scheduler_interface.retrieve_inputs(
        app,
        node_id=node_id,
        port_keys=port_keys,
    )
    return retrieved


@router.expose()
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
await scheduler_interface.update_projects_networks(app, project_id=project_id)
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):

DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
validation_alias=AliasChoices(
"DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
"DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
),
description=(
"Time to wait before timing out when stopping a dynamic service. "
"Since services require data to be stopped, this operation is timed out after 1 hour"
),
)

DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
description=(
"When dynamic services upload and download data from storage, "
"sometimes very big payloads are involved. In order to handle "
"such payloads it is required to have long timeouts which "
"allow the service to finish the operation."
),
)

DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
default=False,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
from typing import Any

from fastapi import FastAPI, status
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
Expand Down Expand Up @@ -75,7 +79,7 @@ async def stop_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta, # noqa: ASYNC109
timeout: datetime.timedelta # noqa: ASYNC109
) -> None:
try:
await self.thin_client.delete_dynamic_service(
Expand All @@ -100,6 +104,19 @@ async def stop_dynamic_service(

raise

async def retrieve_inputs(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> RetrieveDataOutEnveloped:
    """Call the thin client's ``dynamic_service_retrieve`` and parse its reply.

    The JSON body of the HTTP response is validated into a
    ``RetrieveDataOutEnveloped`` which is then returned.
    """
    http_response = await self.thin_client.dynamic_service_retrieve(
        node_id=node_id,
        port_keys=port_keys,
        timeout=timeout,
    )
    payload: dict[str, Any] = http_response.json()
    envelope_adapter = TypeAdapter(RetrieveDataOutEnveloped)
    return envelope_adapter.validate_python(payload)

async def list_tracked_dynamic_services(
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.common_headers import (
X_DYNAMIC_SIDECAR_REQUEST_DNS,
Expand Down Expand Up @@ -91,7 +92,7 @@ async def delete_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta,
timeout: datetime.timedelta, # noqa: ASYNC109
) -> Response:
@retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
@expect_status(status.HTTP_204_NO_CONTENT)
Expand All @@ -112,6 +113,22 @@ async def _(

return await _(self)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def dynamic_service_retrieve(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> Response:
    """POST the requested port keys to ``/dynamic_services/{node_id}:retrieve``.

    The request body is the JSON document ``{"port_keys": [...]}`` and
    ``timeout`` is handed to the HTTP client converted to seconds.
    A ``200 OK`` answer is expected (see the ``expect_status`` decorator).
    """
    url = f"/dynamic_services/{node_id}:retrieve"
    body = json_dumps({"port_keys": port_keys})
    request_timeout_s = timeout.total_seconds()
    return await self.client.post(url, content=body, timeout=request_timeout_s)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_dynamic_services(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID

from ..core.settings import ApplicationSettings
Expand Down Expand Up @@ -75,6 +79,21 @@ async def stop_dynamic_service(
await set_request_as_stopped(app, dynamic_service_stop)


async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Retrieve the inputs of ``node_id`` through the director-v2 client.

    The call uses ``DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT`` from the
    application settings as its timeout.

    Raises:
        NotImplementedError: when ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER``
            is enabled in the settings.
    """
    settings: ApplicationSettings = app.state.settings
    if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        # No implementation exists here for the internal-scheduler mode.
        raise NotImplementedError

    client = DirectorV2Client.get_from_app_state(app)
    transfer_timeout = settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT
    return await client.retrieve_inputs(
        node_id=node_id, port_keys=port_keys, timeout=transfer_timeout
    )


async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from faker import Faker
from fastapi import FastAPI, status
from fastapi.encoders import jsonable_encoder
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
Expand Down Expand Up @@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID:
@pytest.fixture
def service_status_new_style() -> DynamicServiceGet:
return TypeAdapter(DynamicServiceGet).validate_python(
DynamicServiceGet.model_config["json_schema_extra"]["examples"][1]
DynamicServiceGet.model_json_schema()["examples"][1]
GitHK marked this conversation as resolved.
Show resolved Hide resolved
)


@pytest.fixture
def service_status_legacy() -> NodeGet:
return TypeAdapter(NodeGet).validate_python(
NodeGet.model_config["json_schema_extra"]["examples"][1]
NodeGet.model_json_schema()["examples"][1]
)


Expand Down Expand Up @@ -112,9 +115,7 @@ def mock_director_v2_service_state(
mock.get("/dynamic_services").respond(
status.HTTP_200_OK,
text=json.dumps(
jsonable_encoder(
DynamicServiceGet.model_config["json_schema_extra"]["examples"]
)
jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"])
),
)

Expand Down Expand Up @@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient):
assert len(results) == 2
assert results == [
TypeAdapter(DynamicServiceGet).validate_python(x)
for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"]
for x in DynamicServiceGet.model_json_schema()["examples"]
]


Expand Down Expand Up @@ -223,7 +224,7 @@ async def test_get_state(
def dynamic_service_start() -> DynamicServiceStart:
# one for legacy and one for new style?
return TypeAdapter(DynamicServiceStart).validate_python(
DynamicServiceStart.model_config["json_schema_extra"]["example"]
DynamicServiceStart.model_json_schema()["example"]
)


Expand Down Expand Up @@ -492,6 +493,41 @@ async def test_stop_dynamic_service_serializes_generic_errors(
)


@pytest.fixture
def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]:
    """Mock the director-v2 ``:retrieve`` endpoint for ``node_id``.

    The mocked route answers ``200 OK`` with the first schema example of
    ``RetrieveDataOutEnveloped`` serialized as JSON.
    """
    with respx.mock(
        base_url="http://director-v2:8000/v2",
        assert_all_called=False,
        assert_all_mocked=True,  # IMPORTANT: KEEP always True!
    ) as mock:
        retrieve_route = mock.post(f"/dynamic_services/{node_id}:retrieve")

        example = RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
        envelope = TypeAdapter(RetrieveDataOutEnveloped).validate_python(example)
        retrieve_route.respond(
            status.HTTP_200_OK,
            text=envelope.model_dump_json(),
        )

        yield None


async def test_retrieve_inputs(
    mock_director_v2_service_retrieve_inputs: None,
    rpc_client: RabbitMQRPCClient,
    node_id: NodeID,
):
    """The RPC ``retrieve_inputs`` call returns the payload served by the mock."""
    results = await services.retrieve_inputs(
        rpc_client,
        node_id=node_id,
        port_keys=[],
        timeout_s=10,
    )
    expected = RetrieveDataOutEnveloped.model_json_schema()["examples"][0]
    assert results.model_dump(mode="python") == expected


@pytest.fixture
def mock_director_v2_update_projects_networks(project_id: ProjectID) -> Iterator[None]:
with respx.mock(
Expand Down
Loading
Loading