diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py new file mode 100644 index 00000000000..5863b53b2bc --- /dev/null +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/ports.py @@ -0,0 +1,35 @@ +from enum import auto + +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey +from models_library.utils.enums import StrAutoEnum +from pydantic import BaseModel + + +class OutputStatus(StrAutoEnum): + UPLOAD_STARTED = auto() + UPLOAD_WAS_ABORTED = auto() + UPLOAD_FINISHED_SUCCESSFULLY = auto() + UPLOAD_FINISHED_WITH_ERRROR = auto() + + +class InputStatus(StrAutoEnum): + DOWNLOAD_STARTED = auto() + DOWNLOAD_WAS_ABORTED = auto() + DOWNLOAD_FINISHED_SUCCESSFULLY = auto() + DOWNLOAD_FINISHED_WITH_ERRROR = auto() + + +class _PortStatusCommon(BaseModel): + project_id: ProjectID + node_id: NodeID + port_key: ServicePortKey + + +class OutputPortStatus(_PortStatusCommon): + status: OutputStatus + + +class InputPortSatus(_PortStatusCommon): + status: InputStatus diff --git a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py index 054b0834bc4..93e34a1682b 100644 --- a/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py +++ b/packages/models-library/src/models_library/api_schemas_dynamic_sidecar/socketio.py @@ -1,3 +1,5 @@ from typing import Final SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage" +SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts" +SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts" diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 8c78e28a066..9da016b4cea 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -1,4 +1,6 @@ import logging +from abc import ABC, abstractmethod +from asyncio import CancelledError from collections.abc import Callable, Coroutine from pathlib import Path from typing import Any @@ -27,6 +29,20 @@ log = logging.getLogger(__name__) +class OutputsCallbacks(ABC): + @abstractmethod + async def aborted(self, key: ServicePortKey) -> None: + pass + + @abstractmethod + async def finished_succesfully(self, key: ServicePortKey) -> None: + pass + + @abstractmethod + async def finished_with_error(self, key: ServicePortKey) -> None: + pass + + class Nodeports(BaseModel): """ Represents a node in a project and all its input/output ports @@ -148,6 +164,7 @@ async def set_multiple( ], *, progress_bar: ProgressBarData, + outputs_callbacks: OutputsCallbacks | None, ) -> None: """ Sets the provided values to the respective input or output ports @@ -156,26 +173,44 @@ async def set_multiple( raises ValidationError """ + + async def _set_with_notifications( + port_key: ServicePortKey, + value: ItemConcreteValue | None, + set_kwargs: SetKWargs | None, + sub_progress: ProgressBarData, + ) -> None: + try: + # pylint: disable=protected-access + await self.internal_outputs[port_key]._set( # noqa: SLF001 + value, set_kwargs=set_kwargs, progress_bar=sub_progress + ) + if outputs_callbacks: + await outputs_callbacks.finished_succesfully(port_key) + 
except UnboundPortError: + # not available try inputs + # if this fails it will raise another exception + # pylint: disable=protected-access + await self.internal_inputs[port_key]._set( # noqa: SLF001 + value, set_kwargs=set_kwargs, progress_bar=sub_progress + ) + except CancelledError: + if outputs_callbacks: + await outputs_callbacks.aborted(port_key) + raise + except Exception: + if outputs_callbacks: + await outputs_callbacks.finished_with_error(port_key) + raise + tasks = [] async with progress_bar.sub_progress( steps=len(port_values.items()), description=IDStr("set multiple") ) as sub_progress: for port_key, (value, set_kwargs) in port_values.items(): - # pylint: disable=protected-access - try: - tasks.append( - self.internal_outputs[port_key]._set( - value, set_kwargs=set_kwargs, progress_bar=sub_progress - ) - ) - except UnboundPortError: - # not available try inputs - # if this fails it will raise another exception - tasks.append( - self.internal_inputs[port_key]._set( - value, set_kwargs=set_kwargs, progress_bar=sub_progress - ) - ) + tasks.append( + _set_with_notifications(port_key, value, set_kwargs, sub_progress) + ) results = await logged_gather(*tasks) await self.save_to_db_cb(self) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index a9016609d13..2da7011e9b0 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -13,6 +13,7 @@ from collections.abc import Awaitable, Callable, Iterable from pathlib import Path from typing import Any +from unittest.mock import AsyncMock from uuid import uuid4 import np_helpers @@ -28,13 +29,14 @@ SimcoreS3FileID, ) from models_library.services_types import ServicePortKey +from pytest_mock import MockerFixture from servicelib.progress_bar import ProgressBarData from settings_library.r_clone import RCloneSettings from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.exceptions import UnboundPortError from simcore_sdk.node_ports_v2 import exceptions from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink -from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports +from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks from simcore_sdk.node_ports_v2.port import Port from utils_port_v2 import CONSTANT_UUID @@ -749,6 +751,34 @@ async def _upload_create_task(item_key: str) -> None: ) +class _Callbacks(OutputsCallbacks): + async def aborted(self, key: ServicePortKey) -> None: + pass + + async def finished_succesfully(self, key: ServicePortKey) -> None: + pass + + async def finished_with_error(self, key: ServicePortKey) -> None: + pass + + +@pytest.fixture +async def output_callbacks() -> _Callbacks: + return _Callbacks() + + +@pytest.fixture +async def spy_outputs_callbaks( + mocker: MockerFixture, output_callbacks: _Callbacks +) -> dict[str, AsyncMock]: + return { + "aborted": mocker.spy(output_callbacks, "aborted"), + "finished_succesfully": mocker.spy(output_callbacks, "finished_succesfully"), + "finished_with_error": mocker.spy(output_callbacks, "finished_with_error"), + } + + +@pytest.mark.parametrize("use_output_callbacks", [True, False]) async def test_batch_update_inputs_outputs( user_id: int, project_id: str, @@ -757,7 +787,12 @@ async def test_batch_update_inputs_outputs( port_count: int, option_r_clone_settings: RCloneSettings | None, faker: Faker, + 
output_callbacks: _Callbacks, + spy_outputs_callbaks: dict[str, AsyncMock], + use_output_callbacks: bool, ) -> None: + callbacks = output_callbacks if use_output_callbacks else None + outputs = [(f"value_out_{i}", "integer", None) for i in range(port_count)] inputs = [(f"value_in_{i}", "integer", None) for i in range(port_count)] config_dict, _, _ = create_special_configuration(inputs=inputs, outputs=outputs) @@ -771,12 +806,14 @@ async def test_batch_update_inputs_outputs( await check_config_valid(PORTS, config_dict) async with ProgressBarData(num_steps=2, description=faker.pystr()) as progress_bar: + port_values = (await PORTS.outputs).values() await PORTS.set_multiple( - { - ServicePortKey(port.key): (k, None) - for k, port in enumerate((await PORTS.outputs).values()) - }, + {ServicePortKey(port.key): (k, None) for k, port in enumerate(port_values)}, progress_bar=progress_bar, + outputs_callbacks=callbacks, + ) + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 ) # pylint: disable=protected-access assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 @@ -786,6 +823,11 @@ async def test_batch_update_inputs_outputs( for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, + outputs_callbacks=callbacks, + ) + # inputs do not trigger callbacks + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 ) assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001 @@ -807,4 +849,11 @@ async def test_batch_update_inputs_outputs( await PORTS.set_multiple( {ServicePortKey("missing_key_in_both"): (123132, None)}, progress_bar=progress_bar, + outputs_callbacks=callbacks, ) + + assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == ( + len(port_values) if use_output_callbacks else 0 + ) + assert len(spy_outputs_callbaks["aborted"].call_args_list) == 0 + assert len(spy_outputs_callbaks["finished_with_error"].call_args_list) == 0 diff --git a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py index 91609476b9c..f8d09836213 100644 --- a/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py +++ b/packages/simcore-sdk/tests/unit/test_node_ports_v2_nodeports_v2.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Any, Callable +from unittest.mock import AsyncMock import pytest from faker import Faker @@ -138,6 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs): + list(original_outputs.values()) }, progress_bar=progress_bar, + outputs_callbacks=AsyncMock(), ) assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 diff --git a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py index 646cb788ad7..d590985680d 100644 --- a/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py +++ b/services/director-v2/tests/integration/02/test_mixed_dynamic_sidecar_and_legacy_project.py @@ -229,6 +229,7 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]: ) +@pytest.mark.flaky(max_runs=3) async def test_legacy_and_dynamic_sidecar_run( initialized_app: FastAPI, wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]], diff --git 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py index ae04a620c8a..52b0e2e7ad6 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py @@ -209,6 +209,7 @@ async def ports_inputs_pull_task( request: Request, tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)], app: Annotated[FastAPI, Depends(get_application)], + settings: Annotated[ApplicationSettings, Depends(get_settings)], mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)], inputs_state: Annotated[InputsState, Depends(get_inputs_state)], port_keys: list[str] | None = None, @@ -223,6 +224,7 @@ async def ports_inputs_pull_task( port_keys=port_keys, mounted_volumes=mounted_volumes, app=app, + settings=settings, inputs_pulling_enabled=inputs_state.inputs_pulling_enabled, ) except TaskAlreadyRunningError as e: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py index f5910ffbffe..20029cac7fc 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/application.py @@ -19,6 +19,7 @@ from ..modules.attribute_monitor import setup_attribute_monitor from ..modules.inputs import setup_inputs from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs +from ..modules.notifications import setup_notifications from ..modules.outputs import setup_outputs from ..modules.prometheus_metrics import setup_prometheus_metrics from ..modules.resource_tracking import setup_resource_tracking @@ -172,6 +173,7 @@ def create_app(): setup_rabbitmq(app) setup_background_log_fetcher(app) setup_resource_tracking(app) + setup_notifications(app) setup_system_monitor(app) setup_mounted_fs(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index a8277415b06..0134d481f78 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -52,6 +52,7 @@ from ..models.shared_store import SharedStore from ..modules import nodeports, user_services_preferences from ..modules.mounted_fs import MountedVolumes +from ..modules.notifications._notifications_ports import PortNotifier from ..modules.outputs import OutputsManager, event_propagation_disabled from .long_running_tasksutils import run_before_shutdown_actions from .resource_tracking import send_service_started, send_service_stopped @@ -472,6 +473,7 @@ async def task_ports_inputs_pull( port_keys: list[str] | None, mounted_volumes: MountedVolumes, app: FastAPI, + settings: ApplicationSettings, *, inputs_pulling_enabled: bool, ) -> int: @@ -505,6 +507,12 @@ async def task_ports_inputs_pull( post_sidecar_log_message, app, log_level=logging.INFO ), progress_bar=root_progress, + port_notifier=PortNotifier( + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, + ), ) await post_sidecar_log_message( app, "Finished pulling inputs", 
log_level=logging.INFO @@ -541,6 +549,7 @@ async def task_ports_outputs_pull( post_sidecar_log_message, app, log_level=logging.INFO ), progress_bar=root_progress, + port_notifier=None, ) await post_sidecar_log_message( app, "Finished pulling outputs", log_level=logging.INFO diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 2213dd1d4ac..0ad00f2c18d 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -4,6 +4,7 @@ import shutil import sys import time +from asyncio import CancelledError from collections import deque from collections.abc import Coroutine from contextlib import AsyncExitStack @@ -24,16 +25,17 @@ from servicelib.file_utils import remove_directory from servicelib.logging_utils import log_context from servicelib.progress_bar import ProgressBarData -from servicelib.utils import logged_gather +from servicelib.utils import limited_gather from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB from simcore_sdk.node_ports_v2 import Port from simcore_sdk.node_ports_v2.links import ItemConcreteValue -from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports +from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks from simcore_sdk.node_ports_v2.port import SetKWargs from simcore_sdk.node_ports_v2.port_utils import is_file_type from ..core.settings import ApplicationSettings, get_settings +from ..modules.notifications import PortNotifier class PortTypeName(str, Enum): @@ -70,13 +72,27 @@ def _get_size_of_value(value: tuple[ItemConcreteValue | None, SetKWargs | None]) ) -# NOTE: outputs_manager guarantees that no parallel calls -# to this function occur -async def upload_outputs( +class OutputCallbacksWrapper(OutputsCallbacks): + def __init__(self, port_notifier: PortNotifier) -> None: + self.port_notifier = port_notifier + + async def aborted(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_was_aborted(key) + + async def finished_succesfully(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_finished_successfully(key) + + async def finished_with_error(self, key: ServicePortKey) -> None: + await self.port_notifier.send_output_port_upload_finished_with_error(key) + + +# NOTE: outputs_manager guarantees that no parallel calls to this function occur +async def upload_outputs( # pylint:disable=too-many-statements # noqa: PLR0915, C901 outputs_path: Path, port_keys: list[str], io_log_redirect_cb: LogRedirectCB | None, progress_bar: ProgressBarData, + port_notifier: PortNotifier, ) -> None: # pylint: disable=too-many-branches logger.debug("uploading data to simcore...") @@ -97,12 +113,17 @@ async def upload_outputs( ServicePortKey, tuple[ItemConcreteValue | None, SetKWargs | None] ] = {} archiving_tasks: deque[Coroutine[None, None, None]] = deque() - ports_to_set = [ + ports_to_set: list[Port] = [ port_value for port_value in (await PORTS.outputs).values() if (not port_keys) or (port_value.key in port_keys) ] + await limited_gather( + *(port_notifier.send_output_port_upload_sarted(p.key) for p in ports_to_set), + limit=4, + ) + async with AsyncExitStack() as stack: sub_progress = await stack.enter_async_context( progress_bar.sub_progress( @@ -147,13 +168,34 @@ async def 
upload_outputs(
                     # when having multiple directories it is important to
                     # run the compression in parallel to guarantee better performance
+                    async def _archive_dir_notified(
+                        dir_to_compress: Path, destination: Path, port_key: ServicePortKey
+                    ) -> None:
+                        # Errors and cancellations can also be raised while archiving
+                        try:
+                            await archive_dir(
+                                dir_to_compress=dir_to_compress,
+                                destination=destination,
+                                compress=False,
+                                store_relative_path=True,
+                                progress_bar=sub_progress,
+                            )
+                        except CancelledError:
+                            await port_notifier.send_output_port_upload_was_aborted(
+                                port_key
+                            )
+                            raise
+                        except Exception:
+                            await port_notifier.send_output_port_upload_finished_with_error(
+                                port_key
+                            )
+                            raise
+
                     archiving_tasks.append(
-                        archive_dir(
+                        _archive_dir_notified(
                             dir_to_compress=src_folder,
                             destination=tmp_file,
-                            compress=False,
-                            store_relative_path=True,
-                            progress_bar=sub_progress,
+                            port_key=port.key,
                         )
                     )
                     ports_values[port.key] = (
@@ -176,9 +218,13 @@ async def upload_outputs(
             logger.debug("No file %s to fetch port values from", data_file)
 
         if archiving_tasks:
-            await logged_gather(*archiving_tasks)
+            await limited_gather(*archiving_tasks, limit=4)
 
-        await PORTS.set_multiple(ports_values, progress_bar=sub_progress)
+        await PORTS.set_multiple(
+            ports_values,
+            progress_bar=sub_progress,
+            outputs_callbacks=OutputCallbacksWrapper(port_notifier),
+        )
 
     elapsed_time = time.perf_counter() - start_time
     total_bytes = sum(_get_size_of_value(x) for x in ports_values.values())
@@ -264,6 +310,7 @@ async def download_target_ports(
     port_keys: list[str],
     io_log_redirect_cb: LogRedirectCB,
     progress_bar: ProgressBarData,
+    port_notifier: PortNotifier | None,
 ) -> ByteSize:
     logger.debug("retrieving data from simcore...")
     start_time = time.perf_counter()
@@ -279,22 +326,46 @@ async def download_target_ports(
     )
 
     # let's gather all the data
-    ports_to_get = [
+    ports_to_get: list[Port] = [
         port_value
         for port_value in (await getattr(PORTS, port_type_name.value)).values()
         if (not port_keys) or (port_value.key in port_keys)
     ]
+
+    async def _get_data_from_port_notified(
+        port: Port, progress_bar: ProgressBarData
+    ) -> tuple[Port, ItemConcreteValue | None, ByteSize]:
+        assert port_notifier is not None
+        await port_notifier.send_input_port_download_started(port.key)
+        try:
+            result = await _get_data_from_port(
+                port, target_dir=target_dir, progress_bar=progress_bar
+            )
+            await port_notifier.send_input_port_download_finished_succesfully(port.key)
+            return result
+
+        except CancelledError:
+            await port_notifier.send_input_port_download_was_aborted(port.key)
+            raise
+        except Exception:
+            await port_notifier.send_input_port_download_finished_with_error(port.key)
+            raise
+
     async with progress_bar.sub_progress(
         steps=len(ports_to_get), description=IDStr("downloading")
     ) as sub_progress:
-        results = await logged_gather(
+        results = await limited_gather(
             *[
-                _get_data_from_port(
-                    port, target_dir=target_dir, progress_bar=sub_progress
+                (
+                    _get_data_from_port(
+                        port, target_dir=target_dir, progress_bar=sub_progress
+                    )
+                    if port_type_name == PortTypeName.OUTPUTS
+                    else _get_data_from_port_notified(port, progress_bar=sub_progress)
                 )
                 for port in ports_to_get
            ],
-            max_concurrency=2,
+            limit=2,
        )
    # parse results
    data = {
diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py
new file mode 100644
index 00000000000..18254b1d23c
--- /dev/null
+++ 
b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/__init__.py @@ -0,0 +1,9 @@ +from ._notifications_ports import PortNotifier +from ._notifications_system_monitor import publish_disk_usage +from ._setup import setup_notifications + +__all__: tuple[str, ...] = ( + "PortNotifier", + "publish_disk_usage", + "setup_notifications", +) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py new file mode 100644 index 00000000000..ae48f19a973 --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_ports.py @@ -0,0 +1,78 @@ +from dataclasses import dataclass + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.ports import InputStatus, OutputStatus +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey +from models_library.users import UserID + +from ._notifier import Notifier + + +@dataclass +class PortNotifier: + app: FastAPI + user_id: UserID + project_id: ProjectID + node_id: NodeID + + async def _send_output_port_status( + self, port_key: ServicePortKey, status: OutputStatus + ) -> None: + notifier: Notifier = Notifier.get_from_app_state(self.app) + await notifier.notify_output_port_status( + self.user_id, self.project_id, self.node_id, port_key, status + ) + + async def _send_input_port_status( + self, port_key: ServicePortKey, status: InputStatus + ) -> None: + notifier: Notifier = Notifier.get_from_app_state(self.app) + await notifier.notify_input_port_status( + self.user_id, self.project_id, self.node_id, port_key, status + ) + + async def send_output_port_upload_sarted(self, port_key: ServicePortKey) -> None: + await self._send_output_port_status(port_key, OutputStatus.UPLOAD_STARTED) + + async def send_output_port_upload_was_aborted( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status(port_key, OutputStatus.UPLOAD_WAS_ABORTED) + + async def send_output_port_upload_finished_successfully( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status( + port_key, OutputStatus.UPLOAD_FINISHED_SUCCESSFULLY + ) + + async def send_output_port_upload_finished_with_error( + self, port_key: ServicePortKey + ) -> None: + await self._send_output_port_status( + port_key, OutputStatus.UPLOAD_FINISHED_WITH_ERRROR + ) + + async def send_input_port_download_started(self, port_key: ServicePortKey) -> None: + await self._send_input_port_status(port_key, InputStatus.DOWNLOAD_STARTED) + + async def send_input_port_download_was_aborted( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status(port_key, InputStatus.DOWNLOAD_WAS_ABORTED) + + async def send_input_port_download_finished_succesfully( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status( + port_key, InputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY + ) + + async def send_input_port_download_finished_with_error( + self, port_key: ServicePortKey + ) -> None: + await self._send_input_port_status( + port_key, InputStatus.DOWNLOAD_FINISHED_WITH_ERRROR + ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py 
b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py new file mode 100644 index 00000000000..840c47d729e --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifications_system_monitor.py @@ -0,0 +1,17 @@ +from pathlib import Path + +from fastapi import FastAPI +from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage +from models_library.projects_nodes_io import NodeID +from models_library.users import UserID + +from ._notifier import Notifier + + +async def publish_disk_usage( + app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] +) -> None: + notifier: Notifier = Notifier.get_from_app_state(app) + await notifier.notify_service_disk_usage( + user_id=user_id, node_id=node_id, usage=usage + ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py similarity index 52% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py index 9f97a889bac..0d61e1b388b 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_notifier.py @@ -4,15 +4,25 @@ import socketio # type: ignore[import-untyped] from fastapi import FastAPI from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.ports import ( + InputPortSatus, + InputStatus, + OutputPortStatus, + OutputStatus, +) from models_library.api_schemas_dynamic_sidecar.socketio import ( SOCKET_IO_SERVICE_DISK_USAGE_EVENT, + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, ) from models_library.api_schemas_dynamic_sidecar.telemetry import ( DiskUsage, ServiceDiskUsage, ) from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from models_library.users import UserID from servicelib.fastapi.app_state import SingletonInAppStateMixin @@ -32,14 +42,47 @@ async def notify_service_disk_usage( room=SocketIORoomStr.from_user_id(user_id), ) + async def notify_output_port_status( + self, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + port_key: ServicePortKey, + output_status: OutputStatus, + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, + data=jsonable_encoder( + OutputPortStatus( + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=output_status, + ) + ), + room=SocketIORoomStr.from_user_id(user_id), + ) -async def publish_disk_usage( - app: FastAPI, *, user_id: UserID, node_id: NodeID, usage: dict[Path, DiskUsage] -) -> None: - notifier: Notifier = Notifier.get_from_app_state(app) - await notifier.notify_service_disk_usage( - user_id=user_id, node_id=node_id, usage=usage - ) + async def notify_input_port_status( + self, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + port_key: ServicePortKey, + input_status: InputStatus, + ) -> None: + await self._sio_manager.emit( + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + data=jsonable_encoder( + 
InputPortSatus( + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=input_status, + ) + ), + room=SocketIORoomStr.from_user_id(user_id), + ) def setup_notifier(app: FastAPI): diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py new file mode 100644 index 00000000000..6de0fae307f --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_setup.py @@ -0,0 +1,15 @@ +import logging + +from fastapi import FastAPI +from servicelib.logging_utils import log_context + +from ..notifications._notifier import setup_notifier +from ..notifications._socketio import setup_socketio + +_logger = logging.getLogger(__name__) + + +def setup_notifications(app: FastAPI) -> None: + with log_context(_logger, logging.INFO, "setup notifications"): + setup_socketio(app) + setup_notifier(app) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_socketio.py similarity index 100% rename from services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py rename to services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/notifications/_socketio.py diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index 307f8b3d933..d4a8ac8d07a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -18,6 +18,7 @@ from ...core.rabbitmq import post_log_message, post_progress_message from ...core.settings import ApplicationSettings +from ...modules.notifications._notifications_ports import PortNotifier from ..nodeports import upload_outputs from ._context import OutputsContext @@ -100,6 +101,7 @@ class OutputsManager: # pylint: disable=too-many-instance-attributes def __init__( self, outputs_context: OutputsContext, + port_notifier: PortNotifier, io_log_redirect_cb: LogRedirectCB | None, progress_cb: progress_bar.AsyncReportCB | None, *, @@ -108,6 +110,7 @@ def __init__( task_monitor_interval_s: PositiveFloat = 1.0, ): self.outputs_context = outputs_context + self.port_notifier = port_notifier self.io_log_redirect_cb = io_log_redirect_cb self.upload_upon_api_request = upload_upon_api_request self.task_cancellation_timeout_s = task_cancellation_timeout_s @@ -138,6 +141,7 @@ async def _upload_ports() -> None: port_keys=port_keys, io_log_redirect_cb=self.io_log_redirect_cb, progress_bar=root_progress, + port_notifier=self.port_notifier, ) task_name = f"outputs_manager_port_keys-{'_'.join(port_keys)}" @@ -271,6 +275,12 @@ async def on_startup() -> None: progress_cb=partial( post_progress_message, app, ProgressType.SERVICE_OUTPUTS_PUSHING ), + port_notifier=PortNotifier( + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, + ), ) await outputs_manager.start() diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py index 1ecc04fdaea..90b06450e6f 100644 --- 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_disk_usage.py @@ -15,7 +15,7 @@ from ...core.settings import ApplicationSettings from ..mounted_fs import MountedVolumes -from ._notifier import publish_disk_usage +from ..notifications import publish_disk_usage _logger = logging.getLogger(__name__) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py index e460f7a9ee3..aa0d36a72b9 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_setup.py @@ -5,8 +5,6 @@ from ...core.settings import SystemMonitorSettings from ._disk_usage import setup_disk_usage -from ._notifier import setup_notifier -from ._socketio import setup_socketio _logger = logging.getLogger(__name__) @@ -19,6 +17,4 @@ def setup_system_monitor(app: FastAPI) -> None: _logger.warning("system monitor disabled") return - setup_socketio(app) # required by notifier - setup_notifier(app) setup_disk_usage(app) diff --git a/services/dynamic-sidecar/tests/unit/conftest.py b/services/dynamic-sidecar/tests/unit/conftest.py index b6e590f71eb..ee2c106bb69 100644 --- a/services/dynamic-sidecar/tests/unit/conftest.py +++ b/services/dynamic-sidecar/tests/unit/conftest.py @@ -17,6 +17,10 @@ docker_compose_down, ) from simcore_service_dynamic_sidecar.core.docker_utils import docker_client +from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from tenacity import retry from tenacity.after import after_log from tenacity.stop import stop_after_delay @@ -142,3 +146,14 @@ def mock_rabbitmq_envs( }, ) return mock_environment + + +@pytest.fixture +def port_notifier(app: FastAPI) -> PortNotifier: + settings: ApplicationSettings = app.state.settings + return PortNotifier( + app, + settings.DY_SIDECAR_USER_ID, + settings.DY_SIDECAR_PROJECT_ID, + settings.DY_SIDECAR_NODE_ID, + ) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py new file mode 100644 index 00000000000..654d2bb1619 --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_modules_notifier.py @@ -0,0 +1,400 @@ +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name + +from collections.abc import AsyncIterable, Callable +from contextlib import AsyncExitStack, _AsyncGeneratorContextManager +from pathlib import Path +from typing import Final +from unittest.mock import AsyncMock + +import pytest +import socketio +from asgi_lifespan import LifespanManager +from fastapi import FastAPI +from fastapi.encoders import jsonable_encoder +from models_library.api_schemas_dynamic_sidecar.ports import ( + InputPortSatus, + InputStatus, + OutputPortStatus, + OutputStatus, +) +from models_library.api_schemas_dynamic_sidecar.socketio import ( + SOCKET_IO_SERVICE_DISK_USAGE_EVENT, + SOCKET_IO_STATE_INPUT_PORTS_EVENT, + SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, +) +from models_library.api_schemas_dynamic_sidecar.telemetry import ( + DiskUsage, + ServiceDiskUsage, +) +from models_library.api_schemas_webserver.socketio import SocketIORoomStr +from 
models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey +from models_library.users import UserID +from pydantic import ByteSize, NonNegativeInt, parse_obj_as +from pytest_mock import MockerFixture +from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict +from servicelib.utils import logged_gather +from settings_library.rabbit import RabbitSettings +from simcore_service_dynamic_sidecar.core.application import create_app +from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings +from simcore_service_dynamic_sidecar.modules.notifications import ( + PortNotifier, + publish_disk_usage, +) +from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( + DiskUsageMonitor, +) +from socketio import AsyncServer +from tenacity import AsyncRetrying +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "rabbit", +] + +_NUMBER_OF_CLIENTS: Final[NonNegativeInt] = 10 + + +@pytest.fixture +def mock_environment( + monkeypatch: pytest.MonkeyPatch, + rabbit_service: RabbitSettings, + mock_environment: EnvVarsDict, +) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, + { + "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": "true", + "RABBIT_HOST": rabbit_service.RABBIT_HOST, + "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(), + "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}", + "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}", + "RABBIT_USER": rabbit_service.RABBIT_USER, + }, + ) + + +@pytest.fixture +async def app( + mock_environment: EnvVarsDict, + mock_registry_service: AsyncMock, + mock_storage_check: None, + mock_postgres_check: None, + mocker: MockerFixture, +) -> AsyncIterable[FastAPI]: + mocker.patch( + "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage._get_monitored_paths", + return_value=[], + ) + + app: FastAPI = create_app() + async with LifespanManager(app): + yield app + + +@pytest.fixture +async def disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor: + return app.state.disk_usage_monitor + + +@pytest.fixture +async def socketio_server( + app: FastAPI, + socketio_server_factory: Callable[ + [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] + ], +) -> AsyncIterable[AsyncServer]: + # Same configuration as simcore_service_webserver/socketio/server.py + settings: ApplicationSettings = app.state.settings + assert settings.RABBIT_SETTINGS + + async with socketio_server_factory(settings.RABBIT_SETTINGS) as server: + yield server + + +@pytest.fixture +def room_name(user_id: UserID) -> SocketIORoomStr: + return SocketIORoomStr.from_user_id(user_id) + + +async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), stop=stop_after_delay(5), reraise=True + ): + with attempt: + assert mock.call_count == call_count + + +def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: + return DiskUsage( + total=ByteSize(0), + used=ByteSize(0), + free=ByteSize.validate(byte_size_str), + used_percent=0, + ) + + +def _get_on_service_disk_usage_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_SERVICE_DISK_USAGE_EVENT, 
on_event_spy) + + return on_event_spy + + +@pytest.mark.parametrize( + "usage", + [ + pytest.param({}, id="empty"), + pytest.param({Path("/"): _get_mocked_disk_usage("1kb")}, id="one_entry"), + pytest.param( + { + Path("/"): _get_mocked_disk_usage("1kb"), + Path("/tmp"): _get_mocked_disk_usage("2kb"), # noqa: S108 + }, + id="two_entries", + ), + ], +) +async def test_notifier_publish_disk_usage( + disk_usage_monitor: DiskUsageMonitor, + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + usage: dict[Path, DiskUsage], + node_id: NodeID, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(_NUMBER_OF_CLIENTS) + ] + ) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) + + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) + + # attach spy to client + on_service_disk_usage_events: list[AsyncMock] = [ + _get_on_service_disk_usage_spy(c) for c in frontend_clients + ] + + # server publishes a message + await publish_disk_usage(app, user_id=user_id, node_id=node_id, usage=usage) + + # check that all clients received it + for on_service_disk_usage_event in on_service_disk_usage_events: + await _assert_call_count(on_service_disk_usage_event, call_count=1) + on_service_disk_usage_event.assert_awaited_once_with( + jsonable_encoder(ServiceDiskUsage(node_id=node_id, usage=usage)) + ) + + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) + + +@pytest.fixture +def port_key() -> ServicePortKey: + return ServicePortKey("test_port") + + +def _get_on_input_port_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_STATE_INPUT_PORTS_EVENT, on_event_spy) + + return on_event_spy + + +@pytest.mark.parametrize("input_status", InputStatus) +async def test_notifier_send_input_port_status( + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + port_key: ServicePortKey, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], + input_status: InputStatus, +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(_NUMBER_OF_CLIENTS) + ] + ) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) + + # client emits and check it was received + await logged_gather( + *[ + 
frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) + + # attach spy to client + on_input_port_events: list[AsyncMock] = [ + _get_on_input_port_spy(c) for c in frontend_clients + ] + + port_notifier = PortNotifier(app, user_id, project_id, node_id) + + # server publishes a message + match input_status: + case InputStatus.DOWNLOAD_STARTED: + await port_notifier.send_input_port_download_started(port_key) + case InputStatus.DOWNLOAD_WAS_ABORTED: + await port_notifier.send_input_port_download_was_aborted(port_key) + case InputStatus.DOWNLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_input_port_download_finished_succesfully( + port_key + ) + case InputStatus.DOWNLOAD_FINISHED_WITH_ERRROR: + await port_notifier.send_input_port_download_finished_with_error( + port_key + ) + + # check that all clients received it + for on_input_port_event in on_input_port_events: + await _assert_call_count(on_input_port_event, call_count=1) + on_input_port_event.assert_awaited_once_with( + jsonable_encoder( + InputPortSatus( + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=input_status, + ) + ) + ) + + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) + + +def _get_on_output_port_spy( + socketio_client: socketio.AsyncClient, +) -> AsyncMock: + # emulates front-end receiving message + + async def on_service_status(data): + assert parse_obj_as(ServiceDiskUsage, data) is not None + + on_event_spy = AsyncMock(wraps=on_service_status) + socketio_client.on(SOCKET_IO_STATE_OUTPUT_PORTS_EVENT, on_event_spy) + + return on_event_spy + + +@pytest.mark.parametrize("output_status", OutputStatus) +async def test_notifier_send_output_port_status( + socketio_server_events: dict[str, AsyncMock], + app: FastAPI, + user_id: UserID, + project_id: ProjectID, + node_id: NodeID, + port_key: ServicePortKey, + socketio_client_factory: Callable[ + [], _AsyncGeneratorContextManager[socketio.AsyncClient] + ], + output_status: OutputStatus, +): + # web server spy events + server_connect = socketio_server_events["connect"] + server_disconnect = socketio_server_events["disconnect"] + server_on_check = socketio_server_events["on_check"] + + async with AsyncExitStack() as socketio_frontend_clients: + frontend_clients: list[socketio.AsyncClient] = await logged_gather( + *[ + socketio_frontend_clients.enter_async_context(socketio_client_factory()) + for _ in range(_NUMBER_OF_CLIENTS) + ] + ) + await _assert_call_count(server_connect, call_count=_NUMBER_OF_CLIENTS) + + # client emits and check it was received + await logged_gather( + *[ + frontend_client.emit("check", data="an_event") + for frontend_client in frontend_clients + ] + ) + await _assert_call_count(server_on_check, call_count=_NUMBER_OF_CLIENTS) + + # attach spy to client + on_output_port_events: list[AsyncMock] = [ + _get_on_output_port_spy(c) for c in frontend_clients + ] + + port_notifier = PortNotifier(app, user_id, project_id, node_id) + + # server publishes a message + match output_status: + case OutputStatus.UPLOAD_STARTED: + await port_notifier.send_output_port_upload_sarted(port_key) + case OutputStatus.UPLOAD_WAS_ABORTED: + await port_notifier.send_output_port_upload_was_aborted(port_key) + case OutputStatus.UPLOAD_FINISHED_SUCCESSFULLY: + await port_notifier.send_output_port_upload_finished_successfully( + port_key + ) + case OutputStatus.UPLOAD_FINISHED_WITH_ERRROR: + await 
port_notifier.send_output_port_upload_finished_with_error( + port_key + ) + + # check that all clients received it + for on_output_port_event in on_output_port_events: + await _assert_call_count(on_output_port_event, call_count=1) + on_output_port_event.assert_awaited_once_with( + jsonable_encoder( + OutputPortStatus( + project_id=project_id, + node_id=node_id, + port_key=port_key, + status=output_status, + ) + ) + ) + + await _assert_call_count(server_disconnect, call_count=_NUMBER_OF_CLIENTS) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py index 024d966e424..38b217bab8f 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_filter.py @@ -9,6 +9,9 @@ import pytest from pydantic import ByteSize, NonNegativeFloat, NonNegativeInt, parse_obj_as from pytest_mock.plugin import MockerFixture +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext from simcore_service_dynamic_sidecar.modules.outputs._event_filter import ( BaseDelayPolicy, @@ -56,10 +59,13 @@ async def outputs_context(outputs_path: Path, port_keys: list[str]) -> OutputsCo @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterator[OutputsManager]: outputs_manager = OutputsManager( - outputs_context=outputs_context, io_log_redirect_cb=None, progress_cb=None + outputs_context=outputs_context, + port_notifier=port_notifier, + io_log_redirect_cb=None, + progress_cb=None, ) await outputs_manager.start() yield outputs_manager diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py index 5f02a500a4d..35ccc7d72df 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_event_handler.py @@ -10,6 +10,9 @@ import pytest from aioprocessing.queues import AioQueue from pydantic import PositiveFloat +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import OutputsContext from simcore_service_dynamic_sidecar.modules.outputs._event_handler import ( EventHandlerObserver, @@ -39,10 +42,13 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterable[OutputsManager]: outputs_manager = OutputsManager( - outputs_context, io_log_redirect_cb=None, progress_cb=None + outputs_context, + port_notifier=port_notifier, + io_log_redirect_cb=None, + progress_cb=None, ) await outputs_manager.start() diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index 40a3db6d3f9..3bf17d09f92 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -22,6 +22,9 @@ from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB from simcore_service_dynamic_sidecar.core.settings import 
ApplicationSettings from simcore_service_dynamic_sidecar.modules.mounted_fs import MountedVolumes +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs._context import ( OutputsContext, setup_outputs_context, @@ -165,10 +168,11 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterator[OutputsManager]: outputs_manager = OutputsManager( outputs_context=outputs_context, + port_notifier=port_notifier, io_log_redirect_cb=None, task_monitor_interval_s=0.01, progress_cb=None, diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py index f209e4877a7..7f9b81587c2 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_watcher.py @@ -26,6 +26,9 @@ ) from pytest_mock import MockerFixture from simcore_service_dynamic_sidecar.modules.mounted_fs import MountedVolumes +from simcore_service_dynamic_sidecar.modules.notifications._notifications_ports import ( + PortNotifier, +) from simcore_service_dynamic_sidecar.modules.outputs import ( _watcher as outputs_watcher_core, ) @@ -90,10 +93,11 @@ async def outputs_context( @pytest.fixture async def outputs_manager( - outputs_context: OutputsContext, + outputs_context: OutputsContext, port_notifier: PortNotifier ) -> AsyncIterable[OutputsManager]: outputs_manager = OutputsManager( outputs_context=outputs_context, + port_notifier=port_notifier, io_log_redirect_cb=None, task_monitor_interval_s=TICK_INTERVAL, progress_cb=None, diff --git a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py b/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py deleted file mode 100644 index 73184a1b3cb..00000000000 --- a/services/dynamic-sidecar/tests/unit/test_modules_system_monitor__notifier.py +++ /dev/null @@ -1,204 +0,0 @@ -# pylint:disable=unused-argument -# pylint:disable=redefined-outer-name - -from collections.abc import AsyncIterable, Callable -from contextlib import AsyncExitStack, _AsyncGeneratorContextManager -from pathlib import Path -from unittest.mock import AsyncMock - -import pytest -import socketio -from asgi_lifespan import LifespanManager -from fastapi import FastAPI -from fastapi.encoders import jsonable_encoder -from models_library.api_schemas_dynamic_sidecar.socketio import ( - SOCKET_IO_SERVICE_DISK_USAGE_EVENT, -) -from models_library.api_schemas_dynamic_sidecar.telemetry import ( - DiskUsage, - ServiceDiskUsage, -) -from models_library.api_schemas_webserver.socketio import SocketIORoomStr -from models_library.projects_nodes_io import NodeID -from models_library.users import UserID -from pydantic import ByteSize, NonNegativeInt, parse_obj_as -from pytest_mock import MockerFixture -from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict -from servicelib.utils import logged_gather -from settings_library.rabbit import RabbitSettings -from simcore_service_dynamic_sidecar.core.application import create_app -from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings -from simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage import ( - DiskUsageMonitor, -) -from simcore_service_dynamic_sidecar.modules.system_monitor._notifier import ( - 
publish_disk_usage, -) -from socketio import AsyncServer -from tenacity import AsyncRetrying -from tenacity.stop import stop_after_delay -from tenacity.wait import wait_fixed - -pytest_simcore_core_services_selection = [ - "rabbit", -] - - -@pytest.fixture -def mock_environment( - monkeypatch: pytest.MonkeyPatch, - rabbit_service: RabbitSettings, - mock_environment: EnvVarsDict, -) -> EnvVarsDict: - return setenvs_from_dict( - monkeypatch, - { - "DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE": "true", - "RABBIT_HOST": rabbit_service.RABBIT_HOST, - "RABBIT_PASSWORD": rabbit_service.RABBIT_PASSWORD.get_secret_value(), - "RABBIT_PORT": f"{rabbit_service.RABBIT_PORT}", - "RABBIT_SECURE": f"{rabbit_service.RABBIT_SECURE}", - "RABBIT_USER": rabbit_service.RABBIT_USER, - }, - ) - - -@pytest.fixture -async def app( - mock_environment: EnvVarsDict, - mock_registry_service: AsyncMock, - mock_storage_check: None, - mock_postgres_check: None, - mocker: MockerFixture, -) -> AsyncIterable[FastAPI]: - mocker.patch( - "simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage._get_monitored_paths", - return_value=[], - ) - - app: FastAPI = create_app() - async with LifespanManager(app): - yield app - - -@pytest.fixture -async def disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor: - return app.state.disk_usage_monitor - - -@pytest.fixture -async def socketio_server( - app: FastAPI, - socketio_server_factory: Callable[ - [RabbitSettings], _AsyncGeneratorContextManager[AsyncServer] - ], -) -> AsyncIterable[AsyncServer]: - # Same configuration as simcore_service_webserver/socketio/server.py - settings: ApplicationSettings = app.state.settings - assert settings.RABBIT_SETTINGS - - async with socketio_server_factory(settings.RABBIT_SETTINGS) as server: - yield server - - -@pytest.fixture -def room_name(user_id: UserID) -> SocketIORoomStr: - return SocketIORoomStr.from_user_id(user_id) - - -def _get_on_service_disk_usage_event( - socketio_client: socketio.AsyncClient, -) -> AsyncMock: - # emulates front-end receiving message - - async def on_service_status(data): - assert parse_obj_as(ServiceDiskUsage, data) is not None - - on_event_spy = AsyncMock(wraps=on_service_status) - socketio_client.on(SOCKET_IO_SERVICE_DISK_USAGE_EVENT, on_event_spy) - - return on_event_spy - - -async def _assert_call_count(mock: AsyncMock, *, call_count: int) -> None: - async for attempt in AsyncRetrying( - wait=wait_fixed(0.1), stop=stop_after_delay(5), reraise=True - ): - with attempt: - assert mock.call_count == call_count - - -def _get_mocked_disk_usage(byte_size_str: str) -> DiskUsage: - return DiskUsage( - total=ByteSize(0), - used=ByteSize(0), - free=ByteSize.validate(byte_size_str), - used_percent=0, - ) - - -@pytest.mark.parametrize( - "usage", - [ - pytest.param({}, id="empty"), - pytest.param({Path("/"): _get_mocked_disk_usage("1kb")}, id="one_entry"), - pytest.param( - { - Path("/"): _get_mocked_disk_usage("1kb"), - Path("/tmp"): _get_mocked_disk_usage("2kb"), # noqa: S108 - }, - id="two_entries", - ), - ], -) -async def test_notifier_publish_message( - disk_usage_monitor: DiskUsageMonitor, - socketio_server_events: dict[str, AsyncMock], - app: FastAPI, - user_id: UserID, - usage: dict[Path, DiskUsage], - node_id: NodeID, - socketio_client_factory: Callable[ - [], _AsyncGeneratorContextManager[socketio.AsyncClient] - ], -): - # web server spy events - server_connect = socketio_server_events["connect"] - server_disconnect = socketio_server_events["disconnect"] - server_on_check = socketio_server_events["on_check"] - 
- number_of_clients: NonNegativeInt = 10 - async with AsyncExitStack() as socketio_frontend_clients: - frontend_clients: list[socketio.AsyncClient] = await logged_gather( - *[ - socketio_frontend_clients.enter_async_context(socketio_client_factory()) - for _ in range(number_of_clients) - ] - ) - await _assert_call_count(server_connect, call_count=number_of_clients) - - # client emits and check it was received - await logged_gather( - *[ - frontend_client.emit("check", data="an_event") - for frontend_client in frontend_clients - ] - ) - await _assert_call_count(server_on_check, call_count=number_of_clients) - - # attach spy to client - on_service_disk_usage_events: list[AsyncMock] = [ - _get_on_service_disk_usage_event(c) for c in frontend_clients - ] - - # server publishes a message - await publish_disk_usage(app, user_id=user_id, node_id=node_id, usage=usage) - - # check that all clients received it - for on_service_disk_usage_event in on_service_disk_usage_events: - await _assert_call_count(on_service_disk_usage_event, call_count=1) - on_service_disk_usage_event.assert_awaited_once_with( - jsonable_encoder(ServiceDiskUsage(node_id=node_id, usage=usage)) - ) - - await _assert_call_count(server_disconnect, call_count=number_of_clients)
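For context, a minimal sketch of how a caller could satisfy the new `outputs_callbacks` argument of `Nodeports.set_multiple` outside the dynamic-sidecar. The `_LoggingCallbacks` class and the port key/value below are illustrative only; the method names (including the `finished_succesfully` spelling) follow the `OutputsCallbacks` ABC added in this PR, and the sidecar's own implementation is `OutputCallbacksWrapper` in `modules/nodeports.py`.

import logging

from models_library.services_types import ServicePortKey
from servicelib.progress_bar import ProgressBarData
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks

_logger = logging.getLogger(__name__)


class _LoggingCallbacks(OutputsCallbacks):
    # illustrative implementation: logs instead of emitting socket.io events

    async def aborted(self, key: ServicePortKey) -> None:
        _logger.warning("upload of output port '%s' was aborted", key)

    async def finished_succesfully(self, key: ServicePortKey) -> None:
        _logger.info("upload of output port '%s' finished", key)

    async def finished_with_error(self, key: ServicePortKey) -> None:
        _logger.error("upload of output port '%s' failed", key)


async def _set_outputs_example(ports: Nodeports, progress_bar: ProgressBarData) -> None:
    # callbacks fire only for output ports; input ports are set without notifications
    await ports.set_multiple(
        {ServicePortKey("value_out_0"): (42, None)},  # hypothetical port key and value
        progress_bar=progress_bar,
        outputs_callbacks=_LoggingCallbacks(),
    )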
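Similarly, a sketch of how a socket.io client could subscribe to the two new port-status events. Only the event names and payload models come from this PR; the client wiring, handler names, and the use of `socketio.AsyncClient` are assumptions for illustration.

import socketio  # python-socketio

from models_library.api_schemas_dynamic_sidecar.ports import (
    InputPortSatus,
    OutputPortStatus,
)
from models_library.api_schemas_dynamic_sidecar.socketio import (
    SOCKET_IO_STATE_INPUT_PORTS_EVENT,
    SOCKET_IO_STATE_OUTPUT_PORTS_EVENT,
)
from pydantic import parse_obj_as


def register_port_status_handlers(client: socketio.AsyncClient) -> None:
    # payloads are the JSON-encoded OutputPortStatus / InputPortSatus models,
    # emitted only to the owning user's room

    @client.on(SOCKET_IO_STATE_OUTPUT_PORTS_EVENT)
    async def _on_output_port_status(data: dict) -> None:
        status = parse_obj_as(OutputPortStatus, data)
        print(f"output port {status.port_key}: {status.status}")

    @client.on(SOCKET_IO_STATE_INPUT_PORTS_EVENT)
    async def _on_input_port_status(data: dict) -> None:
        status = parse_obj_as(InputPortSatus, data)
        print(f"input port {status.port_key}: {status.status}")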