diff --git a/packages/models-library/src/models_library/progress_bar.py b/packages/models-library/src/models_library/progress_bar.py new file mode 100644 index 00000000000..5c0a85503c4 --- /dev/null +++ b/packages/models-library/src/models_library/progress_bar.py @@ -0,0 +1,36 @@ +from typing import Any, ClassVar, Literal, TypeAlias + +from pydantic import BaseModel + +# NOTE: keep a list of possible unit, and please use correct official unit names +ProgressUnit: TypeAlias = Literal["Byte"] + + +class ProgressReport(BaseModel): + actual_value: float + total: float + unit: ProgressUnit | None = None + + @property + def percent_value(self) -> float: + if self.total != 0: + return max(min(self.actual_value / self.total, 1.0), 0.0) + return 0 + + class Config: + frozen = True + schema_extra: ClassVar[dict[str, Any]] = { + "examples": [ + # typical percent progress (no units) + { + "actual_value": 0.3, + "total": 1.0, + }, + # typical byte progress + { + "actual_value": 128.5, + "total": 1024.0, + "unit": "Byte", + }, + ] + } diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index ddf4004f00e..42374baeb4c 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -6,10 +6,10 @@ from typing import Any, Literal, TypeAlias import arrow -from models_library.products import ProductName from pydantic import BaseModel, Field -from pydantic.types import NonNegativeFloat +from .products import ProductName +from .progress_bar import ProgressReport from .projects import ProjectID from .projects_nodes_io import NodeID from .projects_state import RunningState @@ -99,7 +99,7 @@ class ProgressMessageMixin(RabbitMessageBase): progress_type: ProgressType = ( ProgressType.COMPUTATION_RUNNING ) # NOTE: backwards compatible - progress: NonNegativeFloat + report: ProgressReport def routing_key(self) -> str | None: return None diff --git a/packages/models-library/tests/test_rabbit_messages.py b/packages/models-library/tests/test_rabbit_messages.py index 8e2aed5de1e..8c95af75e67 100644 --- a/packages/models-library/tests/test_rabbit_messages.py +++ b/packages/models-library/tests/test_rabbit_messages.py @@ -2,6 +2,7 @@ import pytest from faker import Faker +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( ProgressRabbitMessageNode, ProgressRabbitMessageProject, @@ -21,7 +22,7 @@ user_id=faker.uuid4(cast_to=None), node_id=faker.uuid4(cast_to=None), progress_type=ProgressType.SERVICE_OUTPUTS_PULLING, - progress=0.4, + report=ProgressReport(actual_value=0.4, total=1), ).json(), ProgressRabbitMessageNode, id="node_progress", @@ -31,7 +32,7 @@ project_id=faker.uuid4(cast_to=None), user_id=faker.uuid4(cast_to=None), progress_type=ProgressType.PROJECT_CLOSING, - progress=0.4, + report=ProgressReport(actual_value=0.4, total=1), ).json(), ProgressRabbitMessageProject, id="project_progress", diff --git a/packages/service-library/src/servicelib/docker_utils.py b/packages/service-library/src/servicelib/docker_utils.py index b84af7217a5..bad7b3930fe 100644 --- a/packages/service-library/src/servicelib/docker_utils.py +++ b/packages/service-library/src/servicelib/docker_utils.py @@ -204,7 +204,9 @@ async def pull_image( assert parsed_progress.id # nosec layer_id_to_size.setdefault( parsed_progress.id, _PulledStatus(0) - ).downloaded = layer_id_to_size[parsed_progress.id].size + ).downloaded = layer_id_to_size.setdefault( + parsed_progress.id, _PulledStatus(0) + ).size case "extracting": assert parsed_progress.id # nosec assert parsed_progress.progress_detail # nosec diff --git a/packages/service-library/src/servicelib/fastapi/docker_utils.py b/packages/service-library/src/servicelib/fastapi/docker_utils.py index 67713df91cc..be8a268e4cf 100644 --- a/packages/service-library/src/servicelib/fastapi/docker_utils.py +++ b/packages/service-library/src/servicelib/fastapi/docker_utils.py @@ -99,7 +99,7 @@ async def retrieve_image_layer_information( return None -_DEFAULT_MIN_IMAGE_SIZE: Final[ByteSize] = parse_obj_as(ByteSize, "50MiB") +_DEFAULT_MIN_IMAGE_SIZE: Final[ByteSize] = parse_obj_as(ByteSize, "200MiB") async def pull_images( @@ -122,6 +122,7 @@ async def pull_images( async with ProgressBarData( num_steps=images_total_size, progress_report_cb=progress_cb, + progress_unit="Byte", ) as pbar: await asyncio.gather( diff --git a/packages/service-library/src/servicelib/progress_bar.py b/packages/service-library/src/servicelib/progress_bar.py index 30034d0c0c9..37f850553b0 100644 --- a/packages/service-library/src/servicelib/progress_bar.py +++ b/packages/service-library/src/servicelib/progress_bar.py @@ -4,6 +4,9 @@ from inspect import isawaitable from typing import Final, Optional, Protocol, runtime_checkable +from models_library.progress_bar import ProgressReport, ProgressUnit +from pydantic import parse_obj_as + from .logging_utils import log_catch _logger = logging.getLogger(__name__) @@ -14,13 +17,13 @@ @runtime_checkable class AsyncReportCB(Protocol): - async def __call__(self, progress_value: float) -> None: + async def __call__(self, report: ProgressReport) -> None: ... @runtime_checkable class ReportCB(Protocol): - def __call__(self, progress_value: float) -> None: + def __call__(self, report: ProgressReport) -> None: ... @@ -73,6 +76,7 @@ async def main_fct(): "description": "Optionally defines the step relative weight (defaults to steps of equal weights)" }, ) + progress_unit: ProgressUnit | None = None progress_report_cb: AsyncReportCB | ReportCB | None = None _current_steps: float = _INITIAL_VALUE _children: list = field(default_factory=list) @@ -81,6 +85,8 @@ async def main_fct(): _last_report_value: float = _INITIAL_VALUE def __post_init__(self) -> None: + if self.progress_unit is not None: + parse_obj_as(ProgressUnit, self.progress_unit) self._continuous_value_lock = asyncio.Lock() self.num_steps = max(1, self.num_steps) if self.step_weights: @@ -112,7 +118,14 @@ async def _report_external(self, value: float, *, force: bool = False) -> None: or ((value - self._last_report_value) > _MIN_PROGRESS_UPDATE_PERCENT) or value == _FINAL_VALUE ): - call = self.progress_report_cb(value) + call = self.progress_report_cb( + ProgressReport( + # NOTE: here we convert back to actual value since this is possibly weighted + actual_value=value * self.num_steps, + total=self.num_steps, + unit=self.progress_unit, + ), + ) if isawaitable(call): await call self._last_report_value = value diff --git a/packages/service-library/tests/aiohttp/test_docker_utils.py b/packages/service-library/tests/aiohttp/test_docker_utils.py index 26af2daa1d6..5e93c640349 100644 --- a/packages/service-library/tests/aiohttp/test_docker_utils.py +++ b/packages/service-library/tests/aiohttp/test_docker_utils.py @@ -1,10 +1,16 @@ # pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable from collections.abc import Awaitable, Callable from typing import Any +from unittest import mock from unittest.mock import call import pytest from models_library.docker import DockerGenericTag +from models_library.progress_bar import ProgressReport from pydantic import parse_obj_as from pytest_mock import MockerFixture from servicelib import progress_bar @@ -68,6 +74,33 @@ async def test_retrieve_image_layer_information_from_external_registry( assert layer_information +@pytest.fixture +async def mocked_log_cb(mocker: MockerFixture) -> mock.AsyncMock: + async def _log_cb(*args, **kwargs) -> None: + print(f"received log: {args}, {kwargs}") + + return mocker.AsyncMock(side_effect=_log_cb) + + +@pytest.fixture +async def mocked_progress_cb(mocker: MockerFixture) -> mock.AsyncMock: + async def _progress_cb(*args, **kwargs) -> None: + print(f"received progress: {args}, {kwargs}") + + return mocker.AsyncMock(side_effect=_progress_cb) + + +def _assert_progress_report_values( + mocked_progress_cb: mock.AsyncMock, *, total: float +) -> None: + assert mocked_progress_cb.call_args_list[0] == call( + ProgressReport(actual_value=0, total=total) + ) + assert mocked_progress_cb.call_args_list[-1] == call( + ProgressReport(actual_value=total, total=total) + ) + + @pytest.mark.parametrize( "image", ["itisfoundation/sleeper:1.0.0", "nginx:latest"], @@ -76,7 +109,8 @@ async def test_pull_image( remove_images_from_host: Callable[[list[str]], Awaitable[None]], image: DockerGenericTag, registry_settings: RegistrySettings, - mocker: MockerFixture, + mocked_log_cb: mock.AsyncMock, + mocked_progress_cb: mock.AsyncMock, caplog: pytest.LogCaptureFixture, ): # clean first @@ -84,47 +118,52 @@ async def test_pull_image( layer_information = await retrieve_image_layer_information(image, registry_settings) assert layer_information - async def _log_cb(*args, **kwargs) -> None: - print(f"received log: {args}, {kwargs}") - - fake_progress_report_cb = mocker.AsyncMock() async with progress_bar.ProgressBarData( num_steps=layer_information.layers_total_size, - progress_report_cb=fake_progress_report_cb, + progress_report_cb=mocked_progress_cb, ) as main_progress_bar: - fake_log_cb = mocker.AsyncMock(side_effect=_log_cb) await pull_image( - image, registry_settings, main_progress_bar, fake_log_cb, layer_information + image, + registry_settings, + main_progress_bar, + mocked_log_cb, + layer_information, ) - fake_log_cb.assert_called() + mocked_log_cb.assert_called() assert ( main_progress_bar._current_steps # noqa: SLF001 == layer_information.layers_total_size ) - assert fake_progress_report_cb.call_args_list[0] == call(0.0) - fake_progress_report_cb.assert_called_with(1.0) + _assert_progress_report_values( + mocked_progress_cb, total=layer_information.layers_total_size + ) + + mocked_progress_cb.reset_mock() + mocked_log_cb.reset_mock() # check there were no warnings popping up from the docker pull # NOTE: this would pop up in case docker changes its pulling statuses - for record in caplog.records: - assert record.levelname != "WARNING", record.message + assert not [r.message for r in caplog.records if r.levelname == "WARNING"] # pull a second time should, the image is already there async with progress_bar.ProgressBarData( num_steps=layer_information.layers_total_size, - progress_report_cb=fake_progress_report_cb, + progress_report_cb=mocked_progress_cb, ) as main_progress_bar: - fake_log_cb = mocker.AsyncMock(side_effect=_log_cb) await pull_image( - image, registry_settings, main_progress_bar, fake_log_cb, layer_information + image, + registry_settings, + main_progress_bar, + mocked_log_cb, + layer_information, ) - fake_log_cb.assert_called() + mocked_log_cb.assert_called() assert ( main_progress_bar._current_steps # noqa: SLF001 == layer_information.layers_total_size ) - assert fake_progress_report_cb.call_args_list[0] == call(0.0) - fake_progress_report_cb.assert_called_with(1.0) + _assert_progress_report_values( + mocked_progress_cb, total=layer_information.layers_total_size + ) # check there were no warnings - for record in caplog.records: - assert record.levelname != "WARNING", record.message + assert not [r.message for r in caplog.records if r.levelname == "WARNING"] diff --git a/packages/service-library/tests/fastapi/test_docker_utils.py b/packages/service-library/tests/fastapi/test_docker_utils.py index e2e0de947fe..385895c16d5 100644 --- a/packages/service-library/tests/fastapi/test_docker_utils.py +++ b/packages/service-library/tests/fastapi/test_docker_utils.py @@ -3,6 +3,7 @@ # pylint: disable=too-many-arguments # pylint: disable=unused-argument # pylint: disable=unused-variable +import asyncio from collections.abc import Awaitable, Callable from typing import Any from unittest import mock @@ -10,6 +11,7 @@ import pytest from models_library.docker import DockerGenericTag +from models_library.progress_bar import ProgressReport from pydantic import ByteSize, parse_obj_as from pytest_mock import MockerFixture from servicelib import progress_bar @@ -93,6 +95,17 @@ async def _progress_cb(*args, **kwargs) -> None: return mocker.AsyncMock(side_effect=_progress_cb) +def _assert_progress_report_values( + mocked_progress_cb: mock.AsyncMock, *, total: float +) -> None: + assert mocked_progress_cb.call_args_list[0] == call( + ProgressReport(actual_value=0, total=total, unit="Byte") + ) + assert mocked_progress_cb.call_args_list[-1] == call( + ProgressReport(actual_value=total, total=total, unit="Byte") + ) + + @pytest.mark.parametrize( "image", ["itisfoundation/sleeper:1.0.0", "nginx:latest", "busybox:latest"], @@ -112,6 +125,7 @@ async def test_pull_image( async with progress_bar.ProgressBarData( num_steps=layer_information.layers_total_size, progress_report_cb=mocked_progress_cb, + progress_unit="Byte", ) as main_progress_bar: await pull_image( @@ -126,8 +140,10 @@ async def test_pull_image( main_progress_bar._current_steps # noqa: SLF001 == layer_information.layers_total_size ) - assert mocked_progress_cb.call_args_list[0] == call(0.0) - assert mocked_progress_cb.call_args_list[-1] == call(1.0) + _assert_progress_report_values( + mocked_progress_cb, total=layer_information.layers_total_size + ) + mocked_progress_cb.reset_mock() mocked_log_cb.reset_mock() @@ -139,6 +155,7 @@ async def test_pull_image( async with progress_bar.ProgressBarData( num_steps=layer_information.layers_total_size, progress_report_cb=mocked_progress_cb, + progress_unit="Byte", ) as main_progress_bar: await pull_image( image, @@ -152,8 +169,9 @@ async def test_pull_image( main_progress_bar._current_steps # noqa: SLF001 == layer_information.layers_total_size ) - assert mocked_progress_cb.call_args_list[0] == call(0.0) - assert mocked_progress_cb.call_args_list[-1] == call(1.0) + _assert_progress_report_values( + mocked_progress_cb, total=layer_information.layers_total_size + ) # check there were no warnings assert not [r.message for r in caplog.records if r.levelname == "WARNING"] @@ -174,20 +192,21 @@ async def test_pull_image_without_layer_information( layer_information = await retrieve_image_layer_information(image, registry_settings) assert layer_information + fake_number_of_steps = parse_obj_as(ByteSize, "200MiB") + assert fake_number_of_steps > layer_information.layers_total_size async with progress_bar.ProgressBarData( - num_steps=parse_obj_as(ByteSize, "200MiB"), + num_steps=fake_number_of_steps, progress_report_cb=mocked_progress_cb, + progress_unit="Byte", ) as main_progress_bar: await pull_image( image, registry_settings, main_progress_bar, mocked_log_cb, None ) mocked_log_cb.assert_called() - assert ( - main_progress_bar._current_steps # noqa: SLF001 - == layer_information.layers_total_size + assert main_progress_bar._current_steps == float( # noqa: SLF001 + layer_information.layers_total_size ) - assert mocked_progress_cb.call_args_list[0] == call(0.0) - assert mocked_progress_cb.call_args_list[-1] == call(1.0) + _assert_progress_report_values(mocked_progress_cb, total=fake_number_of_steps) mocked_progress_cb.reset_mock() mocked_log_cb.reset_mock() @@ -204,14 +223,14 @@ async def test_pull_image_without_layer_information( async with progress_bar.ProgressBarData( num_steps=1, progress_report_cb=mocked_progress_cb, + progress_unit="Byte", ) as main_progress_bar: await pull_image( image, registry_settings, main_progress_bar, mocked_log_cb, None ) mocked_log_cb.assert_called() assert main_progress_bar._current_steps == 0 # noqa: SLF001 - assert mocked_progress_cb.call_args_list[0] == call(0.0) - assert mocked_progress_cb.call_args_list[-1] == call(1.0) + _assert_progress_report_values(mocked_progress_cb, total=1) # check there were no warnings assert not [ r.message @@ -235,11 +254,18 @@ async def test_pull_images_set( caplog: pytest.LogCaptureFixture, ): await remove_images_from_host(list(images_set)) + layer_informations = await asyncio.gather( + *[ + retrieve_image_layer_information(image, registry_settings) + for image in images_set + ] + ) + assert layer_informations + images_total_size = sum(_.layers_total_size for _ in layer_informations if _) await pull_images(images_set, registry_settings, mocked_progress_cb, mocked_log_cb) mocked_log_cb.assert_called() - assert mocked_progress_cb.call_args_list[0] == call(0.0) - assert mocked_progress_cb.call_args_list[-1] == call(1.0) + _assert_progress_report_values(mocked_progress_cb, total=images_total_size) # check there were no warnings # NOTE: this would pop up in case docker changes its pulling statuses diff --git a/packages/service-library/tests/test_progress_bar.py b/packages/service-library/tests/test_progress_bar.py index 0be8060fa71..dc2432926f9 100644 --- a/packages/service-library/tests/test_progress_bar.py +++ b/packages/service-library/tests/test_progress_bar.py @@ -8,79 +8,116 @@ from unittest import mock import pytest +from models_library.progress_bar import ProgressReport +from pydantic import ValidationError from pytest_mock import MockerFixture -from servicelib.progress_bar import _FINAL_VALUE, _INITIAL_VALUE, ProgressBarData +from servicelib.progress_bar import ( + _INITIAL_VALUE, + _MIN_PROGRESS_UPDATE_PERCENT, + ProgressBarData, +) @pytest.fixture def mocked_progress_bar_cb(mocker: MockerFixture) -> mock.Mock: - return mocker.Mock() + def _progress_cb(*args, **kwargs) -> None: + print(f"received progress: {args}, {kwargs}") + + return mocker.Mock(side_effect=_progress_cb) @pytest.fixture def async_mocked_progress_bar_cb(mocker: MockerFixture) -> mock.AsyncMock: - return mocker.AsyncMock() + async def _progress_cb(*args, **kwargs) -> None: + print(f"received progress: {args}, {kwargs}") + + return mocker.AsyncMock(side_effect=_progress_cb) @pytest.mark.parametrize( "progress_report_cb_type", ["mocked_progress_bar_cb", "async_mocked_progress_bar_cb"], ) -async def test_progress_bar( +async def test_progress_bar_progress_report_cb( progress_report_cb_type: str, mocked_progress_bar_cb: mock.Mock, async_mocked_progress_bar_cb: mock.AsyncMock, ): - mocked_cb = { + mocked_cb: mock.Mock | mock.AsyncMock = { "mocked_progress_bar_cb": mocked_progress_bar_cb, "async_mocked_progress_bar_cb": async_mocked_progress_bar_cb, }[progress_report_cb_type] - - async with ProgressBarData(num_steps=3, progress_report_cb=mocked_cb) as root: - assert root.num_steps == 3 + outer_num_steps = 3 + async with ProgressBarData( + num_steps=outer_num_steps, progress_report_cb=mocked_cb, progress_unit="Byte" + ) as root: + assert root.num_steps == outer_num_steps assert root.step_weights is None # i.e. all steps have equal weight assert root._current_steps == pytest.approx(0) # noqa: SLF001 - mocked_cb.assert_called_once_with(pytest.approx(0)) + mocked_cb.assert_called_once_with( + ProgressReport(actual_value=0, total=outer_num_steps, unit="Byte") + ) mocked_cb.reset_mock() # first step is done right away await root.update() assert root._current_steps == pytest.approx(1) # noqa: SLF001 - mocked_cb.assert_called_once_with(pytest.approx(1 / 3)) + mocked_cb.assert_called_once_with( + ProgressReport(actual_value=1, total=outer_num_steps, unit="Byte") + ) mocked_cb.reset_mock() # 2nd step is a sub progress bar of 10 steps - async with root.sub_progress(steps=10) as sub: + inner_num_steps_step2 = 100 + async with root.sub_progress(steps=inner_num_steps_step2) as sub: assert sub._current_steps == pytest.approx(0) # noqa: SLF001 assert root._current_steps == pytest.approx(1) # noqa: SLF001 - for i in range(10): + for i in range(inner_num_steps_step2): await sub.update() assert sub._current_steps == pytest.approx(float(i + 1)) # noqa: SLF001 assert root._current_steps == pytest.approx( # noqa: SLF001 - 1 + float(i + 1) / 10.0 + 1 + float(i + 1) / float(inner_num_steps_step2) ) + assert sub._current_steps == pytest.approx( # noqa: SLF001 + inner_num_steps_step2 + ) assert root._current_steps == pytest.approx(2) # noqa: SLF001 mocked_cb.assert_called() - assert mocked_cb.call_count == 10 - assert mocked_cb.call_args_list[9].args[0] == pytest.approx(2 / 3) + assert mocked_cb.call_args_list[-1].args[0].percent_value == pytest.approx( + 2 / 3 + ) + for call_index, call in enumerate(mocked_cb.call_args_list[1:-1]): + assert ( + call.args[0].percent_value + - mocked_cb.call_args_list[call_index].args[0].percent_value + ) > _MIN_PROGRESS_UPDATE_PERCENT + mocked_cb.reset_mock() # 3rd step is another subprogress of 50 steps - async with root.sub_progress(steps=50) as sub: + inner_num_steps_step3 = 50 + async with root.sub_progress(steps=inner_num_steps_step3) as sub: assert sub._current_steps == pytest.approx(0) # noqa: SLF001 assert root._current_steps == pytest.approx(2) # noqa: SLF001 - for i in range(50): + for i in range(inner_num_steps_step3): await sub.update() assert sub._current_steps == pytest.approx(float(i + 1)) # noqa: SLF001 assert root._current_steps == pytest.approx( # noqa: SLF001 - 2 + float(i + 1) / 50.0 + 2 + float(i + 1) / float(inner_num_steps_step3) ) + assert sub._current_steps == pytest.approx( # noqa: SLF001 + inner_num_steps_step3 + ) assert root._current_steps == pytest.approx(3) # noqa: SLF001 mocked_cb.assert_called() - assert mocked_cb.call_count == 25 - assert mocked_cb.call_args_list[24].args[0] == pytest.approx(1) + assert mocked_cb.call_args_list[-1].args[0].percent_value == 1.0 mocked_cb.reset_mock() +def test_creating_progress_bar_with_invalid_unit_fails(): + with pytest.raises(ValidationError): + ProgressBarData(num_steps=321, progress_unit="invalid") + + async def test_progress_bar_always_reports_0_on_creation_and_1_on_finish( mocked_progress_bar_cb: mock.Mock, ): @@ -92,11 +129,15 @@ async def test_progress_bar_always_reports_0_on_creation_and_1_on_finish( async with progress_bar as root: assert root is progress_bar assert root._current_steps == 0 # noqa: SLF001 - mocked_progress_bar_cb.assert_called_once_with(0) + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=0, total=num_steps) + ) # going out of scope always updates to final number of steps assert progress_bar._current_steps == num_steps # noqa: SLF001 - assert mocked_progress_bar_cb.call_args_list[-1] == mock.call(_FINAL_VALUE) + assert mocked_progress_bar_cb.call_args_list[-1] == mock.call( + ProgressReport(actual_value=num_steps, total=num_steps) + ) async def test_progress_bar_always_reports_1_on_finish( @@ -114,7 +155,9 @@ async def test_progress_bar_always_reports_1_on_finish( async with progress_bar as root: assert root is progress_bar assert root._current_steps == 0 # noqa: SLF001 - mocked_progress_bar_cb.assert_called_once_with(0) + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=0, total=num_steps) + ) for _ in range(num_chunked_steps): await root.update(chunks) await root.update(last_step) @@ -122,7 +165,9 @@ async def test_progress_bar_always_reports_1_on_finish( # going out of scope always updates to final number of steps assert progress_bar._current_steps == pytest.approx(num_steps) # noqa: SLF001 - assert mocked_progress_bar_cb.call_args_list[-1] == mock.call(_FINAL_VALUE) + assert mocked_progress_bar_cb.call_args_list[-1] == mock.call( + ProgressReport(actual_value=num_steps, total=num_steps) + ) async def test_set_progress( @@ -189,23 +234,33 @@ async def test_too_many_updates_does_not_raise_but_show_warning_with_stack( async def test_weighted_progress_bar(mocked_progress_bar_cb: mock.Mock): + outer_num_steps = 3 async with ProgressBarData( - num_steps=3, + num_steps=outer_num_steps, step_weights=[1, 3, 1], progress_report_cb=mocked_progress_bar_cb, ) as root: - mocked_progress_bar_cb.assert_called_once_with(pytest.approx(0)) + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=0, total=outer_num_steps) + ) mocked_progress_bar_cb.reset_mock() assert root.step_weights == [1 / 5, 3 / 5, 1 / 5, 0] await root.update() - mocked_progress_bar_cb.assert_called_once_with(pytest.approx(1 / 5)) + assert mocked_progress_bar_cb.call_args.args[0].percent_value == pytest.approx( + 1 / 5 + ) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(1) # noqa: SLF001 await root.update() - mocked_progress_bar_cb.assert_called_once_with(pytest.approx(1 / 5 + 3 / 5)) + assert mocked_progress_bar_cb.call_args.args[0].percent_value == pytest.approx( + 1 / 5 + 3 / 5 + ) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(2) # noqa: SLF001 - mocked_progress_bar_cb.assert_called_once_with(1) + + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=outer_num_steps, total=outer_num_steps) + ) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(3) # noqa: SLF001 @@ -213,17 +268,22 @@ async def test_weighted_progress_bar(mocked_progress_bar_cb: mock.Mock): async def test_weighted_progress_bar_with_weighted_sub_progress( mocked_progress_bar_cb: mock.Mock, ): + outer_num_steps = 3 async with ProgressBarData( - num_steps=3, + num_steps=outer_num_steps, step_weights=[1, 3, 1], progress_report_cb=mocked_progress_bar_cb, ) as root: - mocked_progress_bar_cb.assert_called_once_with(pytest.approx(0)) + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=0, total=outer_num_steps) + ) mocked_progress_bar_cb.reset_mock() assert root.step_weights == [1 / 5, 3 / 5, 1 / 5, 0] # first step await root.update() - mocked_progress_bar_cb.assert_called_once_with(pytest.approx(1 / 5)) + assert mocked_progress_bar_cb.call_args.args[0].percent_value == pytest.approx( + 1 / 5 + ) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(1) # noqa: SLF001 @@ -263,12 +323,14 @@ async def test_weighted_progress_bar_with_weighted_sub_progress( assert root._current_steps == pytest.approx(2) # noqa: SLF001 mocked_progress_bar_cb.assert_called() assert mocked_progress_bar_cb.call_count == 5 - assert mocked_progress_bar_cb.call_args_list[4].args[0] == pytest.approx( - 1 / 5 + 3 / 5 - ) + assert mocked_progress_bar_cb.call_args_list[4].args[ + 0 + ].percent_value == pytest.approx(1 / 5 + 3 / 5) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(2) # noqa: SLF001 - mocked_progress_bar_cb.assert_called_once_with(1) + mocked_progress_bar_cb.assert_called_once_with( + ProgressReport(actual_value=outer_num_steps, total=outer_num_steps) + ) mocked_progress_bar_cb.reset_mock() assert root._current_steps == pytest.approx(3) # noqa: SLF001 diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py index ada03d351a0..36204bbe30c 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_common/file_io_utils.py @@ -139,7 +139,7 @@ class ProgressData: @runtime_checkable class LogRedirectCB(Protocol): - async def __call__(self, logs: str) -> None: + async def __call__(self, log: str) -> None: ... @@ -204,9 +204,11 @@ async def download_link_to_file( **( _TQDM_FILE_OPTIONS | { - "miniters": _compute_tqdm_miniters(file_size) - if file_size - else 1 + "miniters": ( + _compute_tqdm_miniters(file_size) + if file_size + else 1 + ) } ), ) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py index d8c88c50a86..4b5699cac52 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/rabbitmq.py @@ -5,6 +5,7 @@ from fastapi import FastAPI from models_library.docker import StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import Task +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -46,7 +47,7 @@ async def post_task_progress_message(app: FastAPI, task: Task, progress: float) user_id=simcore_label_keys.user_id, project_id=simcore_label_keys.project_id, progress_type=ProgressType.CLUSTER_UP_SCALING, - progress=progress, + report=ProgressReport(actual_value=progress, total=1), ) await post_message(app, message) diff --git a/services/autoscaling/tests/unit/test_utils_rabbitmq.py b/services/autoscaling/tests/unit/test_utils_rabbitmq.py index 9391d656be6..850fe03a19b 100644 --- a/services/autoscaling/tests/unit/test_utils_rabbitmq.py +++ b/services/autoscaling/tests/unit/test_utils_rabbitmq.py @@ -12,6 +12,7 @@ from fastapi import FastAPI from models_library.docker import DockerLabelKey, StandardSimcoreDockerLabels from models_library.generated_models.docker_rest_api import Service, Task +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( LoggerRabbitMessage, ProgressRabbitMessageNode, @@ -188,8 +189,8 @@ async def test_post_task_progress_message( node_id=osparc_docker_label_keys.node_id, project_id=osparc_docker_label_keys.project_id, user_id=osparc_docker_label_keys.user_id, - progress=progress_value, progress_type=ProgressType.CLUSTER_UP_SCALING, + report=ProgressReport(actual_value=progress_value, total=1), ) .json() .encode() diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py index 4f438209cf2..0c38c136f6d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_event_create_sidecars.py @@ -6,6 +6,7 @@ from fastapi import FastAPI from fastapi.encoders import jsonable_encoder from models_library.aiodocker_api import AioDockerServiceSpec +from models_library.progress_bar import ProgressReport from models_library.projects import ProjectAtDB from models_library.projects_nodes import Node from models_library.projects_nodes_io import NodeIDStr @@ -255,12 +256,12 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: include=_DYNAMIC_SIDECAR_SERVICE_EXTENDABLE_SPECS, ) ) - rabbit_message = ProgressRabbitMessageNode( + rabbit_message = ProgressRabbitMessageNode.construct( user_id=scheduler_data.user_id, project_id=scheduler_data.project_id, node_id=scheduler_data.node_uuid, progress_type=ProgressType.SIDECARS_PULLING, - progress=0, + report=ProgressReport(actual_value=0, total=1), ) await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) dynamic_sidecar_id = await create_service_and_get_id( @@ -273,12 +274,12 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: ) ) - rabbit_message = ProgressRabbitMessageNode( + rabbit_message = ProgressRabbitMessageNode.construct( user_id=scheduler_data.user_id, project_id=scheduler_data.project_id, node_id=scheduler_data.node_uuid, progress_type=ProgressType.SIDECARS_PULLING, - progress=1, + report=ProgressReport(actual_value=1, total=1), ) await rabbitmq_client.publish(rabbit_message.channel_name, rabbit_message) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py index feea24b83cb..87b99c46071 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/rabbitmq.py @@ -1,5 +1,6 @@ from typing import Any +from models_library.progress_bar import ProgressReport from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState @@ -165,7 +166,7 @@ async def publish_service_progress( user_id=user_id, project_id=project_id, node_id=node_id, - progress=progress, + report=ProgressReport(actual_value=progress, total=1), ) await rabbitmq_client.publish(message.channel_name, message) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py index 211a5cf3da9..4d60d7200ea 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py @@ -10,6 +10,7 @@ from typing import Final from fastapi import FastAPI +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ProgressType from servicelib.async_utils import run_sequentially_in_context from servicelib.fastapi.docker_utils import pull_images @@ -102,9 +103,9 @@ async def docker_compose_pull(app: FastAPI, compose_spec_yaml: str) -> None: registry_settings = app_settings.REGISTRY_SETTINGS list_of_images = get_docker_service_images(compose_spec_yaml) - async def _progress_cb(progress_value: float) -> None: + async def _progress_cb(report: ProgressReport) -> None: await post_progress_message( - app, ProgressType.SERVICE_IMAGES_PULLING, progress_value + app, ProgressType.SERVICE_IMAGES_PULLING, report=report ) async def _log_cb(msg: LogMessageStr, log_level: LogLevelInt) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index a7649ed6ba3..b0daf7f881c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -3,6 +3,7 @@ from typing import cast from fastapi import FastAPI +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( EventRabbitMessage, LoggerRabbitMessage, @@ -12,7 +13,6 @@ RabbitMessageBase, RabbitResourceTrackingMessages, ) -from pydantic import NonNegativeFloat from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context from servicelib.rabbitmq import RabbitMQClient, is_rabbitmq_responsive from settings_library.rabbit import RabbitSettings @@ -38,7 +38,7 @@ async def post_log_message( app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt ) -> None: app_settings: ApplicationSettings = app.state.settings - message = LoggerRabbitMessage( + message = LoggerRabbitMessage.construct( node_id=app_settings.DY_SIDECAR_NODE_ID, user_id=app_settings.DY_SIDECAR_USER_ID, project_id=app_settings.DY_SIDECAR_PROJECT_ID, @@ -50,15 +50,15 @@ async def post_log_message( async def post_progress_message( - app: FastAPI, progress_type: ProgressType, progress_value: NonNegativeFloat + app: FastAPI, progress_type: ProgressType, report: ProgressReport ) -> None: app_settings: ApplicationSettings = app.state.settings - message = ProgressRabbitMessageNode( + message = ProgressRabbitMessageNode.construct( node_id=app_settings.DY_SIDECAR_NODE_ID, user_id=app_settings.DY_SIDECAR_USER_ID, project_id=app_settings.DY_SIDECAR_PROJECT_ID, progress_type=progress_type, - progress=progress_value, + report=report, ) await _post_rabbit_message(app, message) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 593df38ebbb..1b302b434ac 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -6,6 +6,7 @@ from typing import Final from fastapi import FastAPI +from models_library.api_schemas_long_running_tasks.base import ProgressPercent from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ProgressType, SimcorePlatformStatus from pydantic import PositiveInt @@ -150,7 +151,7 @@ async def task_create_service_containers( app: FastAPI, application_health: ApplicationHealth, ) -> list[str]: - progress.update(message="validating service spec", percent=0) + progress.update(message="validating service spec", percent=ProgressPercent(0)) async with shared_store: compose_spec_validation: ComposeSpecValidation = await validate_compose_spec( @@ -178,24 +179,26 @@ async def task_create_service_containers( result = await docker_compose_rm(shared_store.compose_spec, settings) _raise_for_errors(result, "rm") - progress.update(message="pulling images", percent=0.01) + progress.update(message="pulling images", percent=ProgressPercent(0.01)) await post_sidecar_log_message( app, "pulling service images", log_level=logging.INFO ) - await post_progress_message(app, ProgressType.SERVICE_IMAGES_PULLING, 0) await docker_compose_pull(app, shared_store.compose_spec) await post_sidecar_log_message( app, "service images ready", log_level=logging.INFO ) - await post_progress_message(app, ProgressType.SERVICE_IMAGES_PULLING, 1) - progress.update(message="creating and starting containers", percent=0.90) + progress.update( + message="creating and starting containers", percent=ProgressPercent(0.90) + ) await post_sidecar_log_message( app, "starting service containers", log_level=logging.INFO ) await _retry_docker_compose_create(shared_store.compose_spec, settings) - progress.update(message="ensure containers are started", percent=0.95) + progress.update( + message="ensure containers are started", percent=ProgressPercent(0.95) + ) compose_start_result = await _retry_docker_compose_start( shared_store.compose_spec, settings ) @@ -277,7 +280,9 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus): await send_service_stopped(app, simcore_platform_status) try: - progress.update(message="running docker-compose-down", percent=0.1) + progress.update( + message="running docker-compose-down", percent=ProgressPercent(0.1) + ) await run_before_shutdown_actions( shared_store, settings.DY_SIDECAR_CALLBACKS_MAPPING.before_shutdown @@ -290,11 +295,13 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus): result = await _retry_docker_compose_down(shared_store.compose_spec, settings) _raise_for_errors(result, "down") - progress.update(message="stopping logs", percent=0.9) + progress.update(message="stopping logs", percent=ProgressPercent(0.9)) for container_name in shared_store.container_names: await stop_log_fetching(app, container_name) - progress.update(message="removing pending resources", percent=0.95) + progress.update( + message="removing pending resources", percent=ProgressPercent(0.95) + ) result = await docker_compose_rm(shared_store.compose_spec, settings) _raise_for_errors(result, "rm") except Exception: @@ -308,7 +315,27 @@ async def _send_resource_tracking_stop(platform_status: SimcorePlatformStatus): async with shared_store: shared_store.compose_spec = None shared_store.container_names = [] - progress.update(message="done", percent=0.99) + progress.update(message="done", percent=ProgressPercent(0.99)) + + +async def _restore_state_folder( + app: FastAPI, + *, + settings: ApplicationSettings, + progress_bar: ProgressBarData, + state_path: Path, +) -> None: + await data_manager.pull( + user_id=settings.DY_SIDECAR_USER_ID, + project_id=settings.DY_SIDECAR_PROJECT_ID, + node_uuid=settings.DY_SIDECAR_NODE_ID, + destination_path=state_path, + io_log_redirect_cb=functools.partial( + post_sidecar_log_message, app, log_level=logging.INFO + ), + r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, + progress_bar=progress_bar, + ) async def task_restore_state( @@ -327,20 +354,7 @@ async def task_restore_state( # NOTE: this implies that the legacy format will always be decompressed # until it is not removed. - async def _restore_state_folder(state_path: Path) -> None: - await data_manager.pull( - user_id=settings.DY_SIDECAR_USER_ID, - project_id=settings.DY_SIDECAR_PROJECT_ID, - node_uuid=settings.DY_SIDECAR_NODE_ID, - destination_path=state_path, - io_log_redirect_cb=functools.partial( - post_sidecar_log_message, app, log_level=logging.INFO - ), - r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, - progress_bar=root_progress, - ) - - progress.update(message="Downloading state", percent=0.05) + progress.update(message="Downloading state", percent=ProgressPercent(0.05)) state_paths = list(mounted_volumes.disk_state_paths_iter()) await post_sidecar_log_message( app, @@ -355,7 +369,9 @@ async def _restore_state_folder(state_path: Path) -> None: ) as root_progress: await logged_gather( *( - _restore_state_folder(path) + _restore_state_folder( + app, settings=settings, progress_bar=root_progress, state_path=path + ) for path in mounted_volumes.disk_state_paths_iter() ), max_concurrency=CONCURRENCY_STATE_SAVE_RESTORE, @@ -365,7 +381,29 @@ async def _restore_state_folder(state_path: Path) -> None: await post_sidecar_log_message( app, "Finished state downloading", log_level=logging.INFO ) - progress.update(message="state restored", percent=0.99) + progress.update(message="state restored", percent=ProgressPercent(0.99)) + + +async def _save_state_folder( + app: FastAPI, + *, + settings: ApplicationSettings, + progress_bar: ProgressBarData, + state_path: Path, + mounted_volumes: MountedVolumes, +) -> None: + await data_manager.push( + user_id=settings.DY_SIDECAR_USER_ID, + project_id=settings.DY_SIDECAR_PROJECT_ID, + node_uuid=settings.DY_SIDECAR_NODE_ID, + source_path=state_path, + r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, + exclude_patterns=mounted_volumes.state_exclude, + io_log_redirect_cb=functools.partial( + post_sidecar_log_message, app, log_level=logging.INFO + ), + progress_bar=progress_bar, + ) async def task_save_state( @@ -379,24 +417,7 @@ async def task_save_state( If a legacy archive is detected, it will be removed after saving the new format. """ - - async def _save_state_folder( - state_path: Path, root_progress: ProgressBarData - ) -> None: - await data_manager.push( - user_id=settings.DY_SIDECAR_USER_ID, - project_id=settings.DY_SIDECAR_PROJECT_ID, - node_uuid=settings.DY_SIDECAR_NODE_ID, - source_path=state_path, - r_clone_settings=settings.DY_SIDECAR_R_CLONE_SETTINGS, - exclude_patterns=mounted_volumes.state_exclude, - io_log_redirect_cb=functools.partial( - post_sidecar_log_message, app, log_level=logging.INFO - ), - progress_bar=root_progress, - ) - - progress.update(message="starting state save", percent=0.0) + progress.update(message="starting state save", percent=ProgressPercent(0.0)) state_paths = list(mounted_volumes.disk_state_paths_iter()) async with ProgressBarData( num_steps=len(state_paths), @@ -406,14 +427,20 @@ async def _save_state_folder( ) as root_progress: await logged_gather( *[ - _save_state_folder(state_path, root_progress) + _save_state_folder( + app, + settings=settings, + progress_bar=root_progress, + state_path=state_path, + mounted_volumes=mounted_volumes, + ) for state_path in state_paths ], max_concurrency=CONCURRENCY_STATE_SAVE_RESTORE, ) await post_sidecar_log_message(app, "Finished state saving", log_level=logging.INFO) - progress.update(message="finished state saving", percent=0.99) + progress.update(message="finished state saving", percent=ProgressPercent(0.99)) async def task_ports_inputs_pull( @@ -428,12 +455,12 @@ async def task_ports_inputs_pull( _logger.info("Received request to pull inputs but was ignored") return 0 - progress.update(message="starting inputs pulling", percent=0.0) + progress.update(message="starting inputs pulling", percent=ProgressPercent(0.0)) port_keys = [] if port_keys is None else port_keys await post_sidecar_log_message( app, f"Pulling inputs for {port_keys}", log_level=logging.INFO ) - progress.update(message="pulling inputs", percent=0.1) + progress.update(message="pulling inputs", percent=ProgressPercent(0.1)) async with ProgressBarData( num_steps=1, progress_report_cb=functools.partial( @@ -452,7 +479,7 @@ async def task_ports_inputs_pull( await post_sidecar_log_message( app, "Finished pulling inputs", log_level=logging.INFO ) - progress.update(message="finished inputs pulling", percent=0.99) + progress.update(message="finished inputs pulling", percent=ProgressPercent(0.99)) return int(transferred_bytes) @@ -462,7 +489,7 @@ async def task_ports_outputs_pull( mounted_volumes: MountedVolumes, app: FastAPI, ) -> int: - progress.update(message="starting outputs pulling", percent=0.0) + progress.update(message="starting outputs pulling", percent=ProgressPercent(0.0)) port_keys = [] if port_keys is None else port_keys await post_sidecar_log_message( app, f"Pulling output for {port_keys}", log_level=logging.INFO @@ -485,14 +512,14 @@ async def task_ports_outputs_pull( await post_sidecar_log_message( app, "Finished pulling outputs", log_level=logging.INFO ) - progress.update(message="finished outputs pulling", percent=0.99) + progress.update(message="finished outputs pulling", percent=ProgressPercent(0.99)) return int(transferred_bytes) async def task_ports_outputs_push( progress: TaskProgress, outputs_manager: OutputsManager, app: FastAPI ) -> None: - progress.update(message="starting outputs pushing", percent=0.0) + progress.update(message="starting outputs pushing", percent=ProgressPercent(0.0)) await post_sidecar_log_message( app, f"waiting for outputs {outputs_manager.outputs_context.file_type_port_keys} to be pushed", @@ -504,7 +531,7 @@ async def task_ports_outputs_push( await post_sidecar_log_message( app, "finished outputs pushing", log_level=logging.INFO ) - progress.update(message="finished outputs pushing", percent=0.99) + progress.update(message="finished outputs pushing", percent=ProgressPercent(0.99)) async def task_containers_restart( @@ -519,7 +546,9 @@ async def task_containers_restart( # or some other state, the service will get shutdown, to prevent this # blocking status while containers are being restarted. async with app.state.container_restart_lock: - progress.update(message="starting containers restart", percent=0.0) + progress.update( + message="starting containers restart", percent=ProgressPercent(0.0) + ) if shared_store.compose_spec is None: msg = "No spec for docker compose command was found" raise RuntimeError(msg) @@ -527,20 +556,20 @@ async def task_containers_restart( for container_name in shared_store.container_names: await stop_log_fetching(app, container_name) - progress.update(message="stopped log fetching", percent=0.1) + progress.update(message="stopped log fetching", percent=ProgressPercent(0.1)) result = await docker_compose_restart(shared_store.compose_spec, settings) _raise_for_errors(result, "restart") - progress.update(message="containers restarted", percent=0.8) + progress.update(message="containers restarted", percent=ProgressPercent(0.8)) for container_name in shared_store.container_names: await start_log_fetching(app, container_name) - progress.update(message="started log fetching", percent=0.9) + progress.update(message="started log fetching", percent=ProgressPercent(0.9)) await post_sidecar_log_message( app, "Service was restarted please reload the UI", log_level=logging.INFO ) await post_event_reload_iframe(app) - progress.update(message="started log fetching", percent=0.99) + progress.update(message="started log fetching", percent=ProgressPercent(0.99)) diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index 3822db681f3..e8ca825986e 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -413,7 +413,7 @@ async def test_regression_io_log_redirect_cb( # ensure logger used in nodeports deos not change assert inspect.getfullargspec(LogRedirectCB.__call__) == FullArgSpec( - args=["self", "logs"], + args=["self", "log"], varargs=None, varkw=None, defaults=None, @@ -421,6 +421,6 @@ async def test_regression_io_log_redirect_cb( kwonlydefaults=None, annotations={ "return": None, - "logs": str, + "log": str, }, ) diff --git a/services/static-webserver/client/source/class/osparc/data/model/Study.js b/services/static-webserver/client/source/class/osparc/data/model/Study.js index a7f1ac6272e..d92fd874129 100644 --- a/services/static-webserver/client/source/class/osparc/data/model/Study.js +++ b/services/static-webserver/client/source/class/osparc/data/model/Study.js @@ -432,7 +432,7 @@ qx.Class.define("osparc.data.model.Study", { const node = workbench.getNode(nodeId); if (node) { const progressType = nodeProgressData["progress_type"]; - const progress = nodeProgressData["progress"]; + const progress = nodeProgressData["progress_report"]["actual_value"] / nodeProgressData["progress_report"]["total"]; node.setNodeProgressSequence(progressType, progress); } }, diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index 5c2cff67248..411b7ab5392 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -12,10 +12,11 @@ NodeGetIdle, NodeGetUnknown, ) +from models_library.progress_bar import ProgressReport from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType -from pydantic.types import NonNegativeFloat, PositiveInt +from pydantic.types import PositiveInt from servicelib.progress_bar import ProgressBarData from servicelib.rabbitmq import RabbitMQClient, RPCServerError from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler import services @@ -75,13 +76,13 @@ async def _post_progress_message( rabbitmq_client: RabbitMQClient, user_id: PositiveInt, project_id: str, - progress_value: NonNegativeFloat, + report: ProgressReport, ) -> None: progress_message = ProgressRabbitMessageProject( user_id=user_id, project_id=ProjectID(project_id), progress_type=ProgressType.PROJECT_CLOSING, - progress=progress_value, + report=report, ) await rabbitmq_client.publish(progress_message.channel_name, progress_message) diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py index 779d369a17d..4055be3121c 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_exclusive_queue_consumers.py @@ -24,13 +24,12 @@ from ..socketio.messages import ( SOCKET_IO_EVENT, SOCKET_IO_LOG_EVENT, - SOCKET_IO_NODE_PROGRESS_EVENT, SOCKET_IO_NODE_UPDATED_EVENT, - SOCKET_IO_PROJECT_PROGRESS_EVENT, SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT, send_message_to_standard_group, send_message_to_user, ) +from ..socketio.models import WebSocketNodeProgress, WebSocketProjectProgress from ..wallets import api as wallets_api from ._rabbitmq_consumers_common import SubcribeArgumentsTuple, subscribe_to_rabbitmq @@ -39,20 +38,6 @@ _APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" -def _convert_to_project_progress_event( - message: ProgressRabbitMessageProject, -) -> SocketMessageDict: - return SocketMessageDict( - event_type=SOCKET_IO_PROJECT_PROGRESS_EVENT, - data={ - "project_id": message.project_id, - "user_id": message.user_id, - "progress_type": message.progress_type, - "progress": message.progress, - }, - ) - - async def _convert_to_node_update_event( app: web.Application, message: ProgressRabbitMessageNode ) -> SocketMessageDict | None: @@ -63,7 +48,7 @@ async def _convert_to_node_update_event( if f"{message.node_id}" in project["workbench"]: # update the project node progress with the latest value project["workbench"][f"{message.node_id}"].update( - {"progress": round(message.progress * 100.0)} + {"progress": round(message.report.percent_value * 100.0)} ) return SocketMessageDict( event_type=SOCKET_IO_NODE_UPDATED_EVENT, @@ -79,34 +64,23 @@ async def _convert_to_node_update_event( return None -def _convert_to_node_progress_event( - message: ProgressRabbitMessageNode, -) -> SocketMessageDict: - return SocketMessageDict( - event_type=SOCKET_IO_NODE_PROGRESS_EVENT, - data={ - "project_id": message.project_id, - "node_id": message.node_id, - "user_id": message.user_id, - "progress_type": message.progress_type, - "progress": message.progress, - }, - ) - - async def _progress_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message: ProgressRabbitMessageNode | ProgressRabbitMessageProject = ( parse_raw_as(ProgressRabbitMessageNode | ProgressRabbitMessageProject, data) ) message: SocketMessageDict | None = None if isinstance(rabbit_message, ProgressRabbitMessageProject): - message = _convert_to_project_progress_event(rabbit_message) + message = WebSocketProjectProgress.from_rabbit_message( + rabbit_message + ).to_socket_dict() elif rabbit_message.progress_type is ProgressType.COMPUTATION_RUNNING: message = await _convert_to_node_update_event(app, rabbit_message) else: - message = _convert_to_node_progress_event(rabbit_message) + message = WebSocketNodeProgress.from_rabbit_message( + rabbit_message + ).to_socket_dict() if message: await send_message_to_user( diff --git a/services/web/server/src/simcore_service_webserver/socketio/messages.py b/services/web/server/src/simcore_service_webserver/socketio/messages.py index 08ad506e6ab..849ddf2b1e8 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/messages.py +++ b/services/web/server/src/simcore_service_webserver/socketio/messages.py @@ -10,6 +10,7 @@ from models_library.socketio import SocketMessageDict from models_library.users import GroupID, UserID from models_library.utils.fastapi_encoders import jsonable_encoder +from servicelib.logging_utils import log_catch from socketio import AsyncServer from ._utils import get_socket_server @@ -23,9 +24,8 @@ SOCKET_IO_EVENT: Final[str] = "event" SOCKET_IO_HEARTBEAT_EVENT: Final[str] = "set_heartbeat_emit_interval" SOCKET_IO_LOG_EVENT: Final[str] = "logger" -SOCKET_IO_NODE_PROGRESS_EVENT: Final[str] = "nodeProgress" + SOCKET_IO_NODE_UPDATED_EVENT: Final[str] = "nodeUpdated" -SOCKET_IO_PROJECT_PROGRESS_EVENT: Final[str] = "projectProgress" SOCKET_IO_PROJECT_UPDATED_EVENT: Final[str] = "projectStateUpdated" SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT: Final[str] = "walletOsparcCreditsUpdated" @@ -42,7 +42,7 @@ async def _safe_emit( # client without having to send his message first to rabbitMQ and then back to itself. # # NOTE 2: `emit` method is not designed to be used concurrently - try: + with log_catch(_logger, reraise=False): event = message["event_type"] data = jsonable_encoder(message["data"]) await sio.emit( @@ -51,14 +51,6 @@ async def _safe_emit( room=room, ignore_queue=ignore_queue, ) - except Exception: # pylint: disable=broad-exception-caught - _logger.warning( - "Failed to deliver %s message to %s size=%d", - f"{event=}", - f"{room=}", - len(data), - exc_info=True, - ) async def send_message_to_user( diff --git a/services/web/server/src/simcore_service_webserver/socketio/models.py b/services/web/server/src/simcore_service_webserver/socketio/models.py new file mode 100644 index 00000000000..06e5b9014cb --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/socketio/models.py @@ -0,0 +1,102 @@ +from abc import abstractmethod +from typing import Literal + +from models_library.progress_bar import ProgressReport +from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID +from models_library.rabbitmq_messages import ( + ProgressRabbitMessageNode, + ProgressRabbitMessageProject, + ProgressType, +) +from models_library.socketio import SocketMessageDict +from models_library.users import UserID +from models_library.utils.fastapi_encoders import jsonable_encoder +from pydantic import BaseModel, Field + + +class WebSocketMessageBase(BaseModel): + event_type: str = Field(..., const=True) + + @classmethod + def get_event_type(cls) -> str: + _event_type: str = cls.__fields__["event_type"].default + return _event_type + + @abstractmethod + def to_socket_dict(self) -> SocketMessageDict: + ... + + class Config: + frozen = True + + +class _WebSocketProjectMixin(BaseModel): + project_id: ProjectID + + +class _WebSocketNodeMixin(BaseModel): + node_id: NodeID + + +class _WebSocketUserMixin(BaseModel): + user_id: UserID + + +class _WebSocketProgressMixin(BaseModel): + progress_type: ProgressType + progress_report: ProgressReport + + +class WebSocketProjectProgress( + _WebSocketUserMixin, + _WebSocketProjectMixin, + _WebSocketProgressMixin, + WebSocketMessageBase, +): + event_type: Literal["projectProgress"] = "projectProgress" + + @classmethod + def from_rabbit_message( + cls, message: ProgressRabbitMessageProject + ) -> "WebSocketProjectProgress": + return cls.construct( + user_id=message.user_id, + project_id=message.project_id, + progress_type=message.progress_type, + progress_report=message.report, + ) + + def to_socket_dict(self) -> SocketMessageDict: + return SocketMessageDict( + event_type=self.event_type, + data=jsonable_encoder(self, exclude={"event_type"}), + ) + + +class WebSocketNodeProgress( + _WebSocketUserMixin, + _WebSocketProjectMixin, + _WebSocketNodeMixin, + _WebSocketProgressMixin, + WebSocketMessageBase, +): + event_type: Literal["nodeProgress"] = "nodeProgress" + + @classmethod + def from_rabbit_message( + cls, message: ProgressRabbitMessageNode + ) -> "WebSocketNodeProgress": + return cls.construct( + user_id=message.user_id, + project_id=message.project_id, + node_id=message.node_id, + progress_type=message.progress_type, + progress_report=message.report, + ) + + def to_socket_dict(self) -> SocketMessageDict: + return SocketMessageDict( + event_type=self.event_type, + data=jsonable_encoder(self, exclude={"event_type"}), + ) diff --git a/services/web/server/src/simcore_service_webserver/socketio/plugin.py b/services/web/server/src/simcore_service_webserver/socketio/plugin.py index 7336e031b3e..20ceef31053 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/plugin.py +++ b/services/web/server/src/simcore_service_webserver/socketio/plugin.py @@ -3,6 +3,7 @@ SEE https://github.com/miguelgrinberg/python-socketio """ + import logging from aiohttp import web diff --git a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py index 545922fafdf..a26af00fc4c 100644 --- a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py @@ -16,6 +16,7 @@ import sqlalchemy as sa from aiohttp.test_utils import TestClient from faker import Faker +from models_library.progress_bar import ProgressReport from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState @@ -57,9 +58,9 @@ from simcore_service_webserver.socketio.messages import ( SOCKET_IO_EVENT, SOCKET_IO_LOG_EVENT, - SOCKET_IO_NODE_PROGRESS_EVENT, SOCKET_IO_NODE_UPDATED_EVENT, ) +from simcore_service_webserver.socketio.models import WebSocketNodeProgress from simcore_service_webserver.socketio.plugin import setup_socketio from tenacity import RetryError from tenacity._asyncio import AsyncRetrying @@ -110,7 +111,7 @@ async def _assert_handler_called(handler: mock.Mock, expected_call: mock._Call) async def _assert_handler_called_with_json( - handler: mock.Mock, expected_call: dict[str, Any] | list[dict[str, Any]] + handler: mock.Mock, expected_call: dict[str, Any] ) -> None: async for attempt in AsyncRetrying( wait=wait_fixed(0.1), @@ -341,7 +342,9 @@ async def test_progress_non_computational_workflow( socket_io_conn = await socketio_client_factory(None, client) mock_progress_handler = mocker.MagicMock() - socket_io_conn.on(SOCKET_IO_NODE_PROGRESS_EVENT, handler=mock_progress_handler) + socket_io_conn.on( + WebSocketNodeProgress.get_event_type(), handler=mock_progress_handler + ) if subscribe_to_logs: assert client.app @@ -351,14 +354,16 @@ async def test_progress_non_computational_workflow( user_id=sender_user_id, project_id=user_project_id, node_id=random_node_id_in_user_project, - progress=0.3, progress_type=progress_type, + report=ProgressReport(actual_value=0.3, total=1), ) await rabbitmq_publisher.publish(progress_message.channel_name, progress_message) call_expected = sender_same_user_id and subscribe_to_logs if call_expected: - expected_call = jsonable_encoder(progress_message, exclude={"channel_name"}) + expected_call = WebSocketNodeProgress.from_rabbit_message( + progress_message + ).to_socket_dict()["data"] await _assert_handler_called_with_json(mock_progress_handler, expected_call) else: await _assert_handler_not_called(mock_progress_handler) @@ -405,8 +410,8 @@ async def test_progress_computational_workflow( user_id=sender_user_id, project_id=user_project_id, node_id=random_node_id_in_user_project, - progress=0.3, progress_type=ProgressType.COMPUTATION_RUNNING, + report=ProgressReport(actual_value=0.3, total=1), ) await rabbitmq_publisher.publish(progress_message.channel_name, progress_message) @@ -418,13 +423,16 @@ async def test_progress_computational_workflow( expected_call |= { "data": user_project["workbench"][f"{random_node_id_in_user_project}"] } - expected_call["data"]["progress"] = int(progress_message.progress * 100) + expected_call["data"]["progress"] = int( + progress_message.report.percent_value * 100 + ) await _assert_handler_called_with_json(mock_progress_handler, expected_call) else: await _assert_handler_not_called(mock_progress_handler) # check the database. doing it after the waiting calls above is safe async with aiopg_engine.acquire() as conn: + assert projects is not None result = await conn.execute( sa.select(projects.c.workbench).where( projects.c.uuid == str(user_project_id) diff --git a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py index e18534f8d86..6b5e79cf100 100644 --- a/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/unit/isolated/notifications/test_rabbitmq_consumers.py @@ -5,51 +5,79 @@ # pylint: disable=unused-variable from unittest.mock import AsyncMock +from uuid import UUID import pytest from faker import Faker +from models_library.progress_bar import ProgressReport from models_library.rabbitmq_messages import ( ProgressRabbitMessageNode, ProgressRabbitMessageProject, ProgressType, ) -from pydantic import BaseModel +from models_library.socketio import SocketMessageDict from pytest_mock import MockerFixture from simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers import ( _progress_message_parser, ) +from simcore_service_webserver.socketio.models import WebSocketNodeProgress _faker = Faker() @pytest.mark.parametrize( - "raw_data, class_type", + "raw_data, expected_socket_message", [ pytest.param( ProgressRabbitMessageNode( - project_id=_faker.uuid4(cast_to=None), - user_id=_faker.uuid4(cast_to=None), - node_id=_faker.uuid4(cast_to=None), + project_id=UUID("ee825037-599b-4df1-ba44-731dd48287fa"), + user_id=123, + node_id=UUID("6925403d-5464-4d92-9ec9-72c5793ca203"), progress_type=ProgressType.SERVICE_OUTPUTS_PULLING, - progress=0.4, + report=ProgressReport(actual_value=0.4, total=1), ).json(), - ProgressRabbitMessageNode, + SocketMessageDict( + event_type=WebSocketNodeProgress.get_event_type(), + data={ + "project_id": "ee825037-599b-4df1-ba44-731dd48287fa", + "node_id": "6925403d-5464-4d92-9ec9-72c5793ca203", + "user_id": 123, + "progress_type": ProgressType.SERVICE_OUTPUTS_PULLING.value, + "progress_report": { + "actual_value": 0.4, + "total": 1.0, + "unit": None, + }, + }, + ), id="node_progress", ), pytest.param( ProgressRabbitMessageProject( - project_id=_faker.uuid4(cast_to=None), - user_id=_faker.uuid4(cast_to=None), + project_id=UUID("ee825037-599b-4df1-ba44-731dd48287fa"), + user_id=123, progress_type=ProgressType.PROJECT_CLOSING, - progress=0.4, + report=ProgressReport(actual_value=0.4, total=1), ).json(), - ProgressRabbitMessageProject, + SocketMessageDict( + event_type=WebSocketNodeProgress.get_event_type(), + data={ + "project_id": "ee825037-599b-4df1-ba44-731dd48287fa", + "user_id": 123, + "progress_type": ProgressType.PROJECT_CLOSING.value, + "progress_report": { + "actual_value": 0.4, + "total": 1.0, + "unit": None, + }, + }, + ), id="project_progress", ), ], ) async def test_regression_progress_message_parser( - mocker: MockerFixture, raw_data: bytes, class_type: type[BaseModel] + mocker: MockerFixture, raw_data: bytes, expected_socket_message: SocketMessageDict ): send_messages_to_user_mock = mocker.patch( "simcore_service_webserver.notifications._rabbitmq_exclusive_queue_consumers.send_message_to_user", @@ -64,4 +92,4 @@ async def test_regression_progress_message_parser( message = send_messages_to_user_mock.call_args.kwargs["message"] # check that all fields are sent as expected - assert class_type.parse_obj(message["data"]) + assert message["data"] == expected_socket_message["data"]