Skip to content

Commit

Permalink
removing cluster information
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Dec 3, 2024
1 parent 987c6ce commit 67a9f7a
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import arrow
from pydantic import BaseModel, ConfigDict, Field, PositiveInt

from .clusters import ClusterID
from .projects_nodes import NodeState
from .projects_nodes_io import NodeID
from .projects_state import RunningState
Expand Down Expand Up @@ -40,10 +39,6 @@ class ComputationTask(BaseModel):
...,
description="the iteration id of the computation task (none if no task ran yet)",
)
cluster_id: ClusterID | None = Field(
...,
description="the cluster on which the computational task runs/ran (none if no task ran yet)",
)
started: datetime.datetime | None = Field(
...,
description="the timestamp when the computation was started or None if not started yet",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
ComputationGet,
ComputationStop,
)
from models_library.clusters import DEFAULT_CLUSTER_ID
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID, NodeIDStr
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
from models_library.services import ServiceKeyVersion
from models_library.users import UserID
Expand Down Expand Up @@ -155,7 +154,7 @@ async def _get_project_node_names(
project_uuid: ProjectID, node_id: NodeID
) -> tuple[str, str]:
prj = await project_repo.get_project(project_uuid)
node_id_str = NodeIDStr(f"{node_id}")
node_id_str = f"{node_id}"
if node_id_str not in prj.workbench:
_logger.error(
"%s not found in %s. it is an ancestor of %s. Please check!",
Expand Down Expand Up @@ -228,7 +227,6 @@ async def _try_start_pipeline(
app,
user_id=computation.user_id,
project_id=computation.project_id,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=RunMetadataDict(
node_id_names_map={
NodeID(node_idstr): node_data.label
Expand Down Expand Up @@ -391,7 +389,6 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
else None
),
iteration=last_run.iteration if last_run else None,
cluster_id=last_run.cluster_id if last_run else None,
result=None,
started=compute_pipeline_started_timestamp(
minimal_computational_dag, comp_tasks
Expand Down Expand Up @@ -498,7 +495,6 @@ async def get_computation(
else None
),
iteration=last_run.iteration if last_run else None,
cluster_id=last_run.cluster_id if last_run else None,
result=None,
started=compute_pipeline_started_timestamp(pipeline_dag, all_tasks),
stopped=compute_pipeline_stopped_timestamp(pipeline_dag, all_tasks),
Expand Down Expand Up @@ -573,7 +569,6 @@ async def stop_computation(
url=TypeAdapter(AnyHttpUrl).validate_python(f"{request.url}"),
stop_url=None,
iteration=last_run.iteration if last_run else None,
cluster_id=last_run.cluster_id if last_run else None,
result=None,
started=compute_pipeline_started_timestamp(pipeline_dag, tasks),
stopped=compute_pipeline_stopped_timestamp(pipeline_dag, tasks),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
)
from ..modules.osparc_variables import substitutions
from .errors import (
ClusterAccessForbiddenError,
ClusterNotFoundError,
PipelineNotFoundError,
ProjectNetworkNotFoundError,
Expand Down Expand Up @@ -75,12 +74,6 @@ def _set_exception_handlers(app: FastAPI):
status.HTTP_404_NOT_FOUND, ClusterNotFoundError
),
)
app.add_exception_handler(
ClusterAccessForbiddenError,
make_http_error_handler_for_exception(
status.HTTP_403_FORBIDDEN, ClusterAccessForbiddenError
),
)

# SEE https://docs.python.org/3/library/exceptions.html#exception-hierarchy
app.add_exception_handler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ class ClusterNotFoundError(ComputationalSchedulerError):
msg_template = "The cluster '{cluster_id}' not found"


class ClusterAccessForbiddenError(ComputationalSchedulerError):
msg_template = "Insufficient rights to access cluster '{cluster_id}'"


class ClusterInvalidOperationError(ComputationalSchedulerError):
msg_template = "Invalid operation on cluster '{cluster_id}'"


#
# SCHEDULER/CLIENT ERRORS
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import networkx as nx
from aiopg.sa import Engine
from fastapi import FastAPI
from models_library.clusters import ClusterID
from models_library.clusters import DEFAULT_CLUSTER_ID
from models_library.projects import ProjectID
from models_library.users import UserID
from servicelib.background_task import start_periodic_task, stop_periodic_task
Expand Down Expand Up @@ -36,7 +36,6 @@ async def run_new_pipeline(
*,
user_id: UserID,
project_id: ProjectID,
cluster_id: ClusterID,
run_metadata: RunMetadataDict,
use_on_demand_clusters: bool,
) -> None:
Expand All @@ -56,7 +55,7 @@ async def run_new_pipeline(
new_run = await CompRunsRepository.instance(db_engine).create(
user_id=user_id,
project_id=project_id,
cluster_id=cluster_id,
cluster_id=DEFAULT_CLUSTER_ID,
metadata=run_metadata,
use_on_demand_clusters=use_on_demand_clusters,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from dask_task_models_library.container_tasks.protocol import TaskOwner
from faker import Faker
from fastapi.applications import FastAPI
from models_library.clusters import DEFAULT_CLUSTER_ID
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.projects_state import RunningState
Expand Down Expand Up @@ -169,7 +168,6 @@ async def _assert_start_pipeline(
app,
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=run_metadata,
use_on_demand_clusters=False,
)
Expand Down Expand Up @@ -253,7 +251,6 @@ async def _return_tasks_pending(job_ids: list[str]) -> list[DaskClientTaskState]
mock.call(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
tasks={f"{p.node_id}": p.image},
callback=mock.ANY,
metadata=mock.ANY,
Expand Down Expand Up @@ -651,7 +648,6 @@ async def _return_random_task_result(job_id) -> TaskOutputData:
mocked_dask_client.send_computation_tasks.assert_called_once_with(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
tasks={
f"{next_pending_task.node_id}": next_pending_task.image,
},
Expand Down Expand Up @@ -1115,7 +1111,6 @@ async def test_broken_pipeline_configuration_is_not_scheduled_and_aborted(
initialized_app,
user_id=user["id"],
project_id=sleepers_project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=run_metadata,
use_on_demand_clusters=False,
)
Expand Down Expand Up @@ -1241,7 +1236,6 @@ async def test_handling_of_disconnected_scheduler_dask(
initialized_app,
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=run_metadata,
use_on_demand_clusters=False,
)
Expand Down Expand Up @@ -1749,7 +1743,6 @@ async def test_pipeline_with_on_demand_cluster_with_not_ready_backend_waits(
initialized_app,
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=run_metadata,
use_on_demand_clusters=True,
)
Expand Down Expand Up @@ -1854,7 +1847,6 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
initialized_app,
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
cluster_id=DEFAULT_CLUSTER_ID,
run_metadata=run_metadata,
use_on_demand_clusters=True,
)
Expand Down

0 comments on commit 67a9f7a

Please sign in to comment.