diff --git a/packages/common-library/src/common_library/errors_classes.py b/packages/common-library/src/common_library/errors_classes.py
index 83e40b2a2b0..dfee557d38c 100644
--- a/packages/common-library/src/common_library/errors_classes.py
+++ b/packages/common-library/src/common_library/errors_classes.py
@@ -45,7 +45,7 @@ def _get_full_class_name(cls) -> str:
         ]
         return ".".join(reversed(relevant_classes))
 
-    def error_context(self):
+    def error_context(self) -> dict[str, Any]:
         """Returns context in which error occurred and stored within the exception"""
         return dict(**self.__dict__)
 
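For reviewers unfamiliar with `OsparcErrorMixin`: the pattern this whole PR leans on is that constructor kwargs become the error context and fill the class-level `msg_template`. A minimal runnable sketch of that behaviour (a hypothetical stand-in, not the actual implementation in `common_library.errors_classes`):

```python
from typing import Any


class _ErrorMixinSketch:
    """Hypothetical stand-in for OsparcErrorMixin: kwargs become the context."""

    msg_template: str = "unexpected error"

    def __init__(self, **ctx: Any) -> None:
        self.__dict__.update(ctx)
        # format the class-level template from the captured context
        super().__init__(self.msg_template.format(**ctx))

    def error_context(self) -> dict[str, Any]:
        # same contract as the method annotated above
        return dict(**self.__dict__)


class _ProjectNotFoundSketch(_ErrorMixinSketch, RuntimeError):
    msg_template = "project {project_id} not found"


err = _ProjectNotFoundSketch(project_id="abc-123")
assert str(err) == "project abc-123 not found"
assert err.error_context() == {"project_id": "abc-123"}
```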
diff --git a/packages/models-library/tests/test_rest_ordering.py b/packages/models-library/tests/test_rest_ordering.py
index a3f84bf0d2c..4ceed67dea5 100644
--- a/packages/models-library/tests/test_rest_ordering.py
+++ b/packages/models-library/tests/test_rest_ordering.py
@@ -120,10 +120,11 @@ def test_ordering_query_model_class__defaults():
 
     # checks all defaults
     model = OrderQueryParamsModel()
-    assert model.order_by
-    assert isinstance(model.order_by, OrderBy)  # nosec
-    assert model.order_by.field == "modified_at"  # NOTE that this was mapped!
-    assert model.order_by.direction == OrderDirection.DESC
+    assert model.order_by is not None
+    assert (
+        model.order_by.field == "modified_at"  # pylint: disable=no-member
+    )  # NOTE that this was mapped!
+    assert model.order_by.direction is OrderDirection.DESC  # pylint: disable=no-member
 
     # partial defaults
     model = OrderQueryParamsModel.model_validate({"order_by": {"field": "name"}})
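Side note on the `==` → `is` change above: `Enum` members are singletons, so identity comparison is the idiomatic check. A self-contained illustration (the stand-in below only assumes the real `OrderDirection` is a str-backed enum):

```python
from enum import Enum


class OrderDirection(str, Enum):  # assumption: mirrors the real enum's shape
    ASC = "asc"
    DESC = "desc"


# value lookup returns the one-and-only member object, so `is` holds
assert OrderDirection("desc") is OrderDirection.DESC
```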
project {project_id}") - self.project_id = project_id + msg_template: str = "no networks found for project {project_id}" class PricingPlanUnitNotFoundError(DirectorError): - """Pricing plan unit not found error""" - - def __init__(self, msg: str): - super().__init__(msg) + msg_template: str = "pricing plan not found {msg}" class PipelineNotFoundError(DirectorError): - """Pipeline not found error""" + msg_template: str = "pipeline {pipeline_id} not found" - def __init__(self, pipeline_id: str): - super().__init__(f"pipeline {pipeline_id} not found") - -class ComputationalRunNotFoundError(OsparcErrorMixin, DirectorError): +class ComputationalRunNotFoundError(DirectorError): msg_template = "Computational run not found" -class ComputationalTaskNotFoundError(OsparcErrorMixin, DirectorError): +class ComputationalTaskNotFoundError(DirectorError): msg_template = "Computational task {node_id} not found" -class WalletNotEnoughCreditsError(OsparcErrorMixin, DirectorError): +class WalletNotEnoughCreditsError(DirectorError): msg_template = "Wallet '{wallet_name}' has {wallet_credit_amount} credits." # # SCHEDULER ERRORS # +class ComputationalSchedulerError(DirectorError): + msg_template = "Computational scheduler unexpected error {msg}" -class SchedulerError(DirectorError): - def __init__(self, msg: str | None = None): - super().__init__(msg or "Unexpected error in the scheduler") - - -class InvalidPipelineError(SchedulerError): - """A pipeline is misconfigured""" - - def __init__(self, pipeline_id: str, msg: str | None = None): - super().__init__(msg or f"Invalid configuration of pipeline {pipeline_id}") +class InvalidPipelineError(ComputationalSchedulerError): + msg_template = "Computational scheduler: Invalid configuration of pipeline {pipeline_id}: {msg}" -class TaskSchedulingError(SchedulerError): - """A task cannot be scheduled""" +class TaskSchedulingError(ComputationalSchedulerError): + msg_template = "Computational scheduler: Task {node_id} in project {project_id} could not be scheduled {msg}" - code: str = "task scheduler error" - - def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None): - super().__init__(msg=msg) + def __init__(self, project_id: ProjectID, node_id: NodeID, **ctx: Any) -> None: + super().__init__(project_id=project_id, node_id=node_id, **ctx) self.project_id = project_id self.node_id = node_id @@ -164,89 +96,59 @@ def get_errors(self) -> list[ErrorDict]: ] -class MissingComputationalResourcesError(TaskSchedulingError): - """A task cannot be scheduled because the cluster does not have the required resources""" - - def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None): - super().__init__(project_id, node_id, msg=msg) - +class MissingComputationalResourcesError( + TaskSchedulingError +): # pylint: disable=too-many-ancestors + msg_template = ( + "Service {service_name}:{service_version} cannot be scheduled " + "on cluster {cluster_id}: task needs '{task_resources}', " + "cluster has {cluster_resources}" + ) -class InsuficientComputationalResourcesError(TaskSchedulingError): - """A task cannot be scheduled because the cluster does not have *enough* of the required resources""" - def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None): - super().__init__(project_id, node_id, msg=msg) +class InsuficientComputationalResourcesError( + TaskSchedulingError +): # pylint: disable=too-many-ancestors + msg_template: str = ( + "Insufficient computational resources to run {service_name}:{service_version} with 
diff --git a/services/director-v2/src/simcore_service_director_v2/core/application.py b/services/director-v2/src/simcore_service_director_v2/core/application.py
index fb9d6094dff..43a9dcc4e03 100644
--- a/services/director-v2/src/simcore_service_director_v2/core/application.py
+++ b/services/director-v2/src/simcore_service_director_v2/core/application.py
@@ -126,6 +126,7 @@ def create_base_app(settings: AppSettings | None = None) -> FastAPI:
     for name in _NOISY_LOGGERS:
         logging.getLogger(name).setLevel(quiet_level)
 
+    assert settings.SC_BOOT_MODE  # nosec
     app = FastAPI(
         debug=settings.SC_BOOT_MODE.is_devel_mode(),
         title=PROJECT_NAME,
diff --git a/services/director-v2/src/simcore_service_director_v2/core/errors.py b/services/director-v2/src/simcore_service_director_v2/core/errors.py
index 65af83fa28f..18a5b674ed2 100644
--- a/services/director-v2/src/simcore_service_director_v2/core/errors.py
+++ b/services/director-v2/src/simcore_service_director_v2/core/errors.py
@@ -19,134 +19,66 @@
 }
 """
 
+from typing import Any
+
 from common_library.errors_classes import OsparcErrorMixin
 from models_library.errors import ErrorDict
 from models_library.projects import ProjectID
 from models_library.projects_nodes_io import NodeID
 
 
-class DirectorError(Exception):
-    """Basic exception"""
+class DirectorError(OsparcErrorMixin, RuntimeError):
+    msg_template: str = "Director-v2 unexpected error"
 
 
 class ConfigurationError(DirectorError):
-    """An error in the director-v2 configuration"""
-
-    def __init__(self, msg: str | None = None):
-        super().__init__(
-            msg or "Invalid configuration of the director-v2 application. Please check."
-        )
-
-
-class GenericDockerError(DirectorError):
-    """Generic docker library error"""
-
-    def __init__(self, msg: str, original_exception: Exception):
-        super().__init__(msg + f": {original_exception}")
-        self.original_exception = original_exception
-
-
-class ServiceNotAvailableError(DirectorError):
-    """Service not found"""
-
-    def __init__(self, service_name: str, service_tag: str | None = None):
-        service_tag = service_tag or "UNDEFINED"
-        super().__init__(f"The service {service_name}:{service_tag} does not exist")
-        self.service_name = service_name
-        self.service_tag = service_tag
-
-
-class ServiceUUIDNotFoundError(DirectorError):
-    """Service not found"""
-
-    def __init__(self, service_uuid: str):
-        super().__init__(f"The service with uuid {service_uuid} was not found")
-        self.service_uuid = service_uuid
-
-
-class ServiceUUIDInUseError(DirectorError):
-    """Service UUID is already in use"""
-
-    def __init__(self, service_uuid: str):
-        super().__init__(f"The service uuid {service_uuid} is already in use")
-        self.service_uuid = service_uuid
-
-
-class ServiceStartTimeoutError(DirectorError):
-    """The service was created but never run (time-out)"""
-
-    def __init__(self, service_name: str, service_uuid: str):
-        super().__init__(f"Service {service_name}:{service_uuid} failed to start ")
-        self.service_name = service_name
-        self.service_uuid = service_uuid
+    msg_template: str = "Application misconfiguration: {msg}"
 
 
 class ProjectNotFoundError(DirectorError):
-    """Project not found error"""
-
-    def __init__(self, project_id: ProjectID):
-        super().__init__(f"project {project_id} not found")
-        self.project_id = project_id
+    msg_template: str = "project {project_id} not found"
 
 
 class ProjectNetworkNotFoundError(DirectorError):
-    """Project not found error"""
-
-    def __init__(self, project_id: ProjectID):
-        super().__init__(f"no networks forund for project {project_id}")
-        self.project_id = project_id
+    msg_template: str = "no networks found for project {project_id}"
 
 
 class PricingPlanUnitNotFoundError(DirectorError):
-    """Pricing plan unit not found error"""
-
-    def __init__(self, msg: str):
-        super().__init__(msg)
+    msg_template: str = "pricing plan not found {msg}"
 
 
 class PipelineNotFoundError(DirectorError):
-    """Pipeline not found error"""
+    msg_template: str = "pipeline {pipeline_id} not found"
 
-    def __init__(self, pipeline_id: str):
-        super().__init__(f"pipeline {pipeline_id} not found")
 
-
-class ComputationalRunNotFoundError(OsparcErrorMixin, DirectorError):
+class ComputationalRunNotFoundError(DirectorError):
     msg_template = "Computational run not found"
 
 
-class ComputationalTaskNotFoundError(OsparcErrorMixin, DirectorError):
+class ComputationalTaskNotFoundError(DirectorError):
     msg_template = "Computational task {node_id} not found"
 
 
-class WalletNotEnoughCreditsError(OsparcErrorMixin, DirectorError):
+class WalletNotEnoughCreditsError(DirectorError):
     msg_template = "Wallet '{wallet_name}' has {wallet_credit_amount} credits."
 
 
 #
 # SCHEDULER ERRORS
 #
+class ComputationalSchedulerError(DirectorError):
+    msg_template = "Computational scheduler unexpected error {msg}"
 
-
-class SchedulerError(DirectorError):
-    def __init__(self, msg: str | None = None):
-        super().__init__(msg or "Unexpected error in the scheduler")
-
-
-class InvalidPipelineError(SchedulerError):
-    """A pipeline is misconfigured"""
-
-    def __init__(self, pipeline_id: str, msg: str | None = None):
-        super().__init__(msg or f"Invalid configuration of pipeline {pipeline_id}")
+class InvalidPipelineError(ComputationalSchedulerError):
+    msg_template = "Computational scheduler: Invalid configuration of pipeline {pipeline_id}: {msg}"
 
 
-class TaskSchedulingError(SchedulerError):
-    """A task cannot be scheduled"""
+class TaskSchedulingError(ComputationalSchedulerError):
+    msg_template = "Computational scheduler: Task {node_id} in project {project_id} could not be scheduled {msg}"
 
-    code: str = "task scheduler error"
-
-    def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
-        super().__init__(msg=msg)
+    def __init__(self, project_id: ProjectID, node_id: NodeID, **ctx: Any) -> None:
+        super().__init__(project_id=project_id, node_id=node_id, **ctx)
         self.project_id = project_id
         self.node_id = node_id
@@ -164,89 +96,59 @@ def get_errors(self) -> list[ErrorDict]:
         ]
 
 
-class MissingComputationalResourcesError(TaskSchedulingError):
-    """A task cannot be scheduled because the cluster does not have the required resources"""
-
-    def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
-        super().__init__(project_id, node_id, msg=msg)
-
+class MissingComputationalResourcesError(
+    TaskSchedulingError
+):  # pylint: disable=too-many-ancestors
+    msg_template = (
+        "Service {service_name}:{service_version} cannot be scheduled "
+        "on cluster {cluster_id}: task needs '{task_resources}', "
+        "cluster has {cluster_resources}"
+    )
 
-class InsuficientComputationalResourcesError(TaskSchedulingError):
-    """A task cannot be scheduled because the cluster does not have *enough* of the required resources"""
 
-    def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
-        super().__init__(project_id, node_id, msg=msg)
+class InsuficientComputationalResourcesError(
+    TaskSchedulingError
+):  # pylint: disable=too-many-ancestors
+    msg_template: str = (
+        "Insufficient computational resources to run {service_name}:{service_version} with {service_requested_resources} on cluster {cluster_id}."
+        "Cluster available workers: {cluster_available_resources}"
+        "TIP: Reduce service required resources or contact oSparc support"
+    )
 
 
-class PortsValidationError(TaskSchedulingError):
-    """
-    Gathers all validation errors raised while checking input/output
-    ports in a project's node.
-    """
+class PortsValidationError(TaskSchedulingError):  # pylint: disable=too-many-ancestors
+    msg_template: str = (
+        "Node {node_id} in {project_id} with ports having invalid values {errors_list}"
+    )
 
-    def __init__(self, project_id: ProjectID, node_id: NodeID, errors: list[ErrorDict]):
-        super().__init__(
-            project_id,
-            node_id,
-            msg=f"Node with {len(errors)} ports having invalid values",
-        )
-        self.errors = errors
 
-    def get_errors(self) -> list[ErrorDict]:
-        """Returns 'public errors': filters only value_error.port_validation errors for the client.
-        The rest only shown as number
-        """
-        value_errors: list[ErrorDict] = []
-        for error in self.errors:
-            # NOTE: should I filter? if error["type"].startswith("value_error."):
-
-            loc_tail: list[str] = []
-            if port_key := error.get("ctx", {}).get("port_key"):
-                loc_tail.append(f"{port_key}")
-
-            if schema_error_path := error.get("ctx", {}).get("schema_error_path"):
-                loc_tail += list(schema_error_path)
-
-            # WARNING: error in a node, might come from the previous node's port
-            # DO NOT remove project/node/port hiearchy
-            value_errors.append(
-                {
-                    "loc": (f"{self.project_id}", f"{self.node_id}", *tuple(loc_tail)),
-                    "msg": error["msg"],
-                    # NOTE: here we list the codes of the PydanticValueErrors collected in ValidationError
-                    "type": error["type"],
-                }
-            )
-        return value_errors
-
-
-class ComputationalSchedulerChangedError(OsparcErrorMixin, SchedulerError):
+class ComputationalSchedulerChangedError(ComputationalSchedulerError):
     msg_template = "The dask scheduler ID changed from '{original_scheduler_id}' to '{current_scheduler_id}'"
 
 
-class ComputationalBackendNotConnectedError(OsparcErrorMixin, SchedulerError):
+class ComputationalBackendNotConnectedError(ComputationalSchedulerError):
     msg_template = "The dask computational backend is not connected"
 
 
-class ComputationalBackendNoS3AccessError(OsparcErrorMixin, SchedulerError):
+class ComputationalBackendNoS3AccessError(ComputationalSchedulerError):
     msg_template = "The S3 backend is not ready, please try again later"
 
 
-class ComputationalBackendTaskNotFoundError(OsparcErrorMixin, SchedulerError):
+class ComputationalBackendTaskNotFoundError(ComputationalSchedulerError):
     msg_template = (
         "The dask computational backend does not know about the task '{job_id}'"
     )
 
 
-class ComputationalBackendTaskResultsNotReadyError(OsparcErrorMixin, SchedulerError):
+class ComputationalBackendTaskResultsNotReadyError(ComputationalSchedulerError):
     msg_template = "The task result is not ready yet for job '{job_id}'"
 
 
-class ClustersKeeperNotAvailableError(OsparcErrorMixin, SchedulerError):
+class ClustersKeeperNotAvailableError(ComputationalSchedulerError):
     msg_template = "clusters-keeper service is not available!"
 
 
-class ComputationalBackendOnDemandNotReadyError(OsparcErrorMixin, SchedulerError):
+class ComputationalBackendOnDemandNotReadyError(ComputationalSchedulerError):
     msg_template = (
         "The on demand computational cluster is not ready 'est. remaining time: {eta}'"
     )
@@ -255,15 +157,15 @@ class ComputationalBackendOnDemandNotReadyError(OsparcErrorMixin, SchedulerError
 #
 # SCHEDULER/CLUSTER ERRORS
 #
-class ClusterNotFoundError(OsparcErrorMixin, SchedulerError):
+class ClusterNotFoundError(ComputationalSchedulerError):
     msg_template = "The cluster '{cluster_id}' not found"
 
 
-class ClusterAccessForbiddenError(OsparcErrorMixin, SchedulerError):
+class ClusterAccessForbiddenError(ComputationalSchedulerError):
     msg_template = "Insufficient rights to access cluster '{cluster_id}'"
 
 
-class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError):
+class ClusterInvalidOperationError(ComputationalSchedulerError):
     msg_template = "Invalid operation on cluster '{cluster_id}'"
 
 
@@ -272,21 +174,21 @@ class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError):
 #
 
 
-class DaskClientRequestError(OsparcErrorMixin, SchedulerError):
+class DaskClientRequestError(ComputationalSchedulerError):
     msg_template = (
         "The dask client to cluster on '{endpoint}' did an invalid request '{error}'"
     )
 
 
-class DaskClusterError(OsparcErrorMixin, SchedulerError):
+class DaskClusterError(ComputationalSchedulerError):
     msg_template = "The dask cluster on '{endpoint}' encountered an error: '{error}'"
 
 
-class DaskGatewayServerError(OsparcErrorMixin, SchedulerError):
+class DaskGatewayServerError(ComputationalSchedulerError):
     msg_template = "The dask gateway on '{endpoint}' encountered an error: '{error}'"
 
 
-class DaskClientAcquisisitonError(OsparcErrorMixin, SchedulerError):
+class DaskClientAcquisisitonError(ComputationalSchedulerError):
     msg_template = (
        "The dask client to cluster '{cluster}' encountered an error '{error}'"
     )
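`TaskSchedulingError` is the one class in the new hierarchy that keeps an explicit `__init__`: `project_id` and `node_id` are needed both as template fields and as instance attributes for `get_errors()`. A runnable sketch of how its subclasses now receive structured kwargs instead of a preformatted `msg` (stand-in classes, same shape as the sketch earlier):

```python
from typing import Any


class _SchedulingSketchError(RuntimeError):
    msg_template = "task {node_id} in project {project_id} could not be scheduled"

    def __init__(self, project_id: str, node_id: str, **ctx: Any) -> None:
        # required fields plus free-form context feed the template together
        self.__dict__.update(ctx, project_id=project_id, node_id=node_id)
        super().__init__(self.msg_template.format(**self.__dict__))
        self.project_id = project_id
        self.node_id = node_id


class _MissingResourcesSketchError(_SchedulingSketchError):
    msg_template = "task needs '{task_resources}', cluster has {cluster_resources}"


err = _MissingResourcesSketchError(
    project_id="p1", node_id="n1", task_resources="4 CPUs", cluster_resources="2 CPUs"
)
assert str(err) == "task needs '4 CPUs', cluster has 2 CPUs"
```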
remaining time: {eta}'" ) @@ -255,15 +157,15 @@ class ComputationalBackendOnDemandNotReadyError(OsparcErrorMixin, SchedulerError # # SCHEDULER/CLUSTER ERRORS # -class ClusterNotFoundError(OsparcErrorMixin, SchedulerError): +class ClusterNotFoundError(ComputationalSchedulerError): msg_template = "The cluster '{cluster_id}' not found" -class ClusterAccessForbiddenError(OsparcErrorMixin, SchedulerError): +class ClusterAccessForbiddenError(ComputationalSchedulerError): msg_template = "Insufficient rights to access cluster '{cluster_id}'" -class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError): +class ClusterInvalidOperationError(ComputationalSchedulerError): msg_template = "Invalid operation on cluster '{cluster_id}'" @@ -272,21 +174,21 @@ class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError): # -class DaskClientRequestError(OsparcErrorMixin, SchedulerError): +class DaskClientRequestError(ComputationalSchedulerError): msg_template = ( "The dask client to cluster on '{endpoint}' did an invalid request '{error}'" ) -class DaskClusterError(OsparcErrorMixin, SchedulerError): +class DaskClusterError(ComputationalSchedulerError): msg_template = "The dask cluster on '{endpoint}' encountered an error: '{error}'" -class DaskGatewayServerError(OsparcErrorMixin, SchedulerError): +class DaskGatewayServerError(ComputationalSchedulerError): msg_template = "The dask gateway on '{endpoint}' encountered an error: '{error}'" -class DaskClientAcquisisitonError(OsparcErrorMixin, SchedulerError): +class DaskClientAcquisisitonError(ComputationalSchedulerError): msg_template = ( "The dask client to cluster '{cluster}' encountered an error '{error}'" ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py index 5fd39dda4bc..2d663aec9a1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_base_scheduler.py @@ -45,10 +45,10 @@ ComputationalBackendNotConnectedError, ComputationalBackendOnDemandNotReadyError, ComputationalSchedulerChangedError, + ComputationalSchedulerError, DaskClientAcquisisitonError, InvalidPipelineError, PipelineNotFoundError, - SchedulerError, TaskSchedulingError, ) from ...core.settings import ComputationalBackendSettings @@ -242,7 +242,7 @@ async def stop_pipeline( } if not possible_iterations: msg = f"There are no pipeline scheduled for {user_id}:{project_id}" - raise SchedulerError(msg) + raise ComputationalSchedulerError(msg=msg) current_max_iteration = max(possible_iterations) selected_iteration = current_max_iteration else: @@ -281,7 +281,7 @@ def _get_last_iteration(self, user_id: UserID, project_id: ProjectID) -> Iterati } if not possible_iterations: msg = f"There are no pipeline scheduled for {user_id}:{project_id}" - raise SchedulerError(msg) + raise ComputationalSchedulerError(msg=msg) return max(possible_iterations) def _start_scheduling( @@ -342,10 +342,10 @@ async def _get_pipeline_tasks( } if len(pipeline_comp_tasks) != len(pipeline_dag.nodes()): # type: ignore[arg-type] msg = ( - f"{project_id}The tasks defined for {project_id} do not contain all" + f"The tasks defined for {project_id} do not contain all" f" the tasks defined in the pipeline [{list(pipeline_dag.nodes)}]! Please check." 
             )
-            raise InvalidPipelineError(msg)
+            raise InvalidPipelineError(pipeline_id=project_id, msg=msg)
         return pipeline_comp_tasks
 
     async def _update_run_result_from_tasks(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
index 39b432b9492..524dfc7e8ad 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/_scheduler_factory.py
@@ -18,7 +18,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler:
     if not hasattr(app.state, "engine"):
         msg = "Database connection is missing. Please check application configuration."
-        raise ConfigurationError(msg)
+        raise ConfigurationError(msg=msg)
     db_engine = app.state.engine
 
     with log_context(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py
index d246bb35f42..31177b5a616 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_clients_pool.py
@@ -50,7 +50,7 @@ async def create(
     def instance(app: FastAPI) -> "DaskClientsPool":
         if not hasattr(app.state, "dask_clients_pool"):
             msg = "Dask clients pool is not available. Please check the configuration."
-            raise ConfigurationError(msg)
+            raise ConfigurationError(msg=msg)
         dask_clients_pool: DaskClientsPool = app.state.dask_clients_pool
         return dask_clients_pool
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
index 37129141f6d..38981b5fa7d 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_pipelines.py
@@ -25,7 +25,7 @@ async def get_pipeline(self, project_id: ProjectID) -> CompPipelineAtDB:
         )
         row: RowProxy | None = await result.fetchone()
         if not row:
-            raise PipelineNotFoundError(str(project_id))
+            raise PipelineNotFoundError(pipeline_id=project_id)
         return CompPipelineAtDB.model_validate(row)
 
     async def upsert_pipeline(
@@ -39,7 +39,9 @@ async def upsert_pipeline(
             dag_adjacency_list=nx.to_dict_of_lists(dag_graph),
             state=RunningState.PUBLISHED if publish else RunningState.NOT_STARTED,
         )
-        insert_stmt = insert(comp_pipeline).values(**pipeline_at_db.model_dump(by_alias=True))
+        insert_stmt = insert(comp_pipeline).values(
+            **pipeline_at_db.model_dump(by_alias=True)
+        )
         # FIXME: This is not a nice thing. this part of the information should be kept in comp_runs.
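All the `ConfigurationError(msg)` → `ConfigurationError(msg=msg)` churn exists because a mixin-style `__init__` takes keyword context only, so any leftover positional call now fails loudly instead of silently bypassing the template. A sketch of that failure mode (mirrors the stand-in classes above; the real mixin's exact error message may differ):

```python
from typing import Any


class _ConfigSketchError(RuntimeError):
    msg_template = "Application misconfiguration: {msg}"

    def __init__(self, **ctx: Any) -> None:  # keyword-only context
        super().__init__(self.msg_template.format(**ctx))


try:
    _ConfigSketchError("forgot the keyword")  # old-style positional call
except TypeError:
    print("positional args are rejected, so stragglers surface immediately")
```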
         update_exclusion_policy = set()
         if not dag_graph.nodes():
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
index bdb64cbbf99..637e0c7faf6 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks/_utils.py
@@ -322,7 +322,7 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool:
             f"invalid EC2 type name selected {set(hardware_info.aws_ec2_instances)}."
             " TIP: adjust product configuration"
         )
-        raise ConfigurationError(msg) from exc
+        raise ConfigurationError(msg=msg) from exc
     except (
         RemoteMethodNotRegisteredError,
         RPCServerError,
@@ -450,9 +450,11 @@ async def generate_tasks_list_from_project(
             last_heartbeat=None,
             created=arrow.utcnow().datetime,
             modified=arrow.utcnow().datetime,
-            pricing_info=pricing_info.model_dump(exclude={"pricing_unit_cost"})
-            if pricing_info
-            else None,
+            pricing_info=(
+                pricing_info.model_dump(exclude={"pricing_unit_cost"})
+                if pricing_info
+                else None
+            ),
             hardware_info=hardware_info,
         )
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects.py
index 5f5fe5263ff..902f9977574 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects.py
@@ -22,7 +22,7 @@ async def get_project(self, project_id: ProjectID) -> ProjectAtDB:
             )
         ).first()
         if not row:
-            raise ProjectNotFoundError(project_id)
+            raise ProjectNotFoundError(project_id=project_id)
         return ProjectAtDB.model_validate(row)
 
     async def is_node_present_in_workbench(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects_networks.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects_networks.py
index 59334aa0a06..12fc7fe2932 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects_networks.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/projects_networks.py
@@ -22,7 +22,7 @@ async def get_projects_networks(self, project_id: ProjectID) -> ProjectsNetworks
             )
         ).first()
         if not row:
-            raise ProjectNetworkNotFoundError(project_id)
+            raise ProjectNetworkNotFoundError(project_id=project_id)
         return ProjectsNetworks.model_validate(row)
 
     async def upsert_projects_networks(
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
index ed834581b03..1e05524b48d 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py
@@ -59,7 +59,7 @@ async def get_swarm_network(simcore_services_network_name: DockerNetworkName) ->
             f"Swarm network name (searching for '*{simcore_services_network_name}*') "
             f"is not configured.Found following networks: {networks}"
         )
-        raise DynamicSidecarError(msg)
+        raise DynamicSidecarError(msg=msg)
     return networks[0]
 
 
@@ -89,7 +89,7 @@ async def create_network(network_config: dict[str, Any]) -> NetworkId:
     # finally raise an error if a network cannot be spawned
     # pylint: disable=raise-missing-from
     msg = f"Could not create or recover a network ID for {network_config}"
-    raise DynamicSidecarError(msg) from e
+    raise DynamicSidecarError(msg=msg) from e
 
 
 def _to_snake_case(string: str) -> str:
@@ -119,7 +119,7 @@ async def create_service_and_get_id(
 
     if "ID" not in service_start_result:
         msg = f"Error while starting service: {service_start_result!s}"
-        raise DynamicSidecarError(msg)
+        raise DynamicSidecarError(msg=msg)
     service_id: ServiceId = service_start_result["ID"]
     return service_id
 
@@ -159,7 +159,10 @@ async def _get_service_latest_task(service_id: str) -> Mapping[str, Any]:
             last_task: Mapping[str, Any] = sorted_tasks[-1]
             return last_task
     except GenericDockerError as err:
-        if err.original_exception.status == status.HTTP_404_NOT_FOUND:
+        if (
+            err.error_context()["original_exception"].status
+            == status.HTTP_404_NOT_FOUND
+        ):
             raise DockerServiceNotFoundError(service_id=service_id) from err
         raise
 
@@ -205,7 +208,7 @@ async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, A
     docker_node_id: None | str = task.get("NodeID", None)
     if not docker_node_id:
         msg = f"Could not find an assigned NodeID for service_id={service_id}. Last task inspect result: {task}"
-        raise DynamicSidecarError(msg)
+        raise DynamicSidecarError(msg=msg)
 
     return docker_node_id
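Note the subtlety in `_get_service_latest_task`: the refactored `GenericDockerError` (see errors.py below) still sets `original_exception` as an attribute, but reading it through `error_context()` keeps call sites decoupled from that detail. A tiny stand-in demo:

```python
from typing import Any


class _Wrapped(RuntimeError):  # stand-in for the refactored GenericDockerError
    def __init__(self, **ctx: Any) -> None:
        self.__dict__.update(ctx)
        super().__init__("docker client error")

    def error_context(self) -> dict[str, Any]:
        return dict(**self.__dict__)


class _FakeDockerError(Exception):
    status = 404  # aiodocker's DockerError carries the HTTP status


err = _Wrapped(msg="boom", original_exception=_FakeDockerError())
assert err.error_context()["original_exception"].status == 404
```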
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_utils.py
index ceb9d276c13..f625c2ea625 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_utils.py
@@ -13,8 +13,7 @@ async def docker_client() -> AsyncIterator[aiodocker.docker.Docker]:
         client = aiodocker.Docker()
         yield client
     except aiodocker.exceptions.DockerError as e:
-        message = "Unexpected error from docker client"
-        raise GenericDockerError(message, e) from e
+        raise GenericDockerError(msg=f"{e.message}", original_exception=e) from e
     finally:
         if client is not None:
             await client.close()
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py
index 78a1201a714..8c1849064ee 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/settings.py
@@ -267,7 +267,7 @@ def remap_to_compose_spec_key() -> dict[str, SimcoreServiceLabels]:
                 f"docker_image_name_by_services={docker_image_name_by_services}"
             )
             log.error(message)
-            raise DynamicSidecarError(message)
+            raise DynamicSidecarError(msg=message)
 
     return remap_to_compose_spec_key()
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py
index e3d67da68aa..3b0a400223b 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/errors.py
@@ -1,34 +1,28 @@
-from aiodocker.exceptions import DockerError
-from common_library.errors_classes import OsparcErrorMixin
-from models_library.projects_nodes_io import NodeID
+from typing import Any
+
+from aiodocker import DockerError
 
 from ...core.errors import DirectorError
 
 
 class DynamicSidecarError(DirectorError):
-    pass
+    msg_template: str = "Unexpected dynamic sidecar error: {msg}"
 
 
 class GenericDockerError(DynamicSidecarError):
-    """Generic docker library error"""
-
-    def __init__(self, msg: str, original_exception: DockerError):
-        super().__init__(msg + f": {original_exception.message}")
+    def __init__(self, original_exception: DockerError, **ctx: Any) -> None:
+        super().__init__(original_exception=original_exception, **ctx)
         self.original_exception = original_exception
 
+    msg_template: str = "Unexpected error using docker client: {msg}"
 
-class DynamicSidecarNotFoundError(DirectorError):
-    """Dynamic sidecar was not found"""
 
-    def __init__(self, node_uuid: NodeID):
-        super().__init__(f"node {node_uuid} not found")
+class DynamicSidecarNotFoundError(DirectorError):
+    msg_template: str = "node {node_uuid} not found"
 
 
 class DockerServiceNotFoundError(DirectorError):
-    """Raised when an expected docker service is not found"""
-
-    def __init__(self, service_id: str):
-        super().__init__(f"docker service with {service_id=} not found")
+    msg_template: str = "docker service with {service_id} not found"
 
 
 class EntrypointContainerNotFoundError(DynamicSidecarError):
@@ -39,5 +33,5 @@ class LegacyServiceIsNotSupportedError(DirectorError):
     """This API is not implemented by the director-v0"""
 
 
-class UnexpectedContainerStatusError(OsparcErrorMixin, DynamicSidecarError):
-    msg_template = "Unexpected status from containers: {containers_with_error}"
+class UnexpectedContainerStatusError(DynamicSidecarError):
+    msg_template: str = "Unexpected status from containers: {containers_with_error}"
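With `GenericDockerError` now templated, `docker_client()` forwards aiodocker's own message as `msg` and stashes the original exception in the context, so messages read "Unexpected error using docker client: <docker message>". A runnable sketch of that wrap-and-translate shape (a fake error type stands in for aiodocker's `DockerError`, which carries `.status` and `.message`):

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class _FakeDockerError(Exception):  # stand-in for aiodocker's DockerError
    def __init__(self, status: int, message: str) -> None:
        super().__init__(message)
        self.status = status
        self.message = message


class _GenericDockerSketchError(RuntimeError):
    def __init__(self, *, msg: str, original_exception: Exception) -> None:
        super().__init__(f"Unexpected error using docker client: {msg}")
        self.original_exception = original_exception


@asynccontextmanager
async def _client() -> AsyncIterator[None]:
    try:
        yield
    except _FakeDockerError as e:
        # keep the root cause both chained (from e) and in the error context
        raise _GenericDockerSketchError(msg=f"{e.message}", original_exception=e) from e
```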
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py
index 04853661c47..99fa3517130 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py
@@ -171,7 +171,7 @@ def toggle_observation(self, node_uuid: NodeID, *, disable: bool) -> bool:
         raises DynamicSidecarNotFoundError
         """
         if node_uuid not in self._inverse_search_mapping:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
         service_name = self._inverse_search_mapping[node_uuid]
 
         service_task = self._service_observation_task.get(service_name)
@@ -274,7 +274,7 @@ async def add_service_from_scheduler_data(
                 f"node_uuids at a global level collided. A running service for node {scheduler_data.node_uuid} already exists."
                 " Please checkout other projects which may have this issue."
             )
-            raise DynamicSidecarError(msg)
+            raise DynamicSidecarError(msg=msg)
 
         self._inverse_search_mapping[
             scheduler_data.node_uuid
@@ -288,7 +288,7 @@ def is_service_tracked(self, node_uuid: NodeID) -> bool:
 
     def get_scheduler_data(self, node_uuid: NodeID) -> SchedulerData:
         if node_uuid not in self._inverse_search_mapping:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
         service_name = self._inverse_search_mapping[node_uuid]
         return self._to_observe[service_name]
 
@@ -336,7 +336,7 @@ async def mark_service_for_removal(
         """Marks service for removal, causing RemoveMarkedService to trigger"""
         async with self._lock:
             if node_uuid not in self._inverse_search_mapping:
-                raise DynamicSidecarNotFoundError(node_uuid)
+                raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
             service_name = self._inverse_search_mapping[node_uuid]
 
             if service_name not in self._to_observe:
@@ -416,7 +416,7 @@ async def remove_service_from_observation(self, node_uuid: NodeID) -> None:
         """
         async with self._lock:
             if node_uuid not in self._inverse_search_mapping:
-                raise DynamicSidecarNotFoundError(node_uuid)
+                raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
             service_name = self._inverse_search_mapping[node_uuid]
 
             if service_name not in self._to_observe:
@@ -438,7 +438,7 @@ async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDeta
         raises DynamicSidecarNotFoundError
         """
         if node_uuid not in self._inverse_search_mapping:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
         service_name = self._inverse_search_mapping[node_uuid]
 
         scheduler_data: SchedulerData = self._to_observe[service_name]
@@ -451,7 +451,7 @@ async def retrieve_service_inputs(
     ) -> RetrieveDataOutEnveloped:
         """Pulls data from input ports for the service"""
         if node_uuid not in self._inverse_search_mapping:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
         service_name = self._inverse_search_mapping[node_uuid]
 
         scheduler_data: SchedulerData = self._to_observe[service_name]
@@ -518,7 +518,7 @@ async def detach_project_network(
     async def restart_containers(self, node_uuid: NodeID) -> None:
         """Restarts containers without saving or restoring the state or I/O ports"""
         if node_uuid not in self._inverse_search_mapping:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
         service_name: ServiceName = self._inverse_search_mapping[node_uuid]
 
         scheduler_data: SchedulerData = self._to_observe[service_name]
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py
index d003eec60e6..7f55dc68498 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/volumes.py
@@ -79,7 +79,7 @@ def _get_s3_volume_driver_config(
         }
     else:
         msg = f"Unexpected, all {S3Provider.__name__} should be covered"
-        raise DynamicSidecarError(msg)
+        raise DynamicSidecarError(msg=msg)
 
     assert extra_options is not None  # nosec
     options: dict[str, Any] = driver_config["Options"]
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py
index dcda51ad0e5..a7cb4e1ba27 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py
@@ -81,7 +81,7 @@ async def on_shutdown() -> None:
 def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
     if not hasattr(app.state, "rabbitmq_client"):
         msg = "RabbitMQ client is not available. Please check the configuration."
-        raise ConfigurationError(msg)
+        raise ConfigurationError(msg=msg)
     return cast(RabbitMQClient, app.state.rabbitmq_client)
 
 
@@ -90,5 +90,5 @@ def get_rabbitmq_rpc_client(app: FastAPI) -> RabbitMQRPCClient:
         msg = (
             "RabbitMQ client for RPC is not available. Please check the configuration."
         )
-        raise ConfigurationError(msg)
+        raise ConfigurationError(msg=msg)
     return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_client)
diff --git a/services/director-v2/src/simcore_service_director_v2/modules/resource_usage_tracker_client.py b/services/director-v2/src/simcore_service_director_v2/modules/resource_usage_tracker_client.py
index 58d02975fd7..3b75607989d 100644
--- a/services/director-v2/src/simcore_service_director_v2/modules/resource_usage_tracker_client.py
+++ b/services/director-v2/src/simcore_service_director_v2/modules/resource_usage_tracker_client.py
@@ -91,7 +91,7 @@ async def get_default_service_pricing_plan(
         )
         if response.status_code == status.HTTP_404_NOT_FOUND:
             msg = "No pricing plan defined"
-            raise PricingPlanUnitNotFoundError(msg)
+            raise PricingPlanUnitNotFoundError(msg=msg)
         response.raise_for_status()
         return PricingPlanGet.model_validate(response.json())
 
@@ -117,7 +117,7 @@ async def get_default_pricing_and_hardware_info(
                     unit.specific_info.aws_ec2_instances,
                 )
         msg = "Default pricing plan and unit does not exist"
-        raise PricingPlanUnitNotFoundError(msg)
+        raise PricingPlanUnitNotFoundError(msg=msg)
 
     async def get_pricing_unit(
         self,
diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py
index d76596b5bf1..afb1e0b3770 100644
--- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py
+++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py
@@ -129,7 +129,9 @@ async def create_node_ports(
             db_manager=db_manager,
         )
     except ValidationError as err:
-        raise PortsValidationError(project_id, node_id, list(err.errors())) from err
+        raise PortsValidationError(
+            project_id=project_id, node_id=node_id, errors_list=list(err.errors())
+        ) from err
 
 
 async def parse_output_data(
@@ -181,7 +183,9 @@ async def parse_output_data(
             ports_errors.extend(_get_port_validation_errors(port_key, err))
 
     if ports_errors:
-        raise PortsValidationError(project_id, node_id, ports_errors)
+        raise PortsValidationError(
+            project_id=project_id, node_id=node_id, errors_list=ports_errors
+        )
 
 
 async def compute_input_data(
@@ -218,11 +222,13 @@ async def compute_input_data(
             else:
                 input_data[port.key] = value
 
-        except ValidationError as err:  # noqa: PERF203
+        except ValidationError as err:
             ports_errors.extend(_get_port_validation_errors(port.key, err))
 
     if ports_errors:
-        raise PortsValidationError(project_id, node_id, ports_errors)
+        raise PortsValidationError(
+            project_id=project_id, node_id=node_id, errors_list=ports_errors
+        )
 
     return TaskInputData.model_validate(input_data)
 
@@ -608,18 +614,25 @@ def check_if_cluster_is_able_to_run_pipeline(
         raise MissingComputationalResourcesError(
             project_id=project_id,
             node_id=node_id,
-            msg=f"Service {node_image.name}:{node_image.tag} cannot be scheduled "
-            f"on cluster {cluster_id}: task needs '{task_resources}', "
-            f"cluster has {cluster_resources}",
+            service_name=node_image.name,
+            service_version=node_image.tag,
+            cluster_id=cluster_id,
+            task_resources=task_resources,
+            cluster_resources=cluster_resources,
        )
 
     # well then our workers are not powerful enough
     raise InsuficientComputationalResourcesError(
         project_id=project_id,
         node_id=node_id,
-        msg=f"Insufficient computational resources to run {node_image.name}:{node_image.tag} with {_to_human_readable_resource_values( task_resources)} on cluster {cluster_id}."
-        f"Cluster available workers: {[_to_human_readable_resource_values( worker.get('resources', None)) for worker in workers.values()]}"
-        "TIP: Reduce service required resources or contact oSparc support",
+        service_name=node_image.name,
+        service_version=node_image.tag,
+        service_requested_resources=_to_human_readable_resource_values(task_resources),
+        cluster_id=cluster_id,
+        cluster_available_resources=[
+            _to_human_readable_resource_values(worker.get("resources", None))
+            for worker in workers.values()
+        ],
     )
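For context, the `errors_list` now handed to `PortsValidationError` is exactly what pydantic's `ValidationError.errors()` returns: a list of loc/msg/type dicts. A minimal pydantic-v2 sketch with a made-up port model:

```python
from pydantic import BaseModel, ValidationError


class _Port(BaseModel):  # hypothetical stand-in for a port schema
    value: int


try:
    _Port(value="not-a-number")
except ValidationError as err:
    errors_list = list(err.errors())
    # each entry carries at least 'loc', 'msg' and 'type'
    assert errors_list[0]["loc"] == ("value",)
```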
diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py
index 15e6e98dfce..964f38e6484 100644
--- a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py
+++ b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py
@@ -26,11 +26,11 @@
 from pydantic import AnyUrl
 
 from ..core.errors import (
+    ComputationalSchedulerError,
     ConfigurationError,
     DaskClientRequestError,
     DaskClusterError,
     DaskGatewayServerError,
-    SchedulerError,
 )
 from .dask import check_maximize_workers, wrap_client_async_routine
 
@@ -101,7 +101,7 @@ async def _connect_to_dask_scheduler(
         )
     except TypeError as exc:
         msg = f"Scheduler has invalid configuration: {endpoint=}"
-        raise ConfigurationError(msg) from exc
+        raise ConfigurationError(msg=msg) from exc
 
 
 async def _connect_with_gateway_and_create_cluster(
@@ -155,7 +155,7 @@ async def _connect_with_gateway_and_create_cluster(
 
     except TypeError as exc:
         msg = f"Cluster has invalid configuration: {endpoint=}, {auth_params=}"
-        raise ConfigurationError(msg) from exc
+        raise ConfigurationError(msg=msg) from exc
     except ValueError as exc:
         # this is when a 404=NotFound,422=MalformedData comes up
         raise DaskClientRequestError(endpoint=endpoint, error=exc) from exc
@@ -196,10 +196,10 @@ async def get_gateway_auth_from_params(
                 return dask_gateway.JupyterHubAuth(auth_params.api_token)
     except (TypeError, ValueError) as exc:
         msg = f"Cluster has invalid configuration: {auth_params}"
-        raise ConfigurationError(msg) from exc
+        raise ConfigurationError(msg=msg) from exc
 
     msg = f"Cluster has invalid configuration: {auth_params=}"
-    raise ConfigurationError(msg)
+    raise ConfigurationError(msg=msg)
 
 
 _PING_TIMEOUT_S: Final[int] = 5
@@ -220,7 +220,7 @@ async def test_scheduler_endpoint(
             ) as dask_client:
                 if dask_client.status != _DASK_SCHEDULER_RUNNING_STATE:
                     msg = "internal scheduler is not running!"
-                    raise SchedulerError(msg)
+                    raise ComputationalSchedulerError(msg=msg)
 
         else:
             gateway_auth = await get_gateway_auth_from_params(authentication)
@@ -247,8 +247,8 @@ async def test_scheduler_endpoint(
         ClientConnectionError,
         ClientResponseError,
         httpx.HTTPError,
-        SchedulerError,
+        ComputationalSchedulerError,
     ) as exc:
         logger.debug("Pinging %s, failed: %s", f"{endpoint=}", f"{exc=!r}")
         msg = f"Could not connect to cluster in {endpoint}: error: {exc}"
-        raise ConfigurationError(msg) from exc
+        raise ConfigurationError(msg=msg) from exc
diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py
index 560794460de..f0a17c5e51c 100644
--- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py
+++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py
@@ -399,9 +399,10 @@ async def test_get_stack_status_missing(
     mocked_dynamic_scheduler_events: None,
     mock_docker_api: None,
 ) -> None:
-    with pytest.raises(DynamicSidecarNotFoundError) as execinfo:
+    with pytest.raises(
+        DynamicSidecarNotFoundError, match=rf"{scheduler_data.node_uuid} not found"
+    ):
         await scheduler.get_stack_status(scheduler_data.node_uuid)
-    assert f"{scheduler_data.node_uuid} not found" in str(execinfo)
 
 
 async def test_get_stack_status_failing_sidecar(
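On the test changes here and below: `pytest.raises(..., match=...)` applies `re.search` against `str(excinfo.value)`, so it asserts the type and the message in one statement; any regex metacharacters in the expected text must be escaped (as done in the swarm-network test further down). A self-contained example:

```python
import pytest


def test_match_is_a_regex() -> None:
    # parentheses are regex groups, so the literal ones are escaped
    with pytest.raises(ValueError, match=r"bad value \(code 42\)"):
        raise ValueError("bad value (code 42)")
```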
diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py
index 5787fa119e1..2de98368d9a 100644
--- a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py
+++ b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py
@@ -252,7 +252,7 @@ def mocked_director_v2_scheduler(mocker: MockerFixture, exp_status_code: int) ->
     # MOCKING get_stack_status
     def get_stack_status(node_uuid: NodeID) -> RunningDynamicServiceDetails:
         if exp_status_code == status.HTTP_307_TEMPORARY_REDIRECT:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
 
         return RunningDynamicServiceDetails.model_validate(
             RunningDynamicServiceDetails.model_config["json_schema_extra"]["examples"][
@@ -269,7 +269,7 @@ def get_stack_status(node_uuid: NodeID) -> RunningDynamicServiceDetails:
     # MOCKING remove_service
     def remove_service(node_uuid: NodeID, *ars: Any, **kwargs: Any) -> None:
         if exp_status_code == status.HTTP_307_TEMPORARY_REDIRECT:
-            raise DynamicSidecarNotFoundError(node_uuid)
+            raise DynamicSidecarNotFoundError(node_uuid=node_uuid)
 
     mocker.patch(
         f"{module_base}._task.DynamicSidecarsScheduler.mark_service_for_removal",
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
index 495023dbda2..8fbc2d9006b 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
@@ -58,9 +58,9 @@
     ComputationalBackendTaskNotFoundError,
     ComputationalBackendTaskResultsNotReadyError,
     ComputationalSchedulerChangedError,
+    ComputationalSchedulerError,
     ConfigurationError,
     PipelineNotFoundError,
-    SchedulerError,
 )
 from simcore_service_director_v2.core.settings import AppSettings
 from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
@@ -1107,7 +1107,7 @@ async def test_handling_of_disconnected_dask_scheduler(
     aiopg_engine: aiopg.sa.engine.Engine,
     mocker: MockerFixture,
     published_project: PublishedProject,
-    backend_error: SchedulerError,
+    backend_error: ComputationalSchedulerError,
     run_metadata: RunMetadataDict,
 ):
     # this will create a non connected backend issue that will trigger re-connection
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
index f36a8f8f7f6..77c327706fd 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py
@@ -403,13 +403,12 @@ def test_settings__valid_network_names(
 async def test_failed_docker_client_request(docker_swarm: None):
     missing_network_name = "this_network_cannot_be_found"
 
-    with pytest.raises(GenericDockerError) as execinfo:
+    with pytest.raises(
+        GenericDockerError,
+        match=f"Unexpected error using docker client: network {missing_network_name} not found",
+    ):
         async with docker_client() as client:
             await client.networks.get(missing_network_name)
-    assert (
-        str(execinfo.value)
-        == f"Unexpected error from docker client: network {missing_network_name} not found"
-    )
 
 
 async def test_get_swarm_network_ok(
@@ -428,16 +427,16 @@ async def test_get_swarm_network_missing_network(
     dynamic_services_scheduler_settings: DynamicServicesSchedulerSettings,
     docker_swarm: None,
 ):
-    with pytest.raises(DynamicSidecarError) as excinfo:
+    with pytest.raises(
+        DynamicSidecarError,
+        match=r"Unexpected dynamic sidecar error: "
+        r"Swarm network name \(searching for \'\*test_network_name\*\'\) is not configured."
+        r"Found following networks: \[\]",
+    ):
         await docker_api.get_swarm_network(
             dynamic_services_scheduler_settings.SIMCORE_SERVICES_NETWORK_NAME
         )
-    assert str(excinfo.value) == (
-        "Swarm network name (searching for '*test_network_name*') is not configured."
-        "Found following networks: []"
-    )
 
 
 async def test_recreate_network_multiple_times(
     network_config: dict[str, Any],