Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 publish port events to frontend #6396

Merged
merged 31 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
220b7b6
extracting notifications as seprate module
Sep 19, 2024
b419e64
refactor internals
Sep 19, 2024
71678d6
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 19, 2024
d1c838e
rename module
Sep 19, 2024
fc88f8d
extend notifications interface
Sep 19, 2024
6bd6d15
added notifications tests
Sep 19, 2024
249dda7
refactor to work as expected
Sep 19, 2024
0c2153c
added missing project_id to notification
Sep 19, 2024
4e22b75
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 19, 2024
4ed8e2a
adding missing port notifier to constructor
Sep 19, 2024
d9950fa
refactor broken tests
Sep 19, 2024
796e1b5
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 20, 2024
9973d68
fixed tests
Sep 20, 2024
9f2854f
using str auto enum
Sep 20, 2024
d0a7698
fixed relative imports
Sep 23, 2024
c4fe846
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 23, 2024
525bce1
refactor using function
Sep 23, 2024
bf7408a
using limited_gather
Sep 23, 2024
0085f4b
outputs_callback is now totally optional
Sep 23, 2024
05ed52e
added tests
Sep 23, 2024
5d7d1a3
refactor tests
Sep 23, 2024
cb2662c
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 23, 2024
4136cde
fix flaky test
Sep 23, 2024
ab3ace9
pylint
Sep 23, 2024
d4d5fd9
Merge remote-tracking branch 'upstream/master' into pr-osparc-port-ch…
Sep 24, 2024
82b4701
revert changes
Sep 24, 2024
8cb8520
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 24, 2024
4c32ec0
restore flaky marker
Sep 24, 2024
12e72b7
Merge branch 'pr-osparc-port-change-notifications' of github.com:GitH…
Sep 24, 2024
78771ae
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 24, 2024
886b151
Merge branch 'master' into pr-osparc-port-change-notifications
GitHK Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from enum import auto

from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import ServicePortKey
from models_library.utils.enums import StrAutoEnum
from pydantic import BaseModel


class OutputStatus(StrAutoEnum):
GitHK marked this conversation as resolved.
Show resolved Hide resolved
UPLOAD_STARTED = auto()
UPLOAD_WAS_ABORTED = auto()
UPLOAD_FINISHED_SUCCESSFULLY = auto()
UPLOAD_FINISHED_WITH_ERRROR = auto()


class InputStatus(StrAutoEnum):
DOWNLOAD_STARTED = auto()
DOWNLOAD_WAS_ABORTED = auto()
DOWNLOAD_FINISHED_SUCCESSFULLY = auto()
DOWNLOAD_FINISHED_WITH_ERRROR = auto()


class _PortStatusCommon(BaseModel):
project_id: ProjectID
node_id: NodeID
port_key: ServicePortKey


class OutputPortStatus(_PortStatusCommon):
status: OutputStatus


class InputPortSatus(_PortStatusCommon):
status: InputStatus
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Final

SOCKET_IO_SERVICE_DISK_USAGE_EVENT: Final[str] = "serviceDiskUsage"
SOCKET_IO_STATE_OUTPUT_PORTS_EVENT: Final[str] = "stateOutputPorts"
SOCKET_IO_STATE_INPUT_PORTS_EVENT: Final[str] = "stateInputPorts"
65 changes: 50 additions & 15 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
from abc import ABC, abstractmethod
from asyncio import CancelledError
from collections.abc import Callable, Coroutine
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -27,6 +29,20 @@
log = logging.getLogger(__name__)


class OutputsCallbacks(ABC):
GitHK marked this conversation as resolved.
Show resolved Hide resolved
@abstractmethod
async def aborted(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

@abstractmethod
async def finished_with_error(self, key: ServicePortKey) -> None:
pass


class Nodeports(BaseModel):
"""
Represents a node in a project and all its input/output ports
Expand Down Expand Up @@ -148,6 +164,7 @@ async def set_multiple(
],
*,
progress_bar: ProgressBarData,
outputs_callbacks: OutputsCallbacks | None,
) -> None:
"""
Sets the provided values to the respective input or output ports
Expand All @@ -156,26 +173,44 @@ async def set_multiple(

raises ValidationError
"""

async def _set_with_notifications(
port_key: ServicePortKey,
value: ItemConcreteValue | None,
set_kwargs: SetKWargs | None,
sub_progress: ProgressBarData,
) -> None:
try:
# pylint: disable=protected-access
await self.internal_outputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
if outputs_callbacks:
await outputs_callbacks.finished_succesfully(port_key)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
# pylint: disable=protected-access
await self.internal_inputs[port_key]._set( # noqa: SLF001
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
except CancelledError:
if outputs_callbacks:
await outputs_callbacks.aborted(port_key)
raise
except Exception:
if outputs_callbacks:
await outputs_callbacks.finished_with_error(port_key)
raise

tasks = []
async with progress_bar.sub_progress(
steps=len(port_values.items()), description=IDStr("set multiple")
) as sub_progress:
for port_key, (value, set_kwargs) in port_values.items():
# pylint: disable=protected-access
try:
tasks.append(
self.internal_outputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
except UnboundPortError:
# not available try inputs
# if this fails it will raise another exception
tasks.append(
self.internal_inputs[port_key]._set(
value, set_kwargs=set_kwargs, progress_bar=sub_progress
)
)
tasks.append(
_set_with_notifications(port_key, value, set_kwargs, sub_progress)
)

results = await logged_gather(*tasks)
await self.save_to_db_cb(self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections.abc import Awaitable, Callable, Iterable
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock
from uuid import uuid4

import np_helpers
Expand All @@ -28,13 +29,14 @@
SimcoreS3FileID,
)
from models_library.services_types import ServicePortKey
from pytest_mock import MockerFixture
from servicelib.progress_bar import ProgressBarData
from settings_library.r_clone import RCloneSettings
from simcore_sdk import node_ports_v2
from simcore_sdk.node_ports_common.exceptions import UnboundPortError
from simcore_sdk.node_ports_v2 import exceptions
from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports
from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports, OutputsCallbacks
from simcore_sdk.node_ports_v2.port import Port
from utils_port_v2 import CONSTANT_UUID

Expand Down Expand Up @@ -749,6 +751,34 @@ async def _upload_create_task(item_key: str) -> None:
)


class _Callbacks(OutputsCallbacks):
async def aborted(self, key: ServicePortKey) -> None:
pass

async def finished_succesfully(self, key: ServicePortKey) -> None:
pass

async def finished_with_error(self, key: ServicePortKey) -> None:
pass


@pytest.fixture
async def output_callbacks() -> _Callbacks:
return _Callbacks()


@pytest.fixture
async def spy_outputs_callbaks(
mocker: MockerFixture, output_callbacks: _Callbacks
) -> dict[str, AsyncMock]:
return {
"aborted": mocker.spy(output_callbacks, "aborted"),
"finished_succesfully": mocker.spy(output_callbacks, "finished_succesfully"),
"finished_with_error": mocker.spy(output_callbacks, "finished_with_error"),
}


@pytest.mark.parametrize("use_output_callbacks", [True, False])
async def test_batch_update_inputs_outputs(
user_id: int,
project_id: str,
Expand All @@ -757,7 +787,12 @@ async def test_batch_update_inputs_outputs(
port_count: int,
option_r_clone_settings: RCloneSettings | None,
faker: Faker,
output_callbacks: _Callbacks,
spy_outputs_callbaks: dict[str, AsyncMock],
use_output_callbacks: bool,
) -> None:
callbacks = output_callbacks if use_output_callbacks else None

outputs = [(f"value_out_{i}", "integer", None) for i in range(port_count)]
inputs = [(f"value_in_{i}", "integer", None) for i in range(port_count)]
config_dict, _, _ = create_special_configuration(inputs=inputs, outputs=outputs)
Expand All @@ -771,12 +806,14 @@ async def test_batch_update_inputs_outputs(
await check_config_valid(PORTS, config_dict)

async with ProgressBarData(num_steps=2, description=faker.pystr()) as progress_bar:
port_values = (await PORTS.outputs).values()
await PORTS.set_multiple(
{
ServicePortKey(port.key): (k, None)
for k, port in enumerate((await PORTS.outputs).values())
},
{ServicePortKey(port.key): (k, None) for k, port in enumerate(port_values)},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)
assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
# pylint: disable=protected-access
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001
Expand All @@ -786,6 +823,11 @@ async def test_batch_update_inputs_outputs(
for k, port in enumerate((await PORTS.inputs).values(), start=1000)
},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)
# inputs do not trigger callbacks
assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
assert progress_bar._current_steps == pytest.approx(2) # noqa: SLF001

Expand All @@ -807,4 +849,11 @@ async def test_batch_update_inputs_outputs(
await PORTS.set_multiple(
{ServicePortKey("missing_key_in_both"): (123132, None)},
progress_bar=progress_bar,
outputs_callbacks=callbacks,
)

assert len(spy_outputs_callbaks["finished_succesfully"].call_args_list) == (
len(port_values) if use_output_callbacks else 0
)
assert len(spy_outputs_callbaks["aborted"].call_args_list) == 0
assert len(spy_outputs_callbaks["finished_with_error"].call_args_list) == 0
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from pathlib import Path
from typing import Any, Callable
from unittest.mock import AsyncMock

import pytest
from faker import Faker
Expand Down Expand Up @@ -138,6 +139,7 @@ async def mock_node_port_creator_cb(*args, **kwargs):
+ list(original_outputs.values())
},
progress_bar=progress_bar,
outputs_callbacks=AsyncMock(),
GitHK marked this conversation as resolved.
Show resolved Hide resolved
)
assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async def _mocked_context_manger(*args, **kwargs) -> AsyncIterator[None]:
)


@pytest.mark.flaky(max_runs=3)
async def test_legacy_and_dynamic_sidecar_run(
initialized_app: FastAPI,
wait_for_catalog_service: Callable[[UserID, str], Awaitable[None]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ async def ports_inputs_pull_task(
request: Request,
tasks_manager: Annotated[TasksManager, Depends(get_tasks_manager)],
app: Annotated[FastAPI, Depends(get_application)],
settings: Annotated[ApplicationSettings, Depends(get_settings)],
mounted_volumes: Annotated[MountedVolumes, Depends(get_mounted_volumes)],
inputs_state: Annotated[InputsState, Depends(get_inputs_state)],
port_keys: list[str] | None = None,
Expand All @@ -223,6 +224,7 @@ async def ports_inputs_pull_task(
port_keys=port_keys,
mounted_volumes=mounted_volumes,
app=app,
settings=settings,
inputs_pulling_enabled=inputs_state.inputs_pulling_enabled,
)
except TaskAlreadyRunningError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..modules.attribute_monitor import setup_attribute_monitor
from ..modules.inputs import setup_inputs
from ..modules.mounted_fs import MountedVolumes, setup_mounted_fs
from ..modules.notifications import setup_notifications
from ..modules.outputs import setup_outputs
from ..modules.prometheus_metrics import setup_prometheus_metrics
from ..modules.resource_tracking import setup_resource_tracking
Expand Down Expand Up @@ -172,6 +173,7 @@ def create_app():
setup_rabbitmq(app)
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
GitHK marked this conversation as resolved.
Show resolved Hide resolved
setup_system_monitor(app)

setup_mounted_fs(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from ..models.shared_store import SharedStore
from ..modules import nodeports, user_services_preferences
from ..modules.mounted_fs import MountedVolumes
from ..modules.notifications._notifications_ports import PortNotifier
from ..modules.outputs import OutputsManager, event_propagation_disabled
from .long_running_tasksutils import run_before_shutdown_actions
from .resource_tracking import send_service_started, send_service_stopped
Expand Down Expand Up @@ -472,6 +473,7 @@ async def task_ports_inputs_pull(
port_keys: list[str] | None,
mounted_volumes: MountedVolumes,
app: FastAPI,
settings: ApplicationSettings,
*,
inputs_pulling_enabled: bool,
) -> int:
Expand Down Expand Up @@ -505,6 +507,12 @@ async def task_ports_inputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=PortNotifier(
app,
settings.DY_SIDECAR_USER_ID,
settings.DY_SIDECAR_PROJECT_ID,
settings.DY_SIDECAR_NODE_ID,
),
)
await post_sidecar_log_message(
app, "Finished pulling inputs", log_level=logging.INFO
Expand Down Expand Up @@ -541,6 +549,7 @@ async def task_ports_outputs_pull(
post_sidecar_log_message, app, log_level=logging.INFO
),
progress_bar=root_progress,
port_notifier=None,
)
await post_sidecar_log_message(
app, "Finished pulling outputs", log_level=logging.INFO
Expand Down
Loading
Loading