Skip to content

Commit

Permalink
Merge branch 'master' into 6931-update-webserver-swagger
Browse files Browse the repository at this point in the history
  • Loading branch information
bisgaard-itis committed Dec 17, 2024
2 parents ce53390 + 38109f8 commit 01d07c4
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Final

from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
Expand All @@ -11,6 +14,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import NonNegativeInt, TypeAdapter
from servicelib.logging_utils import log_decorator
Expand Down Expand Up @@ -95,6 +99,41 @@ async def stop_dynamic_service(
assert result is None # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def restart_user_services(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    timeout_s: NonNegativeInt,
) -> None:
    """Send the ``restart_user_services`` RPC request for ``node_id``.

    The request is issued in the ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` namespace
    and both ``node_id`` and ``timeout_s`` are forwarded as keyword arguments
    to ``rabbitmq_rpc_client.request``. The reply is expected to be ``None``.
    """
    rpc_method = _RPC_METHOD_NAME_ADAPTER.validate_python("restart_user_services")
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method,
        node_id=node_id,
        timeout_s=timeout_s,
    )
    # This call carries no payload back; any other reply is unexpected.
    assert reply is None  # nosec


@log_decorator(_logger, level=logging.DEBUG)
async def retrieve_inputs(
    rabbitmq_rpc_client: RabbitMQRPCClient,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout_s: NonNegativeInt,
) -> RetrieveDataOutEnveloped:
    """Send the ``retrieve_inputs`` RPC request for ``node_id`` and return its reply.

    The request is issued in the ``DYNAMIC_SCHEDULER_RPC_NAMESPACE`` namespace;
    ``node_id``, ``port_keys`` and ``timeout_s`` are forwarded as keyword
    arguments to ``rabbitmq_rpc_client.request``.

    Returns:
        The reply, which is expected to be a ``RetrieveDataOutEnveloped``.
    """
    rpc_method = _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs")
    reply = await rabbitmq_rpc_client.request(
        DYNAMIC_SCHEDULER_RPC_NAMESPACE,
        rpc_method,
        node_id=node_id,
        port_keys=port_keys,
        timeout_s=timeout_s,
    )
    # Narrows the reply type for static checkers before handing it back.
    assert isinstance(reply, RetrieveDataOutEnveloped)  # nosec
    return reply


@log_decorator(_logger, level=logging.DEBUG)
async def update_projects_networks(
rabbitmq_rpc_client: RabbitMQRPCClient, *, project_id: ProjectID
Expand Down
9 changes: 5 additions & 4 deletions services/director/src/simcore_service_director/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ async def _get_node_details(


async def get_services_details(
app: FastAPI, user_id: str | None, study_id: str | None
app: FastAPI, user_id: str | None, project_id: str | None
) -> list[dict]:
app_settings = get_application_settings(app)
async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager
Expand All @@ -1091,9 +1091,10 @@ async def get_services_details(
filters.append(
f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id
)
if study_id:
if project_id:
filters.append(
f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id
f"{_to_simcore_runtime_docker_label_key('project_id')}="
+ project_id
)
list_running_services = await client.services.list(
filters={"label": filters}
Expand All @@ -1104,7 +1105,7 @@ async def get_services_details(
for service in list_running_services
]
except aiodocker.DockerError as err:
msg = f"Error while accessing container for {user_id=}, {study_id=}"
msg = f"Error while accessing container for {user_id=}, {project_id=}"
raise GenericDockerError(err=msg) from err


Expand Down
1 change: 1 addition & 0 deletions services/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ services:
DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING}
DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING}
DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET}
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT}
TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}
TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT}
static-webserver:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.rabbitmq import RPCRouter
from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import (
Expand Down Expand Up @@ -58,6 +62,20 @@ async def stop_dynamic_service(
)


@router.expose()
async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
    """Exposed handler: delegate to ``scheduler_interface.restart_user_services``."""
    await scheduler_interface.restart_user_services(app, node_id=node_id)


@router.expose()
async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Exposed handler: delegate to ``scheduler_interface.retrieve_inputs``.

    Returns:
        Whatever ``scheduler_interface.retrieve_inputs`` produces for the given
        ``node_id`` and ``port_keys``.
    """
    retrieved = await scheduler_interface.retrieve_inputs(
        app, node_id=node_id, port_keys=port_keys
    )
    return retrieved


@router.expose()
async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
    """Exposed handler: delegate to ``scheduler_interface.update_projects_networks``."""
    await scheduler_interface.update_projects_networks(app, project_id=project_id)
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):

# Timeout applied when stopping a dynamic service; defaults to 60 minutes.
# The value can be supplied under either name listed in `validation_alias`:
# DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT or
# DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT.
# NOTE(review): presumably the first listed alias wins when both are set --
# confirm against pydantic's AliasChoices semantics.
# NOTE(review): the description wording "require data to be stopped" looks
# like it is missing a word (e.g. "saved") -- confirm the intended text.
DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
validation_alias=AliasChoices(
"DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT",
"DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT",
),
description=(
"Time to wait before timing out when stopping a dynamic service. "
"Since services require data to be stopped, this operation is timed out after 1 hour"
),
)

# Timeout for operations in which dynamic services upload/download data
# from storage; defaults to 60 minutes. Deliberately long so that very big
# payloads can complete (see the description below).
DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field(
default=datetime.timedelta(minutes=60),
description=(
"When dynamic services upload and download data from storage, "
"sometimes very big payloads are involved. In order to handle "
"such payloads it is required to have long timeouts which "
"allow the service to finish the operation."
),
)

DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field(
default=False,
description=(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
from typing import Any

from fastapi import FastAPI, status
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from pydantic import TypeAdapter
from servicelib.fastapi.app_state import SingletonInAppStateMixin
Expand Down Expand Up @@ -75,7 +79,7 @@ async def stop_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta, # noqa: ASYNC109
timeout: datetime.timedelta # noqa: ASYNC109
) -> None:
try:
await self.thin_client.delete_dynamic_service(
Expand All @@ -100,6 +104,19 @@ async def stop_dynamic_service(

raise

async def retrieve_inputs(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta  # noqa: ASYNC109
) -> RetrieveDataOutEnveloped:
    """Call the thin client's retrieve endpoint and parse the JSON reply.

    ``node_id``, ``port_keys`` and ``timeout`` are passed unchanged to
    ``self.thin_client.dynamic_service_retrieve``.

    Returns:
        The decoded JSON body validated as a ``RetrieveDataOutEnveloped``.
    """
    http_response = await self.thin_client.dynamic_service_retrieve(
        node_id=node_id, port_keys=port_keys, timeout=timeout
    )
    payload: dict[str, Any] = http_response.json()
    envelope_adapter = TypeAdapter(RetrieveDataOutEnveloped)
    return envelope_adapter.validate_python(payload)

async def list_tracked_dynamic_services(
self, *, user_id: UserID | None = None, project_id: ProjectID | None = None
) -> list[DynamicServiceGet]:
Expand All @@ -108,6 +125,9 @@ async def list_tracked_dynamic_services(
)
return TypeAdapter(list[DynamicServiceGet]).validate_python(response.json())

async def restart_user_services(self, *, node_id: NodeID) -> None:
    """Issue the restart request for ``node_id`` through ``self.thin_client``.

    The response returned by ``post_restart`` is discarded.
    """
    await self.thin_client.post_restart(node_id=node_id)

async def update_projects_networks(self, *, project_id: ProjectID) -> None:
await self.thin_client.patch_projects_networks(project_id=project_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.services_types import ServicePortKey
from models_library.users import UserID
from servicelib.common_headers import (
X_DYNAMIC_SIDECAR_REQUEST_DNS,
Expand Down Expand Up @@ -91,7 +92,7 @@ async def delete_dynamic_service(
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
timeout: datetime.timedelta,
timeout: datetime.timedelta, # noqa: ASYNC109
) -> Response:
@retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds())
@expect_status(status.HTTP_204_NO_CONTENT)
Expand All @@ -112,6 +113,22 @@ async def _(

return await _(self)

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def dynamic_service_retrieve(
    self,
    *,
    node_id: NodeID,
    port_keys: list[ServicePortKey],
    timeout: datetime.timedelta,  # noqa: ASYNC109
) -> Response:
    """POST the requested ``port_keys`` to the ``:retrieve`` endpoint of a node.

    The body is the JSON serialization of ``{"port_keys": port_keys}`` and the
    per-request timeout is ``timeout`` converted to seconds.

    NOTE(review): ``retry_on_errors`` is used here with its defaults although the
    request timeout may be long -- confirm the retry window is appropriate.
    """
    retrieve_url = f"/dynamic_services/{node_id}:retrieve"
    serialized_body = json_dumps({"port_keys": port_keys})
    return await self.client.post(
        retrieve_url,
        content=serialized_body,
        timeout=timeout.total_seconds(),
    )

@retry_on_errors()
@expect_status(status.HTTP_200_OK)
async def get_dynamic_services(
Expand All @@ -125,6 +142,11 @@ async def get_dynamic_services(
params=as_dict_exclude_unset(user_id=user_id, project_id=project_id),
)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def post_restart(self, *, node_id: NodeID) -> Response:
    """POST to the ``:restart`` endpoint of the dynamic service ``node_id``."""
    restart_url = f"/dynamic_services/{node_id}:restart"
    return await self.client.post(restart_url)

@retry_on_errors()
@expect_status(status.HTTP_204_NO_CONTENT)
async def patch_projects_networks(self, *, project_id: ProjectID) -> Response:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_directorv2.dynamic_services import (
DynamicServiceGet,
RetrieveDataOutEnveloped,
)
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.users import UserID

from ..core.settings import ApplicationSettings
Expand Down Expand Up @@ -75,6 +79,30 @@ async def stop_dynamic_service(
await set_request_as_stopped(app, dynamic_service_stop)


async def restart_user_services(app: FastAPI, *, node_id: NodeID) -> None:
    """Restart the user services of ``node_id`` via the director-v2 client.

    Raises:
        NotImplementedError: if ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER`` is
            enabled in the application settings.
    """
    app_settings: ApplicationSettings = app.state.settings
    if app_settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        # The internal scheduler has no implementation for this operation.
        raise NotImplementedError

    client = DirectorV2Client.get_from_app_state(app)
    await client.restart_user_services(node_id=node_id)


async def retrieve_inputs(
    app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey]
) -> RetrieveDataOutEnveloped:
    """Retrieve the inputs ``port_keys`` of ``node_id`` via the director-v2 client.

    The call uses ``DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT`` from the
    application settings as its timeout.

    Raises:
        NotImplementedError: if ``DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER`` is
            enabled in the application settings.
    """
    app_settings: ApplicationSettings = app.state.settings
    if app_settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
        # The internal scheduler has no implementation for this operation.
        raise NotImplementedError

    client = DirectorV2Client.get_from_app_state(app)
    retrieved = await client.retrieve_inputs(
        node_id=node_id,
        port_keys=port_keys,
        timeout=app_settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT,
    )
    return retrieved


async def update_projects_networks(app: FastAPI, *, project_id: ProjectID) -> None:
settings: ApplicationSettings = app.state.settings
if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER:
Expand Down
Loading

0 comments on commit 01d07c4

Please sign in to comment.