Commit 60e5447

pruning

sanderegg committed Nov 19, 2024
1 parent f386608
Showing 9 changed files with 2,488 additions and 81 deletions.
@@ -53,10 +53,10 @@
ClusterNotFoundError,
ClustersKeeperNotAvailableError,
ComputationalRunNotFoundError,
ComputationalSchedulerError,
ConfigurationError,
PricingPlanUnitNotFoundError,
ProjectNotFoundError,
SchedulerError,
WalletNotEnoughCreditsError,
)
from ...models.comp_pipelines import CompPipelineAtDB
@@ -510,7 +510,9 @@ async def get_computation(
pipeline_details=pipeline_details,
url=TypeAdapter(AnyHttpUrl).validate_python(f"{request.url}"),
stop_url=(
TypeAdapter(AnyHttpUrl).validate_python(f"{self_url}:stop?user_id={user_id}")
TypeAdapter(AnyHttpUrl).validate_python(
f"{self_url}:stop?user_id={user_id}"
)
if pipeline_state.is_running()
else None
),
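
For context, TypeAdapter(AnyHttpUrl).validate_python(...) is the pydantic v2 idiom for parsing a plain string into a validated URL object; a minimal sketch of what the reformatted stop_url expression does, with illustrative values in place of the real self_url and user_id:

from pydantic import AnyHttpUrl, TypeAdapter

self_url = "http://director-v2:8000/v2/computations/some-project-uuid"  # illustrative value
user_id = 42  # illustrative value
# builds and validates the ":stop" action URL; raises pydantic.ValidationError if the string is not a valid HTTP URL
stop_url = TypeAdapter(AnyHttpUrl).validate_python(f"{self_url}:stop?user_id={user_id}")
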
@@ -598,7 +600,7 @@ async def stop_computation(

except ProjectNotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{e}") from e
except SchedulerError as e:
except ComputationalSchedulerError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"{e}") from e
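
Because the renamed ComputationalSchedulerError is now the base class of the whole scheduler error family (see errors.py below), the except clause above also catches its subclasses, such as ComputationalBackendNotConnectedError. A minimal, self-contained illustration of that behaviour, using stand-ins for the real classes:

class ComputationalSchedulerError(Exception):  # stand-in for the real error class
    ...


class ComputationalBackendNotConnectedError(ComputationalSchedulerError):  # stand-in
    ...


try:
    raise ComputationalBackendNotConnectedError("dask backend not connected")
except ComputationalSchedulerError as e:
    print(f"caught via the base class: {e}")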


@@ -639,7 +641,7 @@ async def delete_computation(
# abort the pipeline first
try:
await scheduler.stop_pipeline(computation_stop.user_id, project_id)
except SchedulerError as e:
except ComputationalSchedulerError as e:
_logger.warning(
"Project %s could not be stopped properly.\n reason: %s",
project_id,
90 changes: 33 additions & 57 deletions services/director-v2/src/simcore_service_director_v2/core/errors.py
@@ -49,71 +49,47 @@ class PipelineNotFoundError(DirectorError):
msg_template: str = "pipeline {pipeline_id} not found"


class ComputationalRunNotFoundError(OsparcErrorMixin, DirectorError):
class ComputationalRunNotFoundError(DirectorError):
msg_template = "Computational run not found"


class ComputationalTaskNotFoundError(OsparcErrorMixin, DirectorError):
class ComputationalTaskNotFoundError(DirectorError):
msg_template = "Computational task {node_id} not found"


class WalletNotEnoughCreditsError(OsparcErrorMixin, DirectorError):
class WalletNotEnoughCreditsError(DirectorError):
msg_template = "Wallet '{wallet_name}' has {wallet_credit_amount} credits."


#
# SCHEDULER ERRORS
#
class ComputationalSchedulerError(DirectorError):
msg_template = "Computational scheduler unexpected error"


class SchedulerError(DirectorError):
def __init__(self, msg: str | None = None):
super().__init__(msg or "Unexpected error in the scheduler")
class InvalidPipelineError(ComputationalSchedulerError):
msg_template = "Computational scheduler: Invalid configuration of pipeline {pipeline_id}: {msg}"


class InvalidPipelineError(SchedulerError):
"""A pipeline is misconfigured"""

def __init__(self, pipeline_id: str, msg: str | None = None):
super().__init__(msg or f"Invalid configuration of pipeline {pipeline_id}")


class TaskSchedulingError(SchedulerError):
"""A task cannot be scheduled"""

code: str = "task scheduler error"

def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
super().__init__(msg=msg)
self.project_id = project_id
self.node_id = node_id

def get_errors(self) -> list[ErrorDict]:
# default implementation
return [
{
"loc": (
f"{self.project_id}",
f"{self.node_id}",
),
"msg": f"{self.args[0]}",
"type": self.code,
},
]
class TaskSchedulingError(ComputationalSchedulerError):
msg_template = "Computational scheduler: Task {node_id} in project {project_id} could not be scheduled {msg}"


class MissingComputationalResourcesError(TaskSchedulingError):
"""A task cannot be scheduled because the cluster does not have the required resources"""

def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
super().__init__(project_id, node_id, msg=msg)
msg_template = (
"Service {service_name}:{service_version} cannot be scheduled "
"on cluster {cluster_id}: task needs '{task_resources}', "
"cluster has {cluster_resources}",
)


class InsuficientComputationalResourcesError(TaskSchedulingError):
"""A task cannot be scheduled because the cluster does not have *enough* of the required resources"""

def __init__(self, project_id: ProjectID, node_id: NodeID, msg: str | None = None):
super().__init__(project_id, node_id, msg=msg)
msg_template: str = (
"Insufficient computational resources to run {service_name}:{service_version} with {service_requested_resources} on cluster {cluster_id}."
"Cluster available workers: {cluster_available_resources}"
"TIP: Reduce service required resources or contact oSparc support"
)


class PortsValidationError(TaskSchedulingError):
@@ -158,33 +134,33 @@ def get_errors(self) -> list[ErrorDict]:
return value_errors


class ComputationalSchedulerChangedError(OsparcErrorMixin, SchedulerError):
class ComputationalSchedulerChangedError(ComputationalSchedulerError):
msg_template = "The dask scheduler ID changed from '{original_scheduler_id}' to '{current_scheduler_id}'"


class ComputationalBackendNotConnectedError(OsparcErrorMixin, SchedulerError):
class ComputationalBackendNotConnectedError(ComputationalSchedulerError):
msg_template = "The dask computational backend is not connected"


class ComputationalBackendNoS3AccessError(OsparcErrorMixin, SchedulerError):
class ComputationalBackendNoS3AccessError(ComputationalSchedulerError):
msg_template = "The S3 backend is not ready, please try again later"


class ComputationalBackendTaskNotFoundError(OsparcErrorMixin, SchedulerError):
class ComputationalBackendTaskNotFoundError(ComputationalSchedulerError):
msg_template = (
"The dask computational backend does not know about the task '{job_id}'"
)


class ComputationalBackendTaskResultsNotReadyError(OsparcErrorMixin, SchedulerError):
class ComputationalBackendTaskResultsNotReadyError(ComputationalSchedulerError):
msg_template = "The task result is not ready yet for job '{job_id}'"


class ClustersKeeperNotAvailableError(OsparcErrorMixin, SchedulerError):
class ClustersKeeperNotAvailableError(ComputationalSchedulerError):
msg_template = "clusters-keeper service is not available!"


class ComputationalBackendOnDemandNotReadyError(OsparcErrorMixin, SchedulerError):
class ComputationalBackendOnDemandNotReadyError(ComputationalSchedulerError):
msg_template = (
"The on demand computational cluster is not ready 'est. remaining time: {eta}'"
)
@@ -193,15 +169,15 @@ class ComputationalBackendOnDemandNotReadyError(OsparcErrorMixin, SchedulerError
#
# SCHEDULER/CLUSTER ERRORS
#
class ClusterNotFoundError(OsparcErrorMixin, SchedulerError):
class ClusterNotFoundError(ComputationalSchedulerError):
msg_template = "The cluster '{cluster_id}' not found"


class ClusterAccessForbiddenError(OsparcErrorMixin, SchedulerError):
class ClusterAccessForbiddenError(ComputationalSchedulerError):
msg_template = "Insufficient rights to access cluster '{cluster_id}'"


class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError):
class ClusterInvalidOperationError(ComputationalSchedulerError):
msg_template = "Invalid operation on cluster '{cluster_id}'"


@@ -210,21 +186,21 @@ class ClusterInvalidOperationError(OsparcErrorMixin, SchedulerError):
#


class DaskClientRequestError(OsparcErrorMixin, SchedulerError):
class DaskClientRequestError(ComputationalSchedulerError):
msg_template = (
"The dask client to cluster on '{endpoint}' did an invalid request '{error}'"
)


class DaskClusterError(OsparcErrorMixin, SchedulerError):
class DaskClusterError(ComputationalSchedulerError):
msg_template = "The dask cluster on '{endpoint}' encountered an error: '{error}'"


class DaskGatewayServerError(OsparcErrorMixin, SchedulerError):
class DaskGatewayServerError(ComputationalSchedulerError):
msg_template = "The dask gateway on '{endpoint}' encountered an error: '{error}'"


class DaskClientAcquisisitonError(OsparcErrorMixin, SchedulerError):
class DaskClientAcquisisitonError(ComputationalSchedulerError):
msg_template = (
"The dask client to cluster '{cluster}' encountered an error '{error}'"
)
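
A hedged sketch of how these msg_template based errors are raised and rendered, assuming DirectorError (whose definition is collapsed above) provides the OsparcErrorMixin behaviour of formatting msg_template with the keyword arguments given at construction; the values are illustrative:

from simcore_service_director_v2.core.errors import (
    ComputationalSchedulerError,
    InvalidPipelineError,
)

try:
    raise InvalidPipelineError(pipeline_id="some-project-uuid", msg="tasks missing from DAG")
except ComputationalSchedulerError as err:
    # expected rendering, if msg_template formatting applies:
    # Computational scheduler: Invalid configuration of pipeline some-project-uuid: tasks missing from DAG
    print(err)
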
@@ -45,10 +45,10 @@
ComputationalBackendNotConnectedError,
ComputationalBackendOnDemandNotReadyError,
ComputationalSchedulerChangedError,
ComputationalSchedulerError,
DaskClientAcquisisitonError,
InvalidPipelineError,
PipelineNotFoundError,
SchedulerError,
TaskSchedulingError,
)
from ...core.settings import ComputationalBackendSettings
@@ -242,7 +242,7 @@ async def stop_pipeline(
}
if not possible_iterations:
msg = f"There are no pipeline scheduled for {user_id}:{project_id}"
raise SchedulerError(msg)
raise ComputationalSchedulerError(msg)
current_max_iteration = max(possible_iterations)
selected_iteration = current_max_iteration
else:
@@ -281,7 +281,7 @@ def _get_last_iteration(self, user_id: UserID, project_id: ProjectID) -> Iterati
}
if not possible_iterations:
msg = f"There are no pipeline scheduled for {user_id}:{project_id}"
raise SchedulerError(msg)
raise ComputationalSchedulerError(msg)
return max(possible_iterations)

def _start_scheduling(
@@ -342,10 +342,10 @@ async def _get_pipeline_tasks(
}
if len(pipeline_comp_tasks) != len(pipeline_dag.nodes()): # type: ignore[arg-type]
msg = (
f"{project_id}The tasks defined for {project_id} do not contain all"
f"The tasks defined for {project_id} do not contain all"
f" the tasks defined in the pipeline [{list(pipeline_dag.nodes)}]! Please check."
)
raise InvalidPipelineError(msg)
raise InvalidPipelineError(pipeline_id=project_id, msg=msg)
return pipeline_comp_tasks

async def _update_run_result_from_tasks(
@@ -929,10 +929,11 @@ async def _schedule_tasks_to_start( # noqa: C901
comp_tasks[NodeIDStr(f"{task}")].state = RunningState.FAILED
raise
except TaskSchedulingError as exc:
_logger.exception(
"Project '%s''s task '%s' could not be scheduled",
exc.project_id,
exc.node_id,
exc.error_context()["project_id"],
exc.error_context()["node_id"],
)
await CompTasksRepository.instance(
self.db_engine
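
A hedged sketch of the new access pattern above: with msg_template style errors, the keyword arguments passed when raising TaskSchedulingError are read back through error_context() (an OsparcErrorMixin helper) instead of the old dedicated project_id/node_id attributes; the identifiers below are illustrative strings rather than real ProjectID/NodeID values:

import logging

from simcore_service_director_v2.core.errors import TaskSchedulingError

_logger = logging.getLogger(__name__)

try:
    raise TaskSchedulingError(
        project_id="some-project-uuid",
        node_id="some-node-uuid",
        msg="no cluster available",
    )
except TaskSchedulingError as exc:
    ctx = exc.error_context()  # assumed to expose the constructor keyword arguments
    _logger.exception(
        "Project '%s''s task '%s' could not be scheduled",
        ctx["project_id"],
        ctx["node_id"],
    )
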
@@ -345,11 +345,10 @@ async def _process_task_result(
except TaskSchedulingError as err:
task_final_state = RunningState.FAILED
simcore_platform_status = SimcorePlatformStatus.BAD
errors = err.get_errors()
_logger.debug(
"Unexpected failure while processing results of %s: %s",
f"{task=}",
f"{errors=}",
f"{err=}",
)

# resource tracking
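
The simplified debug log above leans on the f-string '=' specifier, which expands to the expression text followed by its repr; a one-line illustration with a stand-in exception:

err = ValueError("port validation failed")  # stand-in for the caught TaskSchedulingError
print(f"{err=}")  # prints: err=ValueError('port validation failed')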