Skip to content

Commit

Permalink
added rpc base structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Neagu committed Oct 15, 2024
1 parent b25d613 commit 139bf18
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
from ._routing import get_main_router

__all__: tuple[str, ...] = ("get_main_router",)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from ._routing import get_main_router

__all__: tuple[str, ...] = ("get_main_router",)
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
from fastapi import Depends, FastAPI, Request
from fastapi.datastructures import State
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient

from ..core import rabbitmq
from ..core.settings import ApplicationSettings
from ..models.schemas.application_health import ApplicationHealth
from ..models.shared_store import SharedStore
from ..modules.inputs import InputsState
from ..modules.mounted_fs import MountedVolumes
from ..modules.outputs import OutputsContext, OutputsManager
from ..modules.prometheus_metrics import UserServicesMetrics
from ...core import rabbitmq
from ...core.settings import ApplicationSettings
from ...models.schemas.application_health import ApplicationHealth
from ...models.shared_store import SharedStore
from ...modules.inputs import InputsState
from ...modules.mounted_fs import MountedVolumes
from ...modules.outputs import OutputsContext, OutputsManager
from ...modules.prometheus_metrics import UserServicesMetrics


def get_application(request: Request) -> FastAPI:
Expand Down Expand Up @@ -84,3 +85,9 @@ def get_rabbitmq_client(
app: Annotated[FastAPI, Depends(get_application)]
) -> RabbitMQClient:
return rabbitmq.get_rabbitmq_client(app)


def get_rabbitmq_rpc_server(
app: Annotated[FastAPI, Depends(get_application)]
) -> RabbitMQRPCClient:
return rabbitmq.get_rabbitmq_rpc_server(app)
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

from fastapi import APIRouter, FastAPI

from .._meta import API_VTAG
from ..core.settings import ApplicationSettings
from ..._meta import API_VTAG
from ...core.settings import ApplicationSettings
from . import (
containers,
containers_extension,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
from pydantic import parse_raw_as
from servicelib.fastapi.requests_decorators import cancel_on_disconnect

from ..core.docker_utils import docker_client
from ..core.errors import (
from ...core.docker_utils import docker_client
from ...core.errors import (
ContainerExecCommandFailedError,
ContainerExecContainerNotFoundError,
ContainerExecTimeoutError,
)
from ..core.settings import ApplicationSettings
from ..core.validation import (
from ...core.settings import ApplicationSettings
from ...core.validation import (
ComposeSpecValidation,
parse_compose_spec,
validate_compose_spec,
)
from ..models.schemas.containers import ContainersComposeSpec
from ..models.shared_store import SharedStore
from ..modules.container_utils import run_command_in_container
from ..modules.mounted_fs import MountedVolumes
from ...models.schemas.containers import ContainersComposeSpec
from ...models.shared_store import SharedStore
from ...modules.container_utils import run_command_in_container
from ...modules.mounted_fs import MountedVolumes
from ._dependencies import (
get_container_restart_lock,
get_mounted_volumes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
from pydantic.main import BaseModel
from simcore_sdk.node_ports_v2.port_utils import is_file_type

from ..core.docker_utils import docker_client
from ..modules.inputs import disable_inputs_pulling, enable_inputs_pulling
from ..modules.mounted_fs import MountedVolumes
from ..modules.outputs import (
from ...core.docker_utils import docker_client
from ...modules.inputs import disable_inputs_pulling, enable_inputs_pulling
from ...modules.mounted_fs import MountedVolumes
from ...modules.outputs import (
OutputsContext,
disable_event_propagation,
enable_event_propagation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
)
from servicelib.fastapi.requests_decorators import cancel_on_disconnect

from ..core.settings import ApplicationSettings
from ..models.schemas.application_health import ApplicationHealth
from ..models.schemas.containers import ContainersCreate
from ..models.shared_store import SharedStore
from ..modules.inputs import InputsState
from ..modules.long_running_tasks import (
from ...core.settings import ApplicationSettings
from ...models.schemas.application_health import ApplicationHealth
from ...models.schemas.containers import ContainersCreate
from ...models.shared_store import SharedStore
from ...modules.inputs import InputsState
from ...modules.long_running_tasks import (
task_containers_restart,
task_create_service_containers,
task_ports_inputs_pull,
Expand All @@ -27,8 +27,8 @@
task_runs_docker_compose_down,
task_save_state,
)
from ..modules.mounted_fs import MountedVolumes
from ..modules.outputs import OutputsManager
from ...modules.mounted_fs import MountedVolumes
from ...modules.outputs import OutputsManager
from ._dependencies import (
get_application,
get_application_health,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fastapi import APIRouter, status

from ..core.reserved_space import remove_reserved_disk_space
from ...core.reserved_space import remove_reserved_disk_space

router = APIRouter()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
from fastapi import APIRouter, Depends, HTTPException, status
from models_library.errors import RABBITMQ_CLIENT_UNHEALTHY_MSG
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient

from ..models.schemas.application_health import ApplicationHealth
from ._dependencies import get_application_health, get_rabbitmq_client
from ...models.schemas.application_health import ApplicationHealth
from ._dependencies import (
get_application_health,
get_rabbitmq_client,
get_rabbitmq_rpc_server,
)

router = APIRouter()

Expand All @@ -20,13 +25,14 @@
async def health_endpoint(
application_health: Annotated[ApplicationHealth, Depends(get_application_health)],
rabbitmq_client: Annotated[RabbitMQClient, Depends(get_rabbitmq_client)],
rabbitmq_rpc_server: Annotated[RabbitMQRPCClient, Depends(get_rabbitmq_rpc_server)],
) -> ApplicationHealth:
if not application_health.is_healthy:
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE, detail=application_health.dict()
)

if not rabbitmq_client.healthy:
if not rabbitmq_client.healthy or not rabbitmq_rpc_server.healthy:
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE, detail=RABBITMQ_CLIENT_UNHEALTHY_MSG
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fastapi import APIRouter, Depends, status
from fastapi.responses import PlainTextResponse

from ..modules.prometheus_metrics import UserServicesMetrics
from ...modules.prometheus_metrics import UserServicesMetrics
from ._dependencies import get_user_services_metrics

router = APIRouter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus
from pydantic import BaseModel

from ..models.shared_store import SharedStore
from ...models.shared_store import SharedStore
from ._dependencies import get_shared_store

router = APIRouter()
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from simcore_sdk.node_ports_common.exceptions import NodeNotFound

from .._meta import API_VERSION, API_VTAG, PROJECT_NAME, SUMMARY, __version__
from ..api import get_main_router
from ..api.rest import get_main_router
from ..models.schemas.application_health import ApplicationHealth
from ..models.shared_store import SharedStore, setup_shared_store
from ..modules.attribute_monitor import setup_attribute_monitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context
from servicelib.rabbitmq import RabbitMQClient, is_rabbitmq_responsive
from servicelib.rabbitmq._client_rpc import RabbitMQRPCClient
from settings_library.rabbit import RabbitSettings

from ..core.settings import ApplicationSettings
Expand Down Expand Up @@ -99,32 +100,49 @@ async def wait_for_rabbitmq_liveness(app: FastAPI) -> None:
)


@lru_cache(maxsize=2)
def _is_rabbitmq_initialized(app: FastAPI, state_client_name: str) -> bool:
return hasattr(app.state, state_client_name)


def _raise_if_not_initialized(app: FastAPI, state_client_name: str) -> None:
if not _is_rabbitmq_initialized(app, state_client_name):
msg = "RabbitMQ client is not available. Please check the configuration."
raise RuntimeError(msg)


def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
_raise_if_not_initialized(app, "rabbitmq_client")
return cast(RabbitMQClient, app.state.rabbitmq_client)


def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
_raise_if_not_initialized(app, "rabbitmq_rpc_server")
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)


def setup_rabbitmq(app: FastAPI) -> None:
async def on_startup() -> None:
app_settings: ApplicationSettings = app.state.settings
assert app_settings.RABBIT_SETTINGS # nosec
settings = app_settings.RABBIT_SETTINGS

with log_context(_logger, logging.INFO, msg="Create RabbitMQClient"):
app.state.rabbitmq_client = RabbitMQClient(
client_name=f"dynamic-sidecar_{app_settings.DY_SIDECAR_NODE_ID}",
settings=settings,
)
with log_context(_logger, logging.INFO, msg="Create RabbitMQRPCClient"):
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
client_name=f"dynamic-sidecar_rpc_{app_settings.DY_SIDECAR_NODE_ID}",
settings=settings,
)

async def on_shutdown() -> None:
if app.state.rabbitmq_client:
await app.state.rabbitmq_client.close()
if app.state.rabbitmq_rpc_server:
await app.state.rabbitmq_rpc_server.close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


@lru_cache(maxsize=1)
def _is_rabbitmq_initialized(app: FastAPI) -> bool:
return hasattr(app.state, "rabbitmq_client")


def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
if not _is_rabbitmq_initialized(app):
msg = "RabbitMQ client is not available. Please check the configuration."
raise RuntimeError(msg)
return cast(RabbitMQClient, app.state.rabbitmq_client)
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class DiskUsageMonitor:
_monitor_task: asyncio.Task | None = None
_last_usage: dict[Path, DiskUsage] = field(default_factory=dict)

_incoming_overwrite_usage: dict[str, DiskUsage] = field(default_factory=dict)

async def _publish_disk_usage(self, usage: dict[Path, DiskUsage]):
await publish_disk_usage(
self.app, user_id=self.user_id, node_id=self.node_id, usage=usage
Expand All @@ -47,6 +49,8 @@ async def _monitor(self) -> None:
*[get_usage(monitored_path) for monitored_path in self.monitored_paths]
)

# TODO: take into consideration the usage incoming from the API to overwrite
# TODO: also needs to be composed differently only the common parts need to be merged with a common name
usage: dict[Path, DiskUsage] = dict(
zip(self.monitored_paths, disk_usages, strict=True)
)
Expand All @@ -65,6 +69,15 @@ async def shutdown(self) -> None:
if self._monitor_task:
await stop_periodic_task(self._monitor_task)

async def set_disk_usage_for_path(
self, overwrite_usage: dict[str, DiskUsage]
) -> None:
"""
EFS service manages disk quotas since the underlying FS has no support for them.
Currently this service is
"""
self._incoming_overwrite_usage = overwrite_usage


def _get_monitored_paths(app: FastAPI) -> list[Path]:
mounted_volumes: MountedVolumes = app.state.mounted_volumes
Expand All @@ -74,6 +87,11 @@ def _get_monitored_paths(app: FastAPI) -> list[Path]:
]


def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor:
disk_usage_monitor: DiskUsageMonitor = app.state.disk_usage_monitor
return disk_usage_monitor


def setup_disk_usage(app: FastAPI) -> None:
async def on_startup() -> None:
with log_context(_logger, logging.INFO, "setup disk monitor"):
Expand Down
10 changes: 10 additions & 0 deletions services/dynamic-sidecar/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,16 @@ def mock_core_rabbitmq(mocker: MockerFixture) -> dict[str, AsyncMock]:
return_value=None,
autospec=True,
),
"rpc._rpc_initialize": mocker.patch(
"simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQRPCClient._rpc_initialize",
return_value=None,
autospec=True,
),
"rpc.close": mocker.patch(
"simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQRPCClient.close",
return_value=None,
autospec=True,
),
}


Expand Down
2 changes: 1 addition & 1 deletion services/dynamic-sidecar/tests/unit/test_api_containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME
from servicelib.fastapi.long_running_tasks.client import TaskId
from simcore_service_dynamic_sidecar._meta import API_VTAG
from simcore_service_dynamic_sidecar.api.containers import _INACTIVE_FOR_LONG_TIME
from simcore_service_dynamic_sidecar.api.rest.containers import _INACTIVE_FOR_LONG_TIME
from simcore_service_dynamic_sidecar.core.application import AppState
from simcore_service_dynamic_sidecar.core.docker_compose_utils import (
docker_compose_create,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from servicelib.fastapi.long_running_tasks.client import setup as client_setup
from simcore_sdk.node_ports_common.exceptions import NodeNotFound
from simcore_service_dynamic_sidecar._meta import API_VTAG
from simcore_service_dynamic_sidecar.api import containers_long_running_tasks
from simcore_service_dynamic_sidecar.api.rest import containers_long_running_tasks
from simcore_service_dynamic_sidecar.core.validation import InvalidComposeSpecError
from simcore_service_dynamic_sidecar.models.schemas.containers import (
ContainersComposeSpec,
Expand Down

0 comments on commit 139bf18

Please sign in to comment.