diff --git a/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py b/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py index 671fc78d2d4c..df3d607049cb 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py +++ b/services/director-v2/src/simcore_service_director_v2/api/entrypoints.py @@ -2,7 +2,6 @@ from .._meta import API_VTAG from .routes import ( - clusters, computations, computations_tasks, dynamic_scheduler, @@ -27,7 +26,6 @@ v2_router.include_router( dynamic_services.router, tags=["dynamic services"], prefix="/dynamic_services" ) -v2_router.include_router(clusters.router, tags=["clusters"], prefix="/clusters") v2_router.include_router( dynamic_scheduler.router, tags=["dynamic scheduler"], prefix="/dynamic_scheduler" diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/clusters.py b/services/director-v2/src/simcore_service_director_v2/api/routes/clusters.py deleted file mode 100644 index 93532e4069a7..000000000000 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/clusters.py +++ /dev/null @@ -1,236 +0,0 @@ -import logging -from asyncio.log import logger -from typing import Final - -from aiocache import cached # type: ignore[import-untyped] -from fastapi import APIRouter, Depends, HTTPException -from models_library.api_schemas_directorv2.clusters import ( - ClusterCreate, - ClusterDetails, - ClusterDetailsGet, - ClusterGet, - ClusterPatch, - ClusterPing, -) -from models_library.clusters import DEFAULT_CLUSTER_ID, BaseCluster, ClusterID -from models_library.users import UserID -from starlette import status - -from ...core.errors import ( - ClusterInvalidOperationError, - ConfigurationError, - DaskClientAcquisisitonError, -) -from ...core.settings import ComputationalBackendSettings -from ...modules.dask_clients_pool import DaskClientsPool -from ...modules.db.repositories.clusters import ClustersRepository -from ...utils.dask_client_utils import test_scheduler_endpoint -from ..dependencies.dask import get_dask_clients_pool -from ..dependencies.database import get_repository -from ..dependencies.scheduler import get_scheduler_settings - -router = APIRouter() -log = logging.getLogger(__name__) - - -GET_CLUSTER_DETAILS_CACHING_TTL: Final[int] = 3 - - -def _build_cache_key(fct, *_, **kwargs): - return f"{fct.__name__}_{kwargs['cluster_id']}" - - -@cached(ttl=GET_CLUSTER_DETAILS_CACHING_TTL, key_builder=_build_cache_key) -async def _get_cluster_details_with_id( - settings: ComputationalBackendSettings, - user_id: UserID, - cluster_id: ClusterID, - clusters_repo: ClustersRepository, - dask_clients_pool: DaskClientsPool, -) -> ClusterDetails: - log.debug("Getting details for cluster '%s'", cluster_id) - cluster: BaseCluster = settings.default_cluster - if cluster_id != DEFAULT_CLUSTER_ID: - cluster = await clusters_repo.get_cluster(user_id, cluster_id) - async with dask_clients_pool.acquire(cluster) as client: - return await client.get_cluster_details() - - -@router.post( - "", - summary="Create a new cluster for a user", - response_model=ClusterGet, - status_code=status.HTTP_201_CREATED, -) -async def create_cluster( - user_id: UserID, - new_cluster: ClusterCreate, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), -): - return await clusters_repo.create_cluster(user_id, new_cluster) - - -@router.get("", summary="Lists clusters for user", response_model=list[ClusterGet]) -async def list_clusters( - user_id: UserID, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), - settings: ComputationalBackendSettings = Depends(get_scheduler_settings), -): - default_cluster = settings.default_cluster - return [default_cluster] + await clusters_repo.list_clusters(user_id) - - -@router.get( - "/default", - summary="Returns the default cluster", - response_model=ClusterGet, - status_code=status.HTTP_200_OK, -) -async def get_default_cluster( - settings: ComputationalBackendSettings = Depends(get_scheduler_settings), -): - return settings.default_cluster - - -@router.get( - "/{cluster_id}", - summary="Get one cluster for user", - response_model=ClusterGet, - status_code=status.HTTP_200_OK, -) -async def get_cluster( - user_id: UserID, - cluster_id: ClusterID, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), -): - return await clusters_repo.get_cluster(user_id, cluster_id) - - -@router.patch( - "/{cluster_id}", - summary="Modify a cluster for user", - response_model=ClusterGet, - status_code=status.HTTP_200_OK, -) -async def update_cluster( - user_id: UserID, - cluster_id: ClusterID, - updated_cluster: ClusterPatch, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), -): - try: - return await clusters_repo.update_cluster(user_id, cluster_id, updated_cluster) - except ClusterInvalidOperationError as e: - raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=f"{e}") from e - - -@router.delete( - "/{cluster_id}", - summary="Remove a cluster for user", - response_model=None, - status_code=status.HTTP_204_NO_CONTENT, -) -async def delete_cluster( - user_id: UserID, - cluster_id: ClusterID, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), -): - await clusters_repo.delete_cluster(user_id, cluster_id) - - -@router.get( - "/default/details", - summary="Returns the cluster details", - response_model=ClusterDetailsGet, - status_code=status.HTTP_200_OK, -) -async def get_default_cluster_details( - user_id: UserID, - settings: ComputationalBackendSettings = Depends(get_scheduler_settings), - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), - dask_clients_pool: DaskClientsPool = Depends(get_dask_clients_pool), -): - default_cluster = await _get_cluster_details_with_id( - settings=settings, - user_id=user_id, - cluster_id=DEFAULT_CLUSTER_ID, - clusters_repo=clusters_repo, - dask_clients_pool=dask_clients_pool, - ) - logger.debug("found followind %s", f"{default_cluster=!r}") - return default_cluster - - -@router.get( - "/{cluster_id}/details", - summary="Returns the cluster details", - response_model=ClusterDetailsGet, - status_code=status.HTTP_200_OK, -) -async def get_cluster_details( - user_id: UserID, - cluster_id: ClusterID, - settings: ComputationalBackendSettings = Depends(get_scheduler_settings), - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), - dask_clients_pool: DaskClientsPool = Depends(get_dask_clients_pool), -): - try: - cluster_details = await _get_cluster_details_with_id( - settings=settings, - user_id=user_id, - cluster_id=cluster_id, - clusters_repo=clusters_repo, - dask_clients_pool=dask_clients_pool, - ) - logger.debug("found following %s", f"{cluster_details=!r}") - return cluster_details - except DaskClientAcquisisitonError as exc: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, detail=f"{exc}" - ) from exc - - -@router.post( - ":ping", - summary="Test cluster connection", - response_model=None, - status_code=status.HTTP_204_NO_CONTENT, -) -async def test_cluster_connection( - cluster_auth: ClusterPing, -): - try: - return await test_scheduler_endpoint(endpoint=cluster_auth.endpoint) - - except ConfigurationError as e: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"{e}" - ) from e - - -@router.post( - "/default:ping", - summary="Test cluster connection", - response_model=None, - status_code=status.HTTP_204_NO_CONTENT, -) -async def test_default_cluster_connection( - settings: ComputationalBackendSettings = Depends(get_scheduler_settings), -): - cluster = settings.default_cluster - return await test_scheduler_endpoint(endpoint=cluster.endpoint) - - -@router.post( - "/{cluster_id}:ping", - summary="Test cluster connection", - response_model=None, - status_code=status.HTTP_204_NO_CONTENT, -) -async def test_specific_cluster_connection( - user_id: UserID, - cluster_id: ClusterID, - clusters_repo: ClustersRepository = Depends(get_repository(ClustersRepository)), -): - cluster = await clusters_repo.get_cluster(user_id, cluster_id) - return await test_scheduler_endpoint(endpoint=cluster.endpoint) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index fb2352ef8958..67fba79d3dce 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -74,7 +74,7 @@ from ..utils.dask_client_utils import ( DaskSubSystem, TaskHandlers, - create_internal_client_based_on_auth, + connect_to_dask_scheduler, ) _logger = logging.getLogger(__name__) @@ -133,7 +133,7 @@ async def create( ) -> "DaskClient": _logger.info( "Initiating connection to %s with auth: %s, type: %s", - f"dask-scheduler/gateway at {endpoint}", + f"dask-scheduler at {endpoint}", authentication, cluster_type, ) @@ -149,9 +149,7 @@ async def create( endpoint, attempt.retry_state.attempt_number, ) - backend = await create_internal_client_based_on_auth( - endpoint, authentication - ) + backend = await connect_to_dask_scheduler(endpoint, authentication) dask_utils.check_scheduler_status(backend.client) instance = cls( app=app, @@ -162,7 +160,7 @@ async def create( ) _logger.info( "Connection to %s succeeded [%s]", - f"dask-scheduler/gateway at {endpoint}", + f"dask-scheduler at {endpoint}", json.dumps(attempt.retry_state.retry_object.statistics), ) _logger.info( @@ -331,14 +329,12 @@ async def send_computation_tasks( ) dask_utils.check_communication_with_scheduler_is_open(self.backend.client) dask_utils.check_scheduler_status(self.backend.client) - # NOTE: in case it's a gateway or it is an on-demand cluster + # NOTE: in case it is an on-demand cluster # we do not check a priori if the task # is runnable because we CAN'T. A cluster might auto-scale, the worker(s) - # might also auto-scale and the gateway does not know that a priori. + # might also auto-scale we do not know that a priori. # So, we'll just send the tasks over and see what happens after a while. - if (self.cluster_type != ClusterTypeInModel.ON_DEMAND) and ( - self.backend.gateway is None - ): + if self.cluster_type != ClusterTypeInModel.ON_DEMAND: dask_utils.check_if_cluster_is_able_to_run_pipeline( project_id=project_id, node_id=node_id, diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py index 05233651d326..0ec66eeabdd8 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask_client_utils.py @@ -3,23 +3,16 @@ import socket from collections.abc import Awaitable, Callable from dataclasses import dataclass, field -from typing import Final import distributed -import httpx -from aiohttp import ClientConnectionError, ClientResponseError from dask_task_models_library.container_tasks.events import ( TaskLogEvent, TaskProgressEvent, ) -from models_library.clusters import ( - ClusterAuthentication, - InternalClusterAuthentication, - TLSAuthentication, -) +from models_library.clusters import ClusterAuthentication, TLSAuthentication from pydantic import AnyUrl -from ..core.errors import ComputationalSchedulerError, ConfigurationError +from ..core.errors import ConfigurationError from .dask import wrap_client_async_routine @@ -53,8 +46,8 @@ async def close(self) -> None: await wrap_client_async_routine(self.client.close()) -async def _connect_to_dask_scheduler( - endpoint: AnyUrl, authentication: InternalClusterAuthentication +async def connect_to_dask_scheduler( + endpoint: AnyUrl, authentication: ClusterAuthentication ) -> DaskSubSystem: try: security = distributed.Security() @@ -75,37 +68,3 @@ async def _connect_to_dask_scheduler( except TypeError as exc: msg = f"Scheduler has invalid configuration: {endpoint=}" raise ConfigurationError(msg=msg) from exc - - -async def create_internal_client_based_on_auth( - endpoint: AnyUrl, authentication: ClusterAuthentication -) -> DaskSubSystem: - return await _connect_to_dask_scheduler(endpoint, authentication) # type: ignore[arg-type] # _is_dask_scheduler checks already that it is a valid type - - -_PING_TIMEOUT_S: Final[int] = 5 -_DASK_SCHEDULER_RUNNING_STATE: Final[str] = "running" - - -async def test_scheduler_endpoint(endpoint: AnyUrl) -> None: - """This method will try to connect to a scheduler endpoint and raise a ConfigurationError in case of problem - - :raises ConfigurationError: contians some information as to why the connection failed - """ - try: - async with distributed.Client( - address=f"{endpoint}", timeout=f"{_PING_TIMEOUT_S}", asynchronous=True - ) as dask_client: - if dask_client.status != _DASK_SCHEDULER_RUNNING_STATE: - msg = "internal scheduler is not running!" - raise ComputationalSchedulerError(msg=msg) # noqa: TRY301 - - except ( - ClientConnectionError, - ClientResponseError, - httpx.HTTPError, - ComputationalSchedulerError, - ) as exc: - logger.debug("Pinging %s, failed: %s", f"{endpoint=}", f"{exc=!r}") - msg = f"Could not connect to cluster in {endpoint}: error: {exc}" - raise ConfigurationError(msg=msg) from exc diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index bfab7253b692..7c083fbf9581 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -986,7 +986,6 @@ async def test_changed_scheduler_raises_exception( mocked_user_completed_cb.assert_not_called() -@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize("fail_remote_fct", [False, True]) async def test_get_tasks_status( dask_client: DaskClient, diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters.py b/services/director-v2/tests/unit/with_dbs/test_api_route_clusters.py deleted file mode 100644 index db6d1e017b08..000000000000 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters.py +++ /dev/null @@ -1,793 +0,0 @@ -# pylint:disable=unused-variable -# pylint:disable=unused-argument -# pylint:disable=redefined-outer-name - -import random -from collections.abc import Awaitable, Callable, Iterator -from typing import Any - -import httpx -import pytest -import sqlalchemy as sa -from common_library.serialization import model_dump_with_secrets -from distributed.deploy.spec import SpecCluster -from faker import Faker -from httpx import URL -from models_library.api_schemas_directorv2.clusters import ( - ClusterCreate, - ClusterGet, - ClusterPatch, - ClusterPing, -) -from models_library.clusters import ( - CLUSTER_ADMIN_RIGHTS, - CLUSTER_MANAGER_RIGHTS, - CLUSTER_NO_RIGHTS, - CLUSTER_USER_RIGHTS, - Cluster, - ClusterAccessRights, - ClusterAuthentication, - SimpleAuthentication, -) -from pydantic import SecretStr, TypeAdapter -from pytest_simcore.helpers.typing_env import EnvVarsDict -from simcore_postgres_database.models.clusters import ClusterType, clusters -from starlette import status - -pytest_simcore_core_services_selection = [ - "postgres", -] -pytest_simcore_ops_services_selection = [ - "adminer", -] - - -@pytest.fixture() -def clusters_config( - mock_env: EnvVarsDict, - postgres_db: sa.engine.Engine, - postgres_host_config: dict[str, str], - monkeypatch: pytest.MonkeyPatch, - dask_spec_local_cluster: SpecCluster, - faker: Faker, -): - monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1") - monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") - monkeypatch.setenv("S3_ENDPOINT", faker.url()) - monkeypatch.setenv("S3_ACCESS_KEY", faker.pystr()) - monkeypatch.setenv("S3_REGION", faker.pystr()) - monkeypatch.setenv("S3_SECRET_KEY", faker.pystr()) - monkeypatch.setenv("S3_BUCKET_NAME", faker.pystr()) - - -@pytest.fixture -def cluster_simple_authentication(faker: Faker) -> Callable[[], dict[str, Any]]: - def creator() -> dict[str, Any]: - simple_auth = { - "type": "simple", - "username": faker.user_name(), - "password": faker.password(), - } - assert SimpleAuthentication.model_validate(simple_auth) - return simple_auth - - return creator - - -@pytest.fixture -def clusters_cleaner(postgres_db: sa.engine.Engine) -> Iterator: - yield - with postgres_db.connect() as conn: - conn.execute(sa.delete(clusters)) - - -async def test_list_clusters( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - list_clusters_url = URL(f"/v2/clusters?user_id={user_1['id']}") - # there is no cluster at the moment, the list shall contain the default cluster - response = await async_client.get(list_clusters_url) - assert response.status_code == status.HTTP_200_OK - returned_clusters_list = TypeAdapter(list[ClusterGet]).validate_python( - response.json() - ) - assert ( - len(returned_clusters_list) == 1 - ), f"no default cluster in {returned_clusters_list=}" - assert ( - returned_clusters_list[0].id == 0 - ), "default cluster id is not the one expected" - - # let's create some clusters - NUM_CLUSTERS = 111 - for n in range(NUM_CLUSTERS): - await create_cluster(user_1, name=f"pytest cluster{n:04}") - - response = await async_client.get(list_clusters_url) - assert response.status_code == status.HTTP_200_OK - returned_clusters_list = TypeAdapter(list[ClusterGet]).validate_python( - response.json() - ) - assert ( - len(returned_clusters_list) == NUM_CLUSTERS + 1 - ) # the default cluster comes on top of the NUM_CLUSTERS - assert ( - returned_clusters_list[0].id == 0 - ), "the first cluster shall be the platform default cluster" - - # now create a second user and check the clusters are not seen by it BUT the default one - user_2 = registered_user() - response = await async_client.get(f"/v2/clusters?user_id={user_2['id']}") - assert response.status_code == status.HTTP_200_OK - returned_clusters_list = TypeAdapter(list[ClusterGet]).validate_python( - response.json() - ) - assert ( - len(returned_clusters_list) == 1 - ), f"no default cluster in {returned_clusters_list=}" - assert ( - returned_clusters_list[0].id == 0 - ), "default cluster id is not the one expected" - - # let's create a few more clusters owned by user_1 with specific rights - for rights, name in [ - (CLUSTER_NO_RIGHTS, "no rights"), - (CLUSTER_USER_RIGHTS, "user rights"), - (CLUSTER_MANAGER_RIGHTS, "manager rights"), - (CLUSTER_ADMIN_RIGHTS, "admin rights"), - ]: - await create_cluster( - user_1, # cluster is owned by user_1 - name=f"cluster with {name}", - access_rights={ - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - user_2["primary_gid"]: rights, - }, - ) - - response = await async_client.get(f"/v2/clusters?user_id={user_2['id']}") - assert response.status_code == status.HTTP_200_OK - user_2_clusters = TypeAdapter(list[ClusterGet]).validate_python(response.json()) - # we should find 3 clusters + the default cluster - assert len(user_2_clusters) == 3 + 1 - for name in [ - "cluster with user rights", - "cluster with manager rights", - "cluster with admin rights", - ]: - clusters = list( - filter( - lambda cluster, name=name: cluster.name == name, - user_2_clusters, - ), - ) - assert len(clusters) == 1, f"missing cluster with {name=}" - - -async def test_get_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - # try to get one that does not exist - response = await async_client.get( - f"/v2/clusters/15615165165165?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_404_NOT_FOUND - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster(user_1, name=f"pytest cluster{n:04}") for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - - # there is no cluster at the moment, the list is empty - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - assert returned_cluster - assert the_cluster.model_dump( - exclude={"authentication"} - ) == returned_cluster.model_dump(exclude={"authentication"}) - - user_2 = registered_user() - # getting the same cluster for user 2 shall return 403 - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}" - ) - assert ( - response.status_code == status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - # let's create a few cluster for user 2 and share some with user 1 - for rights, user_1_expected_access in [ - (CLUSTER_NO_RIGHTS, False), - (CLUSTER_USER_RIGHTS, True), - (CLUSTER_MANAGER_RIGHTS, True), - (CLUSTER_ADMIN_RIGHTS, True), - ]: - a_cluster = await create_cluster( - user_2, # cluster is owned by user_2 - access_rights={ - user_2["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - user_1["primary_gid"]: rights, - }, - ) - # now let's check that user_1 can access only the correct ones - response = await async_client.get( - f"/v2/clusters/{a_cluster.id}?user_id={user_1['id']}" - ) - assert ( - response.status_code == status.HTTP_200_OK - if user_1_expected_access - else status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - - -@pytest.mark.parametrize( - "cluster_sharing_rights, can_use", - [ - pytest.param(CLUSTER_ADMIN_RIGHTS, True, id="SHARE_WITH_ADMIN_RIGHTS"), - pytest.param(CLUSTER_MANAGER_RIGHTS, True, id="SHARE_WITH_MANAGER_RIGHTS"), - pytest.param(CLUSTER_USER_RIGHTS, True, id="SHARE_WITH_USER_RIGHTS"), - pytest.param(CLUSTER_NO_RIGHTS, False, id="DENY_RIGHTS"), - ], -) -async def test_get_another_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - async_client: httpx.AsyncClient, - cluster_sharing_rights: ClusterAccessRights, - can_use: bool, -): - user_1 = registered_user() - user_2 = registered_user() - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster( - user_1, - name=f"pytest cluster{n:04}", - access_rights={ - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - user_2["primary_gid"]: cluster_sharing_rights, - }, - ) - for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - # try to get the cluster as user 2 - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}" - ) - assert ( - response.status_code == status.HTTP_200_OK - if can_use - else status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - - -@pytest.mark.parametrize("with_query", [True, False]) -async def test_get_default_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - async_client: httpx.AsyncClient, - with_query: bool, -): - user_1 = registered_user() - - get_cluster_url = URL("/v2/clusters/default") - if with_query: - get_cluster_url = URL(f"/v2/clusters/default?user_id={user_1['id']}") - response = await async_client.get(get_cluster_url) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - assert returned_cluster - assert returned_cluster.id == 0 - assert returned_cluster.name == "Default cluster" - assert 1 in returned_cluster.access_rights # everyone group is always 1 - assert returned_cluster.access_rights[1] == CLUSTER_USER_RIGHTS - - -async def test_create_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - cluster_simple_authentication: Callable, - async_client: httpx.AsyncClient, - faker: Faker, - postgres_db: sa.engine.Engine, - clusters_cleaner, -): - user_1 = registered_user() - create_cluster_url = URL(f"/v2/clusters?user_id={user_1['id']}") - cluster_data = ClusterCreate( - endpoint=faker.uri(), - authentication=cluster_simple_authentication(), - name=faker.name(), - type=random.choice(list(ClusterType)), - owner=faker.pyint(min_value=1), - ) - response = await async_client.post( - create_cluster_url, - json=model_dump_with_secrets( - cluster_data, - show_secrets=True, - by_alias=True, - exclude_unset=True, - ), - ) - assert response.status_code == status.HTTP_201_CREATED, f"received: {response.text}" - created_cluster = ClusterGet.model_validate(response.json()) - assert created_cluster - - assert cluster_data.model_dump( - exclude={"id", "owner", "access_rights", "authentication"} - ) == created_cluster.model_dump( - exclude={"id", "owner", "access_rights", "authentication"} - ) - - assert created_cluster.id is not None - assert created_cluster.owner == user_1["primary_gid"] - assert created_cluster.access_rights == { - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS - } - - # let's check that DB is correctly setup, there is one entry - with postgres_db.connect() as conn: - conn.execute( - sa.select(clusters).where(clusters.c.name == cluster_data.name) - ).one() - - -async def test_update_own_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - cluster_simple_authentication: Callable, - async_client: httpx.AsyncClient, - faker: Faker, -): - _PATCH_EXPORT = {"by_alias": True, "exclude_unset": True, "exclude_none": True} - user_1 = registered_user() - # try to modify one that does not exist - response = await async_client.patch( - f"/v2/clusters/15615165165165?user_id={user_1['id']}", - json=model_dump_with_secrets( - ClusterPatch(), show_secrets=True, **_PATCH_EXPORT - ), - ) - assert response.status_code == status.HTTP_404_NOT_FOUND - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster(user_1, name=f"pytest cluster{n:04}") for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - # get the original one - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - original_cluster = ClusterGet.model_validate(response.json()) - - # now we modify nothing - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}", - json=model_dump_with_secrets( - ClusterPatch(), show_secrets=True, **_PATCH_EXPORT - ), - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - assert returned_cluster.model_dump() == original_cluster.model_dump() - - # modify some simple things - expected_modified_cluster = original_cluster.model_copy() - for cluster_patch in [ - ClusterPatch(name=faker.name()), - ClusterPatch(description=faker.text()), - ClusterPatch(type=ClusterType.ON_PREMISE), - ClusterPatch(thumbnail=faker.uri()), - ClusterPatch(endpoint=faker.uri()), - ClusterPatch(authentication=cluster_simple_authentication()), - ]: - jsonable_cluster_patch = model_dump_with_secrets( - cluster_patch, show_secrets=True, **_PATCH_EXPORT - ) - print(f"--> patching cluster with {jsonable_cluster_patch}") - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}", - json=jsonable_cluster_patch, - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - expected_modified_cluster = expected_modified_cluster.model_copy( - update=cluster_patch.model_dump(**_PATCH_EXPORT) - ) - assert returned_cluster.model_dump( - exclude={"authentication": {"password"}} - ) == expected_modified_cluster.model_dump( - exclude={"authentication": {"password"}} - ) - - # we can change the access rights, the owner rights are always kept - user_2 = registered_user() - - for rights in [ - CLUSTER_ADMIN_RIGHTS, - CLUSTER_MANAGER_RIGHTS, - CLUSTER_USER_RIGHTS, - CLUSTER_NO_RIGHTS, - ]: - cluster_patch = ClusterPatch(accessRights={user_2["primary_gid"]: rights}) - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}", - json=cluster_patch.model_dump(**_PATCH_EXPORT), - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - - expected_modified_cluster.access_rights[user_2["primary_gid"]] = rights - assert returned_cluster.model_dump( - exclude={"authentication": {"password"}} - ) == expected_modified_cluster.model_dump( - exclude={"authentication": {"password"}} - ) - # we can change the owner since we are admin - cluster_patch = ClusterPatch(owner=user_2["primary_gid"]) - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}", - json=model_dump_with_secrets(cluster_patch, show_secrets=True, **_PATCH_EXPORT), - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - returned_cluster = ClusterGet.model_validate(response.json()) - expected_modified_cluster.owner = user_2["primary_gid"] - expected_modified_cluster.access_rights[ - user_2["primary_gid"] - ] = CLUSTER_ADMIN_RIGHTS - assert returned_cluster.model_dump( - exclude={"authentication": {"password"}} - ) == expected_modified_cluster.model_dump(exclude={"authentication": {"password"}}) - - # we should not be able to reduce the rights of the new owner - cluster_patch = ClusterPatch( - accessRights={user_2["primary_gid"]: CLUSTER_NO_RIGHTS} - ) - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}", - json=model_dump_with_secrets(cluster_patch, show_secrets=True, **_PATCH_EXPORT), - ) - assert ( - response.status_code == status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - - -async def test_update_default_cluster_fails( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - cluster_simple_authentication: Callable, - async_client: httpx.AsyncClient, - faker: Faker, -): - _PATCH_EXPORT = {"by_alias": True, "exclude_unset": True, "exclude_none": True} - user_1 = registered_user() - # try to modify one that does not exist - response = await async_client.patch( - f"/v2/clusters/default?user_id={user_1['id']}", - json=model_dump_with_secrets( - ClusterPatch(), show_secrets=True, **_PATCH_EXPORT - ), - ) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY - - -@pytest.mark.parametrize( - "cluster_sharing_rights, can_use, can_manage, can_administer", - [ - pytest.param( - CLUSTER_ADMIN_RIGHTS, True, True, True, id="SHARE_WITH_ADMIN_RIGHTS" - ), - pytest.param( - CLUSTER_MANAGER_RIGHTS, True, True, False, id="SHARE_WITH_MANAGER_RIGHTS" - ), - pytest.param( - CLUSTER_USER_RIGHTS, True, False, False, id="SHARE_WITH_USER_RIGHTS" - ), - pytest.param(CLUSTER_NO_RIGHTS, False, False, False, id="DENY_RIGHTS"), - ], -) -async def test_update_another_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - cluster_simple_authentication: Callable, - async_client: httpx.AsyncClient, - faker: Faker, - cluster_sharing_rights: ClusterAccessRights, - can_use: bool, - can_manage: bool, - can_administer: bool, -): - """user_1 is the owner and administrator, he/she gives some rights to user 2""" - - _PATCH_EXPORT = {"by_alias": True, "exclude_unset": True, "exclude_none": True} - user_1 = registered_user() - user_2 = registered_user() - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster( - user_1, - name=f"pytest cluster{n:04}", - access_rights={ - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - user_2["primary_gid"]: cluster_sharing_rights, - }, - ) - for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - # get the original one - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_200_OK, f"received {response.text}" - ClusterGet.model_validate(response.json()) - - # let's try to modify stuff as we are user 2 - for cluster_patch in [ - ClusterPatch(name=faker.name()), - ClusterPatch(description=faker.text()), - ClusterPatch(type=ClusterType.ON_PREMISE), - ClusterPatch(thumbnail=faker.uri()), - ClusterPatch(endpoint=faker.uri()), - ClusterPatch(authentication=cluster_simple_authentication()), - ]: - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}", - json=model_dump_with_secrets( - cluster_patch, show_secrets=True, **_PATCH_EXPORT - ), - ) - assert ( - response.status_code == status.HTTP_200_OK - if can_manage - else status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - - # let's try to add/remove someone (reserved to managers) - user_3 = registered_user() - for rights in [ - CLUSTER_USER_RIGHTS, # add user - CLUSTER_NO_RIGHTS, # remove user - ]: - # try to add user 3 - cluster_patch = ClusterPatch(accessRights={user_3["primary_gid"]: rights}) - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}", - json=model_dump_with_secrets( - cluster_patch, show_secrets=True, **_PATCH_EXPORT - ), - ) - assert ( - response.status_code == status.HTTP_200_OK - if can_manage - else status.HTTP_403_FORBIDDEN - ), f"received {response.text} while {'adding' if rights == CLUSTER_USER_RIGHTS else 'removing'} user" - - # modify rights to admin/manager (reserved to administrators) - for rights in [ - CLUSTER_ADMIN_RIGHTS, - CLUSTER_MANAGER_RIGHTS, - ]: - cluster_patch = ClusterPatch(accessRights={user_3["primary_gid"]: rights}) - response = await async_client.patch( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}", - json=model_dump_with_secrets( - cluster_patch, show_secrets=True, **_PATCH_EXPORT - ), - ) - assert ( - response.status_code == status.HTTP_200_OK - if can_administer - else status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - - -async def test_delete_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster( - user_1, - name=f"pytest cluster{n:04}", - access_rights={ - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - }, - ) - for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - # let's delete that cluster - response = await async_client.delete( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert ( - response.status_code == status.HTTP_204_NO_CONTENT - ), f"received {response.text}" - # now check it is gone - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert ( - response.status_code == status.HTTP_404_NOT_FOUND - ), f"received {response.text}" - - -@pytest.mark.parametrize( - "cluster_sharing_rights, can_administer", - [ - pytest.param(CLUSTER_ADMIN_RIGHTS, True, id="SHARE_WITH_ADMIN_RIGHTS"), - pytest.param(CLUSTER_MANAGER_RIGHTS, False, id="SHARE_WITH_MANAGER_RIGHTS"), - pytest.param(CLUSTER_USER_RIGHTS, False, id="SHARE_WITH_USER_RIGHTS"), - pytest.param(CLUSTER_NO_RIGHTS, False, id="DENY_RIGHTS"), - ], -) -async def test_delete_another_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - cluster_simple_authentication: Callable, - async_client: httpx.AsyncClient, - faker: Faker, - cluster_sharing_rights: ClusterAccessRights, - can_administer: bool, -): - user_1 = registered_user() - user_2 = registered_user() - # let's create some clusters - a_bunch_of_clusters = [ - await create_cluster( - user_1, - name=f"pytest cluster{n:04}", - access_rights={ - user_1["primary_gid"]: CLUSTER_ADMIN_RIGHTS, - user_2["primary_gid"]: cluster_sharing_rights, - }, - ) - for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - # let's delete that cluster as user_2 - response = await async_client.delete( - f"/v2/clusters/{the_cluster.id}?user_id={user_2['id']}" - ) - assert ( - response.status_code == status.HTTP_204_NO_CONTENT - if can_administer - else status.HTTP_403_FORBIDDEN - ), f"received {response.text}" - # now check it is gone or still around - response = await async_client.get( - f"/v2/clusters/{the_cluster.id}?user_id={user_1['id']}" - ) - assert ( - response.status_code == status.HTTP_404_NOT_FOUND - if can_administer - else status.HTTP_200_OK - ), f"received {response.text}" - - -async def test_delete_default_cluster_fails( - clusters_config: None, - registered_user: Callable[..., dict], - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - response = await async_client.delete(f"/v2/clusters/default?user_id={user_1['id']}") - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY - - -async def test_ping_invalid_cluster_raises_422( - clusters_config: None, - async_client: httpx.AsyncClient, - faker: Faker, - cluster_simple_authentication: Callable[[], dict[str, Any]], -): - # calling with wrong data raises - response = await async_client.post("/v2/clusters:ping", json={}) - with pytest.raises(httpx.HTTPStatusError): - response.raise_for_status() - - # calling with correct data but non existing cluster also raises - some_fake_cluster = ClusterPing( - endpoint=faker.url(), - authentication=TypeAdapter(ClusterAuthentication).validate_python( - cluster_simple_authentication() - ), - ) - response = await async_client.post( - "/v2/clusters:ping", - json=model_dump_with_secrets( - some_fake_cluster, show_secrets=True, by_alias=True - ), - ) - with pytest.raises(httpx.HTTPStatusError): - response.raise_for_status() - - -async def test_ping_cluster( - clusters_config: None, async_client: httpx.AsyncClient, faker: Faker -): - valid_cluster = ClusterPing( - endpoint=faker.uri(), - authentication=SimpleAuthentication( - username="pytest_user", - password=TypeAdapter(SecretStr).validate_python(faker.password()), - ), - ) - response = await async_client.post( - "/v2/clusters:ping", - json=model_dump_with_secrets(valid_cluster, show_secrets=True, by_alias=True), - ) - response.raise_for_status() - assert response.status_code == status.HTTP_204_NO_CONTENT - - -async def test_ping_specific_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - create_cluster: Callable[..., Awaitable[Cluster]], - async_client: httpx.AsyncClient, - faker: Faker, -): - user_1 = registered_user() - # try to ping one that does not exist - response = await async_client.get( - f"/v2/clusters/15615165165165:ping?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY - - # let's create some clusters and ping one - a_bunch_of_clusters = [ - await create_cluster( - user_1, - name=f"pytest cluster{n:04}", - endpoint=faker.uri(), - authentication=SimpleAuthentication( - username="pytest_user", - password=TypeAdapter(SecretStr).validate_python(faker.password()), - ), - ) - for n in range(111) - ] - the_cluster = random.choice(a_bunch_of_clusters) - - response = await async_client.post( - f"/v2/clusters/{the_cluster.id}:ping?user_id={user_1['id']}", - ) - response.raise_for_status() - assert response.status_code == status.HTTP_204_NO_CONTENT - - -async def test_ping_default_cluster( - clusters_config: None, - registered_user: Callable[..., dict], - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - # try to ping one that does not exist - response = await async_client.post( - f"/v2/clusters/default:ping?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_204_NO_CONTENT diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py b/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py deleted file mode 100644 index 557e6305d5f1..000000000000 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_clusters_details.py +++ /dev/null @@ -1,103 +0,0 @@ -# pylint:disable=unused-variable -# pylint:disable=unused-argument -# pylint:disable=redefined-outer-name - -from collections.abc import Awaitable, Callable -from typing import Any - -import httpx -import pytest -import sqlalchemy as sa -from distributed.deploy.spec import SpecCluster -from faker import Faker -from models_library.api_schemas_directorv2.clusters import ClusterDetailsGet -from models_library.clusters import Cluster, ClusterID, SimpleAuthentication -from models_library.users import UserID -from pydantic import SecretStr -from pytest_simcore.helpers.typing_env import EnvVarsDict -from starlette import status - -pytest_simcore_core_services_selection = [ - "postgres", -] -pytest_simcore_ops_services_selection = [ - "adminer", -] - - -@pytest.fixture() -def clusters_config( - mock_env: EnvVarsDict, - postgres_db: sa.engine.Engine, - postgres_host_config: dict[str, str], - monkeypatch: pytest.MonkeyPatch, - dask_spec_local_cluster: SpecCluster, - faker: Faker, -): - monkeypatch.setenv("COMPUTATIONAL_BACKEND_DASK_CLIENT_ENABLED", "1") - monkeypatch.setenv("R_CLONE_PROVIDER", "MINIO") - monkeypatch.setenv("S3_ENDPOINT", faker.url()) - monkeypatch.setenv("S3_ACCESS_KEY", faker.pystr()) - monkeypatch.setenv("S3_REGION", faker.pystr()) - monkeypatch.setenv("S3_SECRET_KEY", faker.pystr()) - monkeypatch.setenv("S3_BUCKET_NAME", faker.pystr()) - - -async def test_get_default_cluster_details( - clusters_config: None, - registered_user: Callable, - async_client: httpx.AsyncClient, -): - user_1 = registered_user() - - # This test checks that the default cluster is accessible - # the default cluster is the osparc internal cluster available through a dask-scheduler - response = await async_client.get( - f"/v2/clusters/default/details?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_200_OK - default_cluster_out = ClusterDetailsGet.model_validate(response.json()) - response = await async_client.get( - f"/v2/clusters/{0}/details?user_id={user_1['id']}" - ) - assert response.status_code == status.HTTP_200_OK - assert default_cluster_out == ClusterDetailsGet.model_validate(response.json()) - - -async def _get_cluster_details( - async_client: httpx.AsyncClient, user_id: UserID, cluster_id: ClusterID -) -> ClusterDetailsGet: - response = await async_client.get( - f"/v2/clusters/{cluster_id}/details?user_id={user_id}" - ) - assert response.status_code == status.HTTP_200_OK - print(f"<-- received cluster details response {response=}") - cluster_out = ClusterDetailsGet.model_validate(response.json()) - assert cluster_out - print(f"<-- received cluster details {cluster_out=}") - assert cluster_out.scheduler, "the cluster's scheduler is not started!" - return cluster_out - - -async def test_get_cluster_details( - clusters_config: None, - registered_user: Callable[..., dict[str, Any]], - async_client: httpx.AsyncClient, - create_cluster: Callable[..., Awaitable[Cluster]], - faker: Faker, -): - user_1 = registered_user() - # define the cluster in the DB - some_cluster = await create_cluster( - user_1, - endpoint=faker.uri(), - authentication=SimpleAuthentication( - username=faker.user_name(), - password=SecretStr(faker.password()), - ).model_dump(by_alias=True), - ) - # in its present state, the cluster should have no workers - cluster_out = await _get_cluster_details( - async_client, user_1["id"], some_cluster.id - ) - assert not cluster_out.scheduler.workers, "the cluster should not have any worker!"