Skip to content

Commit

Permalink
🎨 publish port events to frontend (ITISFoundation#6396)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
2 people authored and jsaq007 committed Sep 25, 2024
1 parent 8129783 commit 04b118c
Show file tree
Hide file tree
Showing 26 changed files with 866 additions and 259 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from enum import auto

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.utils.enums import StrAutoEnum
from pydantic import BaseModel


class OutputStatus(StrAutoEnum):
UPLOAD_STARTED = auto()
UPLOAD_WAS_ABORTED = auto()
UPLOAD_FINISHED_SUCCESSFULLY = auto()
UPLOAD_FINISHED_WITH_ERRROR = auto()


class InputStatus(StrAutoEnum):
DOWNLOAD_STARTED = auto()
DOWNLOAD_WAS_ABORTED = auto()
DOWNLOAD_FINISHED_SUCCESSFULLY = auto()
DOWNLOAD_FINISHED_WITH_ERRROR = auto()


class _PortStatusCommon(BaseModel):
project_id: ProjectID
node_id: NodeID
port_key: ServicePortKey


class OutputPortStatus(_PortStatusCommon):
status: OutputStatus


class InputPortSatus(_PortStatusCommon):
status: InputStatus
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Final

SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage"
SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts"
SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts"
65 changes: 50 additions & 15 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from abc import ABC, abstractmethod
from asyncio import CancelledError
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -27,6 +29,20 @@
log = logging.getLogger(__name__)


class OutputsCallbacks(ABC):
@abstractmethod
async def aborted(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_with_error(self, key: ServicePortKey) -> None:
pass


class Nodeports(BaseModel):
"""
Represents a node in a project and all its input/output ports
Expand Down Expand Up @@ -148,6 +164,7 @@ async def set_multiple(
],
*,
progress_bar: ProgressBarData,
outputs_callbacks: OutputsCallbacks | None,
) -> None:
"""
Sets the provided values to the respective input or output ports
Expand All @@ -156,26 +173,44 @@ async def set_multiple(
raises ValidationError
"""

async def _set_with_notifications(
port_key: ServicePortKey,
value: ItemConcreteValue | None,
set_kwargs: SetKWargs | None,
sub_progress: ProgressBarData,
) -> None:
try:
# pylint: disable=protected-access
await self.internal_outputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
if outputs_callbacks:
await outputs_callbacks.finished_succesfully(port_key)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
# pylint: disable=protected-access
await self.internal_inputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
except CancelledError:
if outputs_callbacks:
await outputs_callbacks.aborted(port_key)
raise
except Exception:
if outputs_callbacks:
await outputs_callbacks.finished_with_error(port_key)
raise

tasks = []
async with progress_bar.sub_progress(
steps=len(port_values.items()), description=IDStr("set multiple")
) as sub_progress:
for port_key, (value, set_kwargs) in port_values.items():
# pylint: disable=protected-access
try:
tasks.append(
self.internal_outputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
tasks.append(
self.internal_inputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
tasks.append(
_set_with_notifications(port_key, value, set_kwargs, sub_progress)
)

results = await logged_gather(*tasks)
await self.save_to_db_cb(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections.abc import Awaitable, Callable, Iterable
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock
from uuid import uuid4

import np_helpers
Expand All @@ -28,13 +29,14 @@
SimcoreS3FileID,
)
from models_library.services_types import ServicePortKey
from pytest_mock import MockerFixture
from servicelib.progress_bar import ProgressBarData
from settings_library.r_clone import RCloneSettings
from simcore_sdk import node_ports_v2
from simcore_sdk.node_ports_common.exceptions import UnboundPortError
from simcore_sdk.node_ports_v2 import exceptions
from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks
from simcore_sdk.node_ports_v2.port import Port
from utils_port_v2 import CONSTANT_UUID

Expand Down Expand Up @@ -749,6 +751,34 @@ async def _upload_create_task(item_key: str) -> None:
)


class _Callbacks(OutputsCallbacks):
async def aborted(self, key: ServicePortKey) -> None:
pass

async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

async def finished_with_error(self, key: ServicePortKey) -> None:
pass


@pytest.fixture
async def output_callbacks() -> _Callbacks:
return _Callbacks()


@pytest.fixture
async def spy_outputs_callbaks(
mocker: MockerFixture, output_callbacks: _Callbacks
) -> dict[str, AsyncMock]:
return {
"aborted": mocker.spy(output_callbacks, "aborted"),
"finished_succesfully": mocker.spy(output_callbacks, "finished_succesfully"),
"finished_with_error": mocker.spy(output_callbacks, "finished_with_error"),
}


@pytest.mark.parametrize("use_output_callbacks", [True, False])
async def test_batch_update_inputs_outputs(
user_id: int,
project_id: str,
Expand All @@ -757,7 +787,12 @@ async def test_batch_update_inputs_outputs(
port_count: int,
option_r_clone_settings: RCloneSettings | None,
faker: Faker,
output_callbacks: _Callbacks,
spy_outputs_callbaks: dict[str, AsyncMock],
use_output_callbacks: bool,
) -> None:
callbacks = output_callbacks if use_output_callbacks else None

outputs = [(f"value_out_{i}", "integer", None) for i in range(port_count)]
inputs = [(f"value_in_{i}", "integer", None) for i in range(port_count)]
config_dict, _, _ = create_special_configuration(inputs=inputs, outputs=outputs)
Expand All @@ -771,12 +806,14 @@ async def test_batch_update_inputs_outputs(
await check_config_valid(PORTS, config_dict)

async with ProgressBarData(num_steps=2, description=faker.pystr()) as progress_bar:
port_values = (await PORTS.outputs).values()
await PORTS.set_multiple(
{
ServicePortKey(port.key): (k, None)
for k, port in enumerate((await PORTS.outputs).values())
},
{ServicePortKey(port.key): (k, None) for k, port in enumerate(port_values)},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)
assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
# pylint: disable=protected-access
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
Expand All @@ -786,6 +823,11 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.inputs).values(), start=1000)
},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)
# inputs do not trigger callbacks
assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001

Expand All @@ -807,4 +849,11 @@ async def test_batch_update_inputs_outputs(
await PORTS.set_multiple(
{ServicePortKey("missing_key_in_both"): (123132, None)},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)

assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
assert len(spy_outputs_callbaks["aborted"].call_args_list) == 0
assert len(spy_outputs_callbaks["finished_with_error"].call_args_list) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pathlib import Path
from typing import Any, Callable
from unittest.mock import AsyncMock

import pytest
from faker import Faker
Expand Down Expand Up @@ -138,6 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs):
+ list(original_outputs.values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]:
)


@pytest.mark.flaky(max_runs=3)
async def test_legacy_and_dynamic_sidecar_run(
initialized_app: FastAPI,
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ async def ports_inputs_pull_task(
request: Request,
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
app: Annotated[FastAPI, Depends(get_application)],
settings: Annotated[ApplicationSettings, Depends(get_settings)],
mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
inputs_state: Annotated[InputsState, Depends(get_inputs_state)],
port_keys: list[str] | None = None,
Expand All @@ -223,6 +224,7 @@ async def ports_inputs_pull_task(
port_keys=port_keys,
mounted_volumes=mounted_volumes,
app=app,
settings=settings,
inputs_pulling_enabled=inputs_state.inputs_pulling_enabled,
)
except TaskAlreadyRunningError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..modules.attribute_monitor import setup_attribute_monitor
from ..modules.inputs import setup_inputs
from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs
from ..modules.notifications import setup_notifications
from ..modules.outputs import setup_outputs
from ..modules.prometheus_metrics import setup_prometheus_metrics
from ..modules.resource_tracking import setup_resource_tracking
Expand Down Expand Up @@ -172,6 +173,7 @@ def create_app():
setup_rabbitmq(app)
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
setup_system_monitor(app)

setup_mounted_fs(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from ..models.shared_store import SharedStore
from ..modules import nodeports, user_services_preferences
from ..modules.mounted_fs import MountedVolumes
from ..modules.notifications._notifications_ports import PortNotifier
from ..modules.outputs import OutputsManager, event_propagation_disabled
from .long_running_tasksutils import run_before_shutdown_actions
from .resource_tracking import send_service_started, send_service_stopped
Expand Down Expand Up @@ -472,6 +473,7 @@ async def task_ports_inputs_pull(
port_keys: list[str] | None,
mounted_volumes: MountedVolumes,
app: FastAPI,
settings: ApplicationSettings,
*,
inputs_pulling_enabled: bool,
) -> int:
Expand Down Expand Up @@ -505,6 +507,12 @@ async def task_ports_inputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=PortNotifier(
app,
settings.DY_SIDECAR_USER_ID,
settings.DY_SIDECAR_PROJECT_ID,
settings.DY_SIDECAR_NODE_ID,
),
)
await post_sidecar_log_message(
app, "Finished pulling inputs", log_level=logging.INFO
Expand Down Expand Up @@ -541,6 +549,7 @@ async def task_ports_outputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=None,
)
await post_sidecar_log_message(
app, "Finished pulling outputs", log_level=logging.INFO
Expand Down
Loading

0 comments on commit 04b118c

Please sign in to comment.