From ef977966d0802480f6c3b7d4f29e90ffdb1e4d51 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 11:15:46 +0200 Subject: [PATCH 01/10] fixed mypy checks --- .github/workflows/ci-testing-deploy.yml | 1 - .../servicelib/long_running_tasks/_task.py | 12 ++++++---- .../src/simcore_sdk/node_ports_v2/port.py | 3 ++- .../dynamic-sidecar/requirements/_test.in | 6 ++--- .../dynamic-sidecar/requirements/_test.txt | 2 ++ services/dynamic-sidecar/setup.cfg | 4 ++++ .../api/_dependencies.py | 24 +++++++++---------- .../api/containers.py | 4 +--- .../api/containers_long_running_tasks.py | 18 +++++++------- .../core/docker_utils.py | 4 ++-- .../core/registry.py | 3 ++- .../core/settings.py | 2 +- .../_logging_event_handler.py | 6 ++--- .../attribute_monitor/_watchdog_extensions.py | 3 ++- .../modules/long_running_tasks.py | 15 +++++++----- .../modules/mounted_fs.py | 2 +- .../modules/nodeports.py | 20 +++++++++------- .../modules/outputs/_context.py | 4 ++-- .../modules/outputs/_event_filter.py | 6 ++--- .../modules/outputs/_event_handler.py | 6 ++--- .../modules/outputs/_manager.py | 3 ++- .../modules/outputs/_watchdog_extensions.py | 3 ++- .../modules/system_monitor/_notifier.py | 2 +- .../modules/system_monitor/_socketio.py | 2 +- .../modules/user_services_preferences/_db.py | 2 +- 25 files changed, 87 insertions(+), 70 deletions(-) diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index 25730b9db06..bf1d3ac11f7 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -1213,7 +1213,6 @@ jobs: run: ./ci/github/unit-testing/dynamic-sidecar.bash install - name: typecheck run: ./ci/github/unit-testing/dynamic-sidecar.bash typecheck - continue-on-error: true - name: test if: always() run: ./ci/github/unit-testing/dynamic-sidecar.bash test diff --git a/packages/service-library/src/servicelib/long_running_tasks/_task.py b/packages/service-library/src/servicelib/long_running_tasks/_task.py index 41652231c5e..88960cb6327 100644 --- a/packages/service-library/src/servicelib/long_running_tasks/_task.py +++ b/packages/service-library/src/servicelib/long_running_tasks/_task.py @@ -9,7 +9,10 @@ from typing import Any, Protocol from uuid import uuid4 -from models_library.api_schemas_long_running_tasks.base import ProgressPercent +from models_library.api_schemas_long_running_tasks.base import ( + ProgressPercent, + TaskProgress, +) from pydantic import PositiveFloat from ._errors import ( @@ -19,7 +22,7 @@ TaskNotCompletedError, TaskNotFoundError, ) -from ._models import TaskId, TaskName, TaskProgress, TaskResult, TaskStatus, TrackedTask +from ._models import TaskId, TaskName, TaskResult, TaskStatus, TrackedTask logger = logging.getLogger(__name__) @@ -349,8 +352,7 @@ async def close(self) -> None: class TaskProtocol(Protocol): - # NOTE: when using **kwargs pyright complains. this might be a bug that should be fixed soon - async def __call__(self, task_progress: TaskProgress, *task_kwargs: Any) -> Any: + async def __call__(self, progress: TaskProgress, *args: Any, **kwargs: Any) -> Any: ... @property @@ -366,7 +368,7 @@ def start_task( task_context: TaskContext | None = None, task_name: str | None = None, fire_and_forget: bool = False, - **task_kwargs, + **task_kwargs: Any, ) -> TaskId: """ Creates a background task from an async function. diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index e78b5a6581f..f135b213bb6 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -29,6 +29,7 @@ PortLink, ) from .port_validation import validate_port_content +from .ports_mapping import PortKey log = logging.getLogger(__name__) @@ -68,7 +69,7 @@ class SetKWargs: class Port(BaseServiceIOModel): - key: str = Field(..., regex=PROPERTY_KEY_RE) + key: PortKey = Field(..., regex=PROPERTY_KEY_RE) widget: dict[str, Any] | None = None default_value: DataItemValue | None = Field(None, alias="defaultValue") diff --git a/services/dynamic-sidecar/requirements/_test.in b/services/dynamic-sidecar/requirements/_test.in index 87396c16b02..35f081991de 100644 --- a/services/dynamic-sidecar/requirements/_test.in +++ b/services/dynamic-sidecar/requirements/_test.in @@ -16,9 +16,9 @@ pytest-mock python-dotenv sqlalchemy[mypy] # adds Mypy / Pep-484 Support for ORM Mappings SEE https://docs.sqlalchemy.org/en/20/orm/extensions/mypy.html types_aiobotocore_s3 -types-aiofiles # missing mypy stubs -types-PyYAML # missing mypy stubs - +types-aiofiles +types-psutil +types-PyYAML # NOTE: What test client to use for fastapi-based apps? # diff --git a/services/dynamic-sidecar/requirements/_test.txt b/services/dynamic-sidecar/requirements/_test.txt index eed03ba966b..91b7297252f 100644 --- a/services/dynamic-sidecar/requirements/_test.txt +++ b/services/dynamic-sidecar/requirements/_test.txt @@ -142,6 +142,8 @@ types-aiobotocore-s3==2.13.1 # via -r requirements/_test.in types-aiofiles==24.1.0.20240626 # via -r requirements/_test.in +types-psutil==6.0.0.20240621 + # via -r requirements/_test.in types-pyyaml==6.0.12.20240311 # via -r requirements/_test.in typing-extensions==4.11.0 diff --git a/services/dynamic-sidecar/setup.cfg b/services/dynamic-sidecar/setup.cfg index 24947589554..9e0e0ca00a2 100644 --- a/services/dynamic-sidecar/setup.cfg +++ b/services/dynamic-sidecar/setup.cfg @@ -11,3 +11,7 @@ commit_args = --no-verify asyncio_mode = auto markers = testit: "marks test to run during development" + +[mypy] +plugins = + pydantic.mypy diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py index 2e2110b126a..8992f73f5d6 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py @@ -2,7 +2,7 @@ """ from asyncio import Lock -from typing import Annotated +from typing import Annotated, cast from fastapi import Depends, FastAPI, Request from fastapi.datastructures import State @@ -19,65 +19,65 @@ def get_application(request: Request) -> FastAPI: - return request.app + return cast(FastAPI, request.app) def get_app_state(request: Request) -> State: - return request.app.state + return cast(State, request.app.state) def get_application_health( app_state: Annotated[State, Depends(get_app_state)] ) -> ApplicationHealth: - return app_state.application_health # type: ignore + return cast(ApplicationHealth, app_state.application_health) def get_settings( app_state: Annotated[State, Depends(get_app_state)] ) -> ApplicationSettings: - return app_state.settings # type: ignore + return cast(ApplicationSettings, app_state.settings) def get_shared_store( app_state: Annotated[State, Depends(get_app_state)] ) -> SharedStore: - return app_state.shared_store # type: ignore + return cast(SharedStore, app_state.shared_store) def get_mounted_volumes( app_state: Annotated[State, Depends(get_app_state)] ) -> MountedVolumes: - return app_state.mounted_volumes # type: ignore + return cast(MountedVolumes, app_state.mounted_volumes) def get_container_restart_lock( app_state: Annotated[State, Depends(get_app_state)] ) -> Lock: - return app_state.container_restart_lock # type: ignore + return cast(Lock, app_state.container_restart_lock) def get_outputs_manager( app_state: Annotated[State, Depends(get_app_state)] ) -> OutputsManager: - return app_state.outputs_manager # type: ignore + return cast(OutputsManager, app_state.outputs_manager) def get_outputs_context( app_state: Annotated[State, Depends(get_app_state)] ) -> OutputsContext: - return app_state.outputs_context # type: ignore + return cast(OutputsContext, app_state.outputs_context) def get_inputs_state( app_state: Annotated[State, Depends(get_app_state)] ) -> InputsState: - return app_state.inputs_state # type: ignore + return cast(InputsState, app_state.inputs_state) def get_user_services_metrics( app_state: Annotated[State, Depends(get_app_state)] ) -> UserServicesMetrics: - return app_state.user_service_metrics # type: ignore + return cast(UserServicesMetrics, app_state.user_service_metrics) def get_rabbitmq_client( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py index 68cb801c9bc..59d77e1b309 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py @@ -12,7 +12,6 @@ ActivityInfo, ActivityInfoOrNone, ) -from pydantic import parse_raw_as from servicelib.fastapi.requests_decorators import cancel_on_disconnect from ..core.docker_utils import docker_client @@ -106,7 +105,6 @@ async def get_containers_activity( container_name = inactivity_command.service - inactivity_response: str | None = None try: inactivity_response = await run_command_in_container( shared_store.original_to_container_names[inactivity_command.service], @@ -127,7 +125,7 @@ async def get_containers_activity( return ActivityInfo(seconds_inactive=_INACTIVE_FOR_LONG_TIME) try: - return parse_raw_as(ActivityInfo, inactivity_response) + return ActivityInfo.parse_raw(inactivity_response) except json.JSONDecodeError: _logger.warning( "Could not parse command result '%s' as '%s'", diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py index 25b9cb88344..0b5c0aae0fb 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py @@ -1,5 +1,5 @@ from textwrap import dedent -from typing import Annotated +from typing import Annotated, cast from fastapi import APIRouter, Depends, FastAPI, Request, status from servicelib.fastapi.long_running_tasks.server import ( @@ -83,7 +83,7 @@ async def create_service_containers_task( # pylint: disable=too-many-arguments application_health=application_health, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -112,7 +112,7 @@ async def runs_docker_compose_down_task( settings=settings, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -141,7 +141,7 @@ async def state_restore_task( app=app, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -170,7 +170,7 @@ async def state_save_task( app=app, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -201,7 +201,7 @@ async def ports_inputs_pull_task( inputs_pulling_enabled=inputs_state.inputs_pulling_enabled, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -230,7 +230,7 @@ async def ports_outputs_pull_task( app=app, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -257,7 +257,7 @@ async def ports_outputs_push_task( app=app, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member @router.post( @@ -286,4 +286,4 @@ async def containers_restart_task( shared_store=shared_store, ) except TaskAlreadyRunningError as e: - return e.managed_task.task_id # pylint: disable=no-member + return cast(str, e.managed_task.task_id) # type: ignore[attr-defined] # pylint:disable=no-member diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py index ab12303fd85..7804d3de35c 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py @@ -1,7 +1,7 @@ import logging from collections.abc import AsyncGenerator, Iterable from contextlib import asynccontextmanager -from typing import Any +from typing import Any, cast import aiodocker import yaml @@ -59,7 +59,7 @@ async def _get_container( docker: aiodocker.Docker, container_name: str ) -> DockerContainer | None: try: - return await docker.containers.get(container_name) + return cast(DockerContainer, await docker.containers.get(container_name)) except aiodocker.DockerError as e: if e.status == http_status.HTTP_404_NOT_FOUND: return None diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/registry.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/registry.py index c14016e0baf..ba483b53ba3 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/registry.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/registry.py @@ -3,6 +3,7 @@ import json import logging from pathlib import Path +from typing import Any import httpx from fastapi import FastAPI @@ -25,7 +26,7 @@ def _get_registry_url(registry_settings: RegistrySettings) -> str: async def _is_registry_reachable(registry_settings: RegistrySettings) -> None: async with httpx.AsyncClient(timeout=5) as client: - params = {} + params: dict[str, Any] = {} if registry_settings.REGISTRY_AUTH: params["auth"] = ( registry_settings.REGISTRY_USER, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py index a925bb1c5ee..0d635524adf 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/settings.py @@ -8,7 +8,7 @@ from models_library.callbacks_mapping import CallbacksMapping from models_library.products import ProductName from models_library.projects import ProjectID -from models_library.projects_nodes import NodeID +from models_library.projects_nodes_io import NodeID from models_library.services import DynamicServiceKey, RunID, ServiceVersion from models_library.users import UserID from pydantic import ByteSize, Field, PositiveInt, parse_obj_as, validator diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_logging_event_handler.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_logging_event_handler.py index 51b3f7e4f8f..527fa32f3ea 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_logging_event_handler.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_logging_event_handler.py @@ -11,9 +11,9 @@ from time import sleep as blocking_sleep from typing import Final -import aioprocessing -from aioprocessing.process import AioProcess -from aioprocessing.queues import AioQueue +import aioprocessing # type: ignore[import-untyped] +from aioprocessing.process import AioProcess # type: ignore[import-untyped] +from aioprocessing.queues import AioQueue # type: ignore[import-untyped] from pydantic import ByteSize, PositiveFloat from servicelib.logging_utils import log_context from watchdog.events import FileSystemEvent diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_watchdog_extensions.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_watchdog_extensions.py index 55da4bee573..5925e7d7fe2 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_watchdog_extensions.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/attribute_monitor/_watchdog_extensions.py @@ -5,7 +5,8 @@ from servicelib.logging_utils import log_catch from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers.api import DEFAULT_OBSERVER_TIMEOUT, BaseObserver -from watchdog.observers.inotify import InotifyBuffer, InotifyEmitter +from watchdog.observers.inotify import InotifyEmitter +from watchdog.observers.inotify_buffer import InotifyBuffer from watchdog.observers.inotify_c import Inotify, InotifyConstants from watchdog.utils import BaseThread from watchdog.utils.delayed_queue import DelayedQueue diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 592ce9c39c3..aba30a07663 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -6,11 +6,14 @@ from typing import Final from fastapi import FastAPI -from models_library.api_schemas_long_running_tasks.base import ProgressPercent +from models_library.api_schemas_long_running_tasks.base import ( + ProgressPercent, + TaskProgress, +) +from models_library.basic_types import IDStr from models_library.generated_models.docker_rest_api import ContainerState from models_library.rabbitmq_messages import ProgressType, SimcorePlatformStatus from pydantic import PositiveInt -from servicelib.fastapi.long_running_tasks.server import TaskProgress from servicelib.file_utils import log_directory_changes from servicelib.logging_utils import log_context from servicelib.progress_bar import ProgressBarData @@ -369,7 +372,7 @@ async def task_restore_state( app, ProgressType.SERVICE_STATE_PULLING, ), - description="pulling states", + description=IDStr("pulling states"), ) as root_progress: await logged_gather( *( @@ -430,7 +433,7 @@ async def task_save_state( app, ProgressType.SERVICE_STATE_PUSHING, ), - description="pushing state", + description=IDStr("pushing state"), ) as root_progress: await logged_gather( *[ @@ -475,7 +478,7 @@ async def task_ports_inputs_pull( app, ProgressType.SERVICE_INPUTS_PULLING, ), - description="pulling inputs", + description=IDStr("pulling inputs"), ) as root_progress: with log_directory_changes( mounted_volumes.disk_inputs_path, _logger, logging.INFO @@ -514,7 +517,7 @@ async def task_ports_outputs_pull( app, ProgressType.SERVICE_OUTPUTS_PULLING, ), - description="pulling outputs", + description=IDStr("pulling outputs"), ) as root_progress: transferred_bytes = await nodeports.download_target_ports( nodeports.PortTypeName.OUTPUTS, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py index bd7ab8b716b..b07bfd87bc5 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/mounted_fs.py @@ -4,7 +4,7 @@ from pathlib import Path from fastapi import FastAPI -from models_library.projects_nodes import NodeID +from models_library.projects_nodes_io import NodeID from models_library.services import RunID from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 44f8ad9bfab..ccbac40d8e9 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -8,11 +8,12 @@ from contextlib import AsyncExitStack from enum import Enum from pathlib import Path -from typing import Coroutine, Optional, cast +from typing import Coroutine, cast import aiofiles.os import magic from aiofiles.tempfile import TemporaryDirectory as AioTemporaryDirectory +from models_library.basic_types import IDStr from models_library.projects import ProjectIDStr from models_library.projects_nodes_io import NodeIDStr from pydantic import ByteSize @@ -24,10 +25,12 @@ from servicelib.utils import logged_gather from simcore_sdk import node_ports_v2 from simcore_sdk.node_ports_common.file_io_utils import LogRedirectCB -from simcore_sdk.node_ports_v2 import Nodeports, Port +from simcore_sdk.node_ports_v2 import Port from simcore_sdk.node_ports_v2.links import ItemConcreteValue +from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports from simcore_sdk.node_ports_v2.port import SetKWargs from simcore_sdk.node_ports_v2.port_utils import is_file_type +from simcore_sdk.node_ports_v2.ports_mapping import PortKey from ..core.settings import ApplicationSettings, get_settings @@ -45,7 +48,7 @@ class PortTypeName(str, Enum): # OUTPUTS section -def _get_size_of_value(value: ItemConcreteValue | None) -> int: +def _get_size_of_value(value: tuple[ItemConcreteValue | None, SetKWargs | None]) -> int: if value is None: return 0 if isinstance(value, Path): @@ -88,7 +91,7 @@ async def upload_outputs( ) # let's gather the tasks - ports_values: dict[str, tuple[ItemConcreteValue | None, SetKWargs | None]] = {} + ports_values: dict[PortKey, tuple[ItemConcreteValue | None, SetKWargs | None]] = {} archiving_tasks: deque[Coroutine[None, None, None]] = deque() ports_to_set = [ port_value @@ -103,7 +106,7 @@ async def upload_outputs( 2 if is_file_type(port.property_type) else 1 for port in ports_to_set ), - description="uploading outputs", + description=IDStr("uploading outputs"), ) ) for port in ports_to_set: @@ -194,14 +197,15 @@ async def _get_data_from_port( port: Port, *, target_dir: Path, progress_bar: ProgressBarData ) -> tuple[Port, ItemConcreteValue | None, ByteSize]: async with progress_bar.sub_progress( - steps=2 if is_file_type(port.property_type) else 1, description="getting data" + steps=2 if is_file_type(port.property_type) else 1, + description=IDStr("getting data"), ) as sub_progress: with log_context(logger, logging.DEBUG, f"getting {port.key=}"): port_data = await port.get(sub_progress) if is_file_type(port.property_type): # if there are files, move them to the final destination - downloaded_file: Path | None = cast(Optional[Path], port_data) + downloaded_file: Path | None = cast(Path | None, port_data) final_path: Path = target_dir / port.key if not downloaded_file or not downloaded_file.exists(): @@ -276,7 +280,7 @@ async def download_target_ports( if (not port_keys) or (port_value.key in port_keys) ] async with progress_bar.sub_progress( - steps=len(ports_to_get), description="downloading" + steps=len(ports_to_get), description=IDStr("downloading") ) as sub_progress: results = await logged_gather( *[ diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_context.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_context.py index 3cfb135726c..e05bb899cba 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_context.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_context.py @@ -1,8 +1,8 @@ from dataclasses import dataclass, field from pathlib import Path -import aioprocessing -from aioprocessing.queues import AioQueue +import aioprocessing # type: ignore[import-untyped] +from aioprocessing.queues import AioQueue # type: ignore[import-untyped] from fastapi import FastAPI from ..mounted_fs import MountedVolumes diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py index e7fb25c7317..8490f9cd72e 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_filter.py @@ -6,7 +6,7 @@ from concurrent.futures import ThreadPoolExecutor from contextlib import suppress from dataclasses import dataclass -from typing import Final, Optional +from typing import Final, TypeAlias from pydantic import ( ByteSize, @@ -22,7 +22,7 @@ from ._directory_utils import get_directory_total_size from ._manager import OutputsManager -PortEvent = Optional[str] +PortEvent: TypeAlias = str | None logger = logging.getLogger(__name__) @@ -33,7 +33,7 @@ class BaseDelayPolicy(ABC): def get_min_interval(self) -> NonNegativeFloat: # pylint:disable=no-self-use - return DEFAULT_OBSERVER_TIMEOUT # type: ignore + return DEFAULT_OBSERVER_TIMEOUT @abstractmethod def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py index 0256d482e5b..784db423ed6 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_event_handler.py @@ -11,9 +11,9 @@ from time import sleep as blocking_sleep from typing import Any, Final -import aioprocessing -from aioprocessing.process import AioProcess -from aioprocessing.queues import AioQueue +import aioprocessing # type: ignore [import-untyped] +from aioprocessing.process import AioProcess # type: ignore [import-untyped] +from aioprocessing.queues import AioQueue # type: ignore [import-untyped] from pydantic import PositiveFloat from servicelib.logging_utils import log_context from watchdog.events import FileSystemEvent diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py index 7e4a856a76e..307f8b3d933 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_manager.py @@ -7,6 +7,7 @@ from functools import partial from fastapi import FastAPI +from models_library.basic_types import IDStr from models_library.rabbitmq_messages import ProgressType from pydantic import PositiveFloat from pydantic.errors import PydanticErrorMixin @@ -130,7 +131,7 @@ async def _upload_ports() -> None: async with progress_bar.ProgressBarData( num_steps=1, progress_report_cb=self.task_progress_cb, - description="uploading ports", + description=IDStr("uploading ports"), ) as root_progress: await upload_outputs( outputs_path=self.outputs_context.outputs_path, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py index 78c416b9bef..6d6917d4e15 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/outputs/_watchdog_extensions.py @@ -7,7 +7,8 @@ from servicelib.logging_utils import log_catch from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers.api import DEFAULT_OBSERVER_TIMEOUT, BaseObserver -from watchdog.observers.inotify import InotifyBuffer, InotifyEmitter +from watchdog.observers.inotify import InotifyEmitter +from watchdog.observers.inotify_buffer import InotifyBuffer from watchdog.observers.inotify_c import Inotify, InotifyConstants from watchdog.utils import BaseThread from watchdog.utils.delayed_queue import DelayedQueue diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py index 3eb6c8fcc50..9f97a889bac 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_notifier.py @@ -1,7 +1,7 @@ import contextlib from pathlib import Path -import socketio +import socketio # type: ignore[import-untyped] from fastapi import FastAPI from fastapi.encoders import jsonable_encoder from models_library.api_schemas_dynamic_sidecar.socketio import ( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py index 0bb2f808059..bdbe9808a8a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/system_monitor/_socketio.py @@ -1,6 +1,6 @@ import logging -import socketio +import socketio # type: ignore[import-untyped] from fastapi import FastAPI from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py index eafede22078..f2cc53a0d9d 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/user_services_preferences/_db.py @@ -1,6 +1,6 @@ from pathlib import Path -import umsgpack +import umsgpack # type: ignore[import-untyped] from models_library.products import ProductName from models_library.services import ServiceKey, ServiceVersion from models_library.user_preferences import PreferenceName From e1c869c2dae02af0cf38ae0e41e9bfa7cb3a5df5 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 13:01:20 +0200 Subject: [PATCH 02/10] fixed cirular dependency --- .../simcore-sdk/src/simcore_sdk/node_ports_v2/port.py | 2 +- .../src/simcore_sdk/node_ports_v2/ports_mapping.py | 9 ++------- .../simcore-sdk/src/simcore_sdk/node_ports_v2/types.py | 2 ++ .../simcore_service_dynamic_sidecar/modules/nodeports.py | 5 +++-- 4 files changed, 8 insertions(+), 10 deletions(-) create mode 100644 packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index f135b213bb6..cf305aa14fb 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -29,7 +29,7 @@ PortLink, ) from .port_validation import validate_port_content -from .ports_mapping import PortKey +from .types import PortKey log = logging.getLogger(__name__) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py index 3f34ee06b2c..93dba9d9da8 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py @@ -1,15 +1,10 @@ -import re from collections.abc import ItemsView, Iterator, KeysView, ValuesView -from models_library.basic_regex import PROPERTY_KEY_RE -from pydantic import BaseModel, ConstrainedStr +from pydantic import BaseModel from ..node_ports_common.exceptions import UnboundPortError from .port import Port - - -class PortKey(ConstrainedStr): - regex = re.compile(PROPERTY_KEY_RE) +from .types import PortKey class BasePortsMapping(BaseModel): diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py new file mode 100644 index 00000000000..06b06aad209 --- /dev/null +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py @@ -0,0 +1,2 @@ +class PortKey(ConstrainedStr): + regex = re.compile(PROPERTY_KEY_RE) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index ccbac40d8e9..6519d4efd7b 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -5,10 +5,11 @@ import sys import time from collections import deque +from collections.abc import Coroutine from contextlib import AsyncExitStack from enum import Enum from pathlib import Path -from typing import Coroutine, cast +from typing import cast import aiofiles.os import magic @@ -30,7 +31,7 @@ from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports from simcore_sdk.node_ports_v2.port import SetKWargs from simcore_sdk.node_ports_v2.port_utils import is_file_type -from simcore_sdk.node_ports_v2.ports_mapping import PortKey +from simcore_sdk.node_ports_v2.types import PortKey from ..core.settings import ApplicationSettings, get_settings From fbcd1722eceec7625cd44954213feead3682bd5e Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 13:08:04 +0200 Subject: [PATCH 03/10] refactor --- packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py | 3 +-- packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py | 6 ++++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index cf305aa14fb..0a89046f9d5 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -7,7 +7,6 @@ from typing import Any from models_library.api_schemas_storage import LinkType -from models_library.basic_regex import PROPERTY_KEY_RE from models_library.basic_types import IDStr from models_library.services_io import BaseServiceIOModel from pydantic import AnyUrl, Field, PrivateAttr, ValidationError, validator @@ -69,7 +68,7 @@ class SetKWargs: class Port(BaseServiceIOModel): - key: PortKey = Field(..., regex=PROPERTY_KEY_RE) + key: PortKey widget: dict[str, Any] | None = None default_value: DataItemValue | None = Field(None, alias="defaultValue") diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py index 06b06aad209..de7f27e7b95 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py @@ -1,2 +1,8 @@ +import re + +from models_library.basic_regex import PROPERTY_KEY_RE +from pydantic import ConstrainedStr + + class PortKey(ConstrainedStr): regex = re.compile(PROPERTY_KEY_RE) From 6a9a5f63a8ff18a035edf8b73ba156003768eb8c Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 13:23:04 +0200 Subject: [PATCH 04/10] refactor --- .../simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 8418a006b42..5299de9a6e0 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -20,6 +20,7 @@ from .links import ItemConcreteValue, ItemValue from .port_utils import is_file_type from .ports_mapping import InputsList, OutputsList, PortKey +from .types import PortKey log = logging.getLogger(__name__) From d4a4455b0e2a86437f08d656bf5cbc8171a44867 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 13:23:47 +0200 Subject: [PATCH 05/10] fixed import --- .../src/simcore_sdk/node_ports_v2/nodeports_v2.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index 5299de9a6e0..0ef18661071 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -1,6 +1,7 @@ import logging +from collections.abc import Callable, Coroutine from pathlib import Path -from typing import Any, Callable, Coroutine +from typing import Any from models_library.api_schemas_storage import LinkType from models_library.basic_types import IDStr @@ -19,7 +20,7 @@ from ..node_ports_v2.port import SetKWargs from .links import ItemConcreteValue, ItemValue from .port_utils import is_file_type -from .ports_mapping import InputsList, OutputsList, PortKey +from .ports_mapping import InputsList, OutputsList from .types import PortKey log = logging.getLogger(__name__) From 2b9d348f8f9f837aa9d696f686c34f621851d046 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 1 Aug 2024 15:09:02 +0200 Subject: [PATCH 06/10] revert --- .../src/simcore_service_dynamic_sidecar/api/containers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py index 59d77e1b309..507540688a6 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers.py @@ -12,6 +12,7 @@ ActivityInfo, ActivityInfoOrNone, ) +from pydantic import parse_raw_as from servicelib.fastapi.requests_decorators import cancel_on_disconnect from ..core.docker_utils import docker_client @@ -125,7 +126,7 @@ async def get_containers_activity( return ActivityInfo(seconds_inactive=_INACTIVE_FOR_LONG_TIME) try: - return ActivityInfo.parse_raw(inactivity_response) + return parse_raw_as(ActivityInfo, inactivity_response) except json.JSONDecodeError: _logger.warning( "Could not parse command result '%s' as '%s'", From 60bbe5906e4dab1087eead2f3d8b38f0c68255c1 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 8 Aug 2024 08:40:04 +0200 Subject: [PATCH 07/10] usign ServicePortKey instead of PortKey --- .../simcore_sdk/node_ports_v2/nodeports_v2.py | 14 +++-- .../src/simcore_sdk/node_ports_v2/port.py | 4 +- .../node_ports_v2/ports_mapping.py | 12 ++-- .../src/simcore_sdk/node_ports_v2/types.py | 8 --- .../test_node_ports_v2_nodeports2.py | 58 ++++++++++--------- 5 files changed, 48 insertions(+), 48 deletions(-) delete mode 100644 packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py index bb57a98fe15..8c78e28a066 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py @@ -7,6 +7,7 @@ from models_library.basic_types import IDStr from models_library.projects import ProjectIDStr from models_library.projects_nodes_io import NodeIDStr +from models_library.services_types import ServicePortKey from models_library.users import UserID from pydantic import BaseModel, Field, ValidationError from pydantic.error_wrappers import flatten_errors @@ -22,7 +23,6 @@ from .links import ItemConcreteValue, ItemValue from .port_utils import is_file_type from .ports_mapping import InputsList, OutputsList -from .types import PortKey log = logging.getLogger(__name__) @@ -76,7 +76,7 @@ async def outputs(self) -> OutputsList: return self.internal_outputs async def get_value_link( - self, item_key: PortKey, *, file_link_type: LinkType + self, item_key: ServicePortKey, *, file_link_type: LinkType ) -> ItemValue | None: try: return await (await self.inputs)[item_key].get_value( @@ -91,7 +91,7 @@ async def get_value_link( ) async def get( - self, item_key: PortKey, progress_bar: ProgressBarData | None = None + self, item_key: ServicePortKey, progress_bar: ProgressBarData | None = None ) -> ItemConcreteValue | None: try: return await (await self.inputs)[item_key].get(progress_bar) @@ -101,7 +101,9 @@ async def get( # if this fails it will raise an exception return await (await self.outputs)[item_key].get(progress_bar) - async def set(self, item_key: PortKey, item_value: ItemConcreteValue) -> None: + async def set( + self, item_key: ServicePortKey, item_value: ItemConcreteValue + ) -> None: # first try to set the inputs. try: the_updated_inputs = await self.inputs @@ -141,7 +143,9 @@ async def _auto_update_from_db(self) -> None: async def set_multiple( self, - port_values: dict[PortKey, tuple[ItemConcreteValue | None, SetKWargs | None]], + port_values: dict[ + ServicePortKey, tuple[ItemConcreteValue | None, SetKWargs | None] + ], *, progress_bar: ProgressBarData, ) -> None: diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py index 7ce9b862727..2338563dcdb 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/port.py @@ -9,6 +9,7 @@ from models_library.api_schemas_storage import LinkType from models_library.basic_types import IDStr from models_library.services_io import BaseServiceIOModel +from models_library.services_types import ServicePortKey from pydantic import AnyUrl, Field, PrivateAttr, ValidationError, validator from pydantic.tools import parse_obj_as from servicelib.progress_bar import ProgressBarData @@ -28,7 +29,6 @@ PortLink, ) from .port_validation import validate_port_content -from .types import PortKey log = logging.getLogger(__name__) @@ -68,7 +68,7 @@ class SetKWargs: class Port(BaseServiceIOModel): - key: PortKey + key: ServicePortKey widget: dict[str, Any] | None = None default_value: DataItemValue | None = Field(None, alias="defaultValue") diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py index 93dba9d9da8..2855e8a253e 100644 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py +++ b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/ports_mapping.py @@ -1,16 +1,16 @@ from collections.abc import ItemsView, Iterator, KeysView, ValuesView +from models_library.services_types import ServicePortKey from pydantic import BaseModel from ..node_ports_common.exceptions import UnboundPortError from .port import Port -from .types import PortKey class BasePortsMapping(BaseModel): - __root__: dict[PortKey, Port] + __root__: dict[ServicePortKey, Port] - def __getitem__(self, key: int | PortKey) -> Port: + def __getitem__(self, key: int | ServicePortKey) -> Port: if isinstance(key, int): if key < len(self.__root__): key = list(self.__root__.keys())[key] @@ -19,13 +19,13 @@ def __getitem__(self, key: int | PortKey) -> Port: assert isinstance(key, str) # nosec return self.__root__[key] - def __iter__(self) -> Iterator[PortKey]: # type: ignore + def __iter__(self) -> Iterator[ServicePortKey]: # type: ignore return iter(self.__root__) - def keys(self) -> KeysView[PortKey]: + def keys(self) -> KeysView[ServicePortKey]: return self.__root__.keys() - def items(self) -> ItemsView[PortKey, Port]: + def items(self) -> ItemsView[ServicePortKey, Port]: return self.__root__.items() def values(self) -> ValuesView[Port]: diff --git a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py b/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py deleted file mode 100644 index de7f27e7b95..00000000000 --- a/packages/simcore-sdk/src/simcore_sdk/node_ports_v2/types.py +++ /dev/null @@ -1,8 +0,0 @@ -import re - -from models_library.basic_regex import PROPERTY_KEY_RE -from pydantic import ConstrainedStr - - -class PortKey(ConstrainedStr): - regex = re.compile(PROPERTY_KEY_RE) diff --git a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py index 754c5812322..6f93900f450 100644 --- a/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py +++ b/packages/simcore-sdk/tests/integration/test_node_ports_v2_nodeports2.py @@ -28,6 +28,7 @@ NodeIDStr, SimcoreS3FileID, ) +from models_library.services_types import ServicePortKey from servicelib.progress_bar import ProgressBarData from settings_library.r_clone import RCloneSettings from simcore_sdk import node_ports_v2 @@ -36,7 +37,6 @@ from simcore_sdk.node_ports_v2.links import ItemConcreteValue, PortLink from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports from simcore_sdk.node_ports_v2.port import Port -from simcore_sdk.node_ports_v2.ports_mapping import PortKey pytest_simcore_core_services_selection = [ "migration", @@ -224,7 +224,7 @@ async def test_port_value_accessors( item_pytype: type, option_r_clone_settings: RCloneSettings | None, ): # pylint: disable=W0613, W0621 - item_key = PortKey("some_key") + item_key = ServicePortKey("some_key") config_dict, _, _ = create_special_configuration( inputs=[(item_key, item_type, item_value)], outputs=[(item_key, item_type, None)], @@ -298,15 +298,15 @@ async def test_port_file_accessors( ) await check_config_valid(PORTS, config_dict) assert ( - await (await PORTS.outputs)[PortKey("out_34")].get() is None + await (await PORTS.outputs)[ServicePortKey("out_34")].get() is None ) # check emptyness with pytest.raises(exceptions.S3InvalidPathError): - await (await PORTS.inputs)[PortKey("in_1")].get() + await (await PORTS.inputs)[ServicePortKey("in_1")].get() # this triggers an upload to S3 + configuration change - await (await PORTS.outputs)[PortKey("out_34")].set(item_value) + await (await PORTS.outputs)[ServicePortKey("out_34")].set(item_value) # this is the link to S3 storage - value = (await PORTS.outputs)[PortKey("out_34")].value + value = (await PORTS.outputs)[ServicePortKey("out_34")].value assert isinstance(value, (DownloadLink, PortLink, BaseFileLink)) received_file_link = value.dict(by_alias=True, exclude_unset=True) assert received_file_link["store"] == s3_simcore_location @@ -320,11 +320,13 @@ async def test_port_file_accessors( assert received_file_link["eTag"] # this triggers a download from S3 to a location in /tempdir/simcorefiles/item_key - assert isinstance(await (await PORTS.outputs)[PortKey("out_34")].get(), item_pytype) - downloaded_file = await (await PORTS.outputs)[PortKey("out_34")].get() + assert isinstance( + await (await PORTS.outputs)[ServicePortKey("out_34")].get(), item_pytype + ) + downloaded_file = await (await PORTS.outputs)[ServicePortKey("out_34")].get() assert isinstance(downloaded_file, Path) assert downloaded_file.exists() - assert str(await (await PORTS.outputs)[PortKey("out_34")].get()).startswith( + assert str(await (await PORTS.outputs)[ServicePortKey("out_34")].get()).startswith( str( Path( tempfile.gettempdir(), @@ -469,9 +471,9 @@ async def test_get_value_from_previous_node( ) await check_config_valid(PORTS, config_dict) - input_value = await (await PORTS.inputs)[PortKey("in_15")].get() + input_value = await (await PORTS.inputs)[ServicePortKey("in_15")].get() assert isinstance(input_value, item_pytype) - assert await (await PORTS.inputs)[PortKey("in_15")].get() == item_value + assert await (await PORTS.inputs)[ServicePortKey("in_15")].get() == item_value @pytest.mark.parametrize( @@ -512,7 +514,7 @@ async def test_get_file_from_previous_node( r_clone_settings=option_r_clone_settings, ) await check_config_valid(PORTS, config_dict) - file_path = await (await PORTS.inputs)[PortKey("in_15")].get() + file_path = await (await PORTS.inputs)[ServicePortKey("in_15")].get() assert isinstance(file_path, item_pytype) assert file_path == Path( tempfile.gettempdir(), @@ -572,7 +574,7 @@ async def test_get_file_from_previous_node_with_mapping_of_same_key_name( postgres_db, project_id, this_node_uuid, config_dict ) # pylint: disable=E1101 await check_config_valid(PORTS, config_dict) - file_path = await (await PORTS.inputs)[PortKey("in_15")].get() + file_path = await (await PORTS.inputs)[ServicePortKey("in_15")].get() assert isinstance(file_path, item_pytype) assert file_path == Path( tempfile.gettempdir(), @@ -631,7 +633,7 @@ async def test_file_mapping( postgres_db, project_id, node_uuid, config_dict ) # pylint: disable=E1101 await check_config_valid(PORTS, config_dict) - file_path = await (await PORTS.inputs)[PortKey("in_1")].get() + file_path = await (await PORTS.inputs)[ServicePortKey("in_1")].get() assert isinstance(file_path, item_pytype) assert file_path == Path( tempfile.gettempdir(), @@ -642,7 +644,7 @@ async def test_file_mapping( ) # let's get it a second time to see if replacing works - file_path = await (await PORTS.inputs)[PortKey("in_1")].get() + file_path = await (await PORTS.inputs)[ServicePortKey("in_1")].get() assert isinstance(file_path, item_pytype) assert file_path == Path( tempfile.gettempdir(), @@ -659,7 +661,7 @@ async def test_file_mapping( assert isinstance(file_path, Path) await PORTS.set_file_by_keymap(file_path) file_id = create_valid_file_uuid("out_1", file_path) - value = (await PORTS.outputs)[PortKey("out_1")].value + value = (await PORTS.outputs)[ServicePortKey("out_1")].value assert isinstance(value, (DownloadLink, PortLink, BaseFileLink)) received_file_link = value.dict(by_alias=True, exclude_unset=True) assert received_file_link["store"] == s3_simcore_location @@ -714,13 +716,15 @@ async def test_regression_concurrent_port_update_fails( # when writing in serial these are expected to work for item_key, _, _ in outputs: - await (await PORTS.outputs)[PortKey(item_key)].set(int_item_value) - assert (await PORTS.outputs)[PortKey(item_key)].value == int_item_value + await (await PORTS.outputs)[ServicePortKey(item_key)].set(int_item_value) + assert (await PORTS.outputs)[ServicePortKey(item_key)].value == int_item_value # when writing in parallel and reading back, # they fail, with enough concurrency async def _upload_create_task(item_key: str) -> None: - await (await PORTS.outputs)[PortKey(item_key)].set(parallel_int_item_value) + await (await PORTS.outputs)[ServicePortKey(item_key)].set( + parallel_int_item_value + ) # updating in parallel creates a race condition results = await gather( @@ -733,7 +737,7 @@ async def _upload_create_task(item_key: str) -> None: with pytest.raises(AssertionError) as exc_info: for item_key, _, _ in outputs: assert (await PORTS.outputs)[ - PortKey(item_key) + ServicePortKey(item_key) ].value == parallel_int_item_value assert exc_info.value.args[0].startswith( @@ -765,7 +769,7 @@ async def test_batch_update_inputs_outputs( async with ProgressBarData(num_steps=2, description=faker.pystr()) as progress_bar: await PORTS.set_multiple( { - PortKey(port.key): (k, None) + ServicePortKey(port.key): (k, None) for k, port in enumerate((await PORTS.outputs).values()) }, progress_bar=progress_bar, @@ -774,7 +778,7 @@ async def test_batch_update_inputs_outputs( assert progress_bar._current_steps == pytest.approx(1) # noqa: SLF001 await PORTS.set_multiple( { - PortKey(port.key): (k, None) + ServicePortKey(port.key): (k, None) for k, port in enumerate((await PORTS.inputs).values(), start=1000) }, progress_bar=progress_bar, @@ -785,18 +789,18 @@ async def test_batch_update_inputs_outputs( ports_inputs = await PORTS.inputs for k, asd in enumerate(outputs): item_key, _, _ = asd - assert ports_outputs[PortKey(item_key)].value == k - assert await ports_outputs[PortKey(item_key)].get() == k + assert ports_outputs[ServicePortKey(item_key)].value == k + assert await ports_outputs[ServicePortKey(item_key)].get() == k for k, asd in enumerate(inputs, start=1000): item_key, _, _ = asd - assert ports_inputs[PortKey(item_key)].value == k - assert await ports_inputs[PortKey(item_key)].get() == k + assert ports_inputs[ServicePortKey(item_key)].value == k + assert await ports_inputs[ServicePortKey(item_key)].get() == k # test missing key raises error async with ProgressBarData(num_steps=1, description=faker.pystr()) as progress_bar: with pytest.raises(UnboundPortError): await PORTS.set_multiple( - {PortKey("missing_key_in_both"): (123132, None)}, + {ServicePortKey("missing_key_in_both"): (123132, None)}, progress_bar=progress_bar, ) From b47def026c2628743f6112a96c70f3829e9810bd Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 8 Aug 2024 08:42:40 +0200 Subject: [PATCH 08/10] @sanderegg reusing ServicePortKey instead of redefined PortKey --- .../dask_task_models_library/container_tasks/io.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py index 0c4b560923f..887397d4227 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/io.py @@ -3,12 +3,12 @@ from pathlib import Path from typing import Any, ClassVar, TypeAlias, Union -from models_library.basic_regex import MIME_TYPE_RE, PROPERTY_KEY_RE +from models_library.basic_regex import MIME_TYPE_RE from models_library.generics import DictModel +from models_library.services_types import ServicePortKey from pydantic import ( AnyUrl, BaseModel, - ConstrainedStr, Extra, Field, StrictBool, @@ -81,10 +81,6 @@ class Config: } -class PortKey(ConstrainedStr): - regex = PROPERTY_KEY_RE - - PortValue: TypeAlias = Union[ StrictBool, StrictInt, @@ -97,7 +93,7 @@ class PortKey(ConstrainedStr): ] -class TaskInputData(DictModel[PortKey, PortValue]): +class TaskInputData(DictModel[ServicePortKey, PortValue]): class Config: schema_extra: ClassVar[dict[str, Any]] = { "examples": [ @@ -115,7 +111,7 @@ class Config: PortSchemaValue: TypeAlias = Union[PortSchema, FilePortSchema] -class TaskOutputDataSchema(DictModel[PortKey, PortSchemaValue]): +class TaskOutputDataSchema(DictModel[ServicePortKey, PortSchemaValue]): # # NOTE: Expected output data is only determined at runtime. A possibility # would be to create pydantic models dynamically but dask serialization @@ -144,7 +140,7 @@ class Config: } -class TaskOutputData(DictModel[PortKey, PortValue]): +class TaskOutputData(DictModel[ServicePortKey, PortValue]): @classmethod def from_task_output( cls, schema: TaskOutputDataSchema, output_folder: Path, output_file_ext: str From 6d736037ab205e0bc461a844774a1d5d210541ab Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 8 Aug 2024 08:47:16 +0200 Subject: [PATCH 09/10] fixed broken imports --- .../src/simcore_service_director_v2/utils/dask.py | 4 ++-- .../simcore_service_dynamic_sidecar/modules/nodeports.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index adcb740db44..0070ac3dc23 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -28,6 +28,7 @@ from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.services import ServiceKey, ServiceVersion +from models_library.services_types import ServicePortKey from models_library.users import UserID from models_library.utils.json_serialization import json_dumps from pydantic import AnyUrl, ByteSize, ValidationError, parse_obj_as @@ -39,7 +40,6 @@ ) from simcore_sdk.node_ports_v2 import FileLinkType, Port, links, port_utils from simcore_sdk.node_ports_v2.links import ItemValue as _NPItemValue -from simcore_sdk.node_ports_v2.ports_mapping import PortKey from ..constants import UNDEFINED_DOCKER_LABEL from ..core.errors import ( @@ -182,7 +182,7 @@ async def parse_output_data( value_to_transfer = port_value try: - await (await ports.outputs)[cast(PortKey, port_key)].set_value( + await (await ports.outputs)[cast(ServicePortKey, port_key)].set_value( value_to_transfer ) except ValidationError as err: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py index 7a545f8d770..2213dd1d4ac 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/nodeports.py @@ -17,6 +17,7 @@ from models_library.basic_types import IDStr from models_library.projects import ProjectIDStr from models_library.projects_nodes_io import NodeIDStr +from models_library.services_types import ServicePortKey from pydantic import ByteSize from servicelib.archiving_utils import PrunableFolder, archive_dir, unarchive_dir from servicelib.async_utils import run_sequentially_in_context @@ -31,7 +32,6 @@ from simcore_sdk.node_ports_v2.nodeports_v2 import Nodeports from simcore_sdk.node_ports_v2.port import SetKWargs from simcore_sdk.node_ports_v2.port_utils import is_file_type -from simcore_sdk.node_ports_v2.types import PortKey from ..core.settings import ApplicationSettings, get_settings @@ -93,7 +93,9 @@ async def upload_outputs( ) # let's gather the tasks - ports_values: dict[PortKey, tuple[ItemConcreteValue | None, SetKWargs | None]] = {} + ports_values: dict[ + ServicePortKey, tuple[ItemConcreteValue | None, SetKWargs | None] + ] = {} archiving_tasks: deque[Coroutine[None, None, None]] = deque() ports_to_set = [ port_value From 05804552660e5fcecbf8755019c4d4e67df3269b Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 8 Aug 2024 09:12:31 +0200 Subject: [PATCH 10/10] remove redundant cast --- .../src/simcore_service_director_v2/utils/dask.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 0070ac3dc23..c6229bacdc8 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -28,7 +28,6 @@ from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.services import ServiceKey, ServiceVersion -from models_library.services_types import ServicePortKey from models_library.users import UserID from models_library.utils.json_serialization import json_dumps from pydantic import AnyUrl, ByteSize, ValidationError, parse_obj_as @@ -182,9 +181,7 @@ async def parse_output_data( value_to_transfer = port_value try: - await (await ports.outputs)[cast(ServicePortKey, port_key)].set_value( - value_to_transfer - ) + await (await ports.outputs)[port_key].set_value(value_to_transfer) except ValidationError as err: ports_errors.extend(_get_port_validation_errors(port_key, err))