removed cluster_id from scheduler interface
sanderegg committed Dec 3, 2024
1 parent 67a9f7a commit dd8ce53
Showing 4 changed files with 1 addition and 104 deletions.
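The net effect: callers of the computational scheduler no longer choose a cluster. The `cluster_id=DEFAULT_CLUSTER_ID` argument vanishes from every scheduler call below, the tests' `create_cluster` fixture goes away, and `use_on_demand_clusters` is left as the only cluster-related switch. A minimal sketch of the narrowed entry point follows; the coroutine name `run_new_pipeline`, the metadata type, and the import paths are assumptions, since the callee itself sits outside these hunks:

# Hypothetical sketch -- the name `run_new_pipeline` and the exact types
# are assumed, not shown in this diff.
from fastapi import FastAPI
from models_library.projects import ProjectID
from models_library.users import UserID


async def run_new_pipeline(
    app: FastAPI,
    *,
    user_id: UserID,
    project_id: ProjectID,
    # cluster_id: ClusterID = DEFAULT_CLUSTER_ID,  # dropped by this commit
    run_metadata: dict,  # RunMetadataDict in the service code (assumed alias)
    use_on_demand_clusters: bool,
) -> None:
    ...

Dropping the parameter at the interface, rather than defaulting it, leaves the on-demand flag as the only remaining cluster decision at the call sites.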
@@ -15,7 +15,7 @@
 import pytest
 from _helpers import PublishedProject
 from faker import Faker
-from models_library.clusters import DEFAULT_CLUSTER_ID, Cluster
+from models_library.clusters import DEFAULT_CLUSTER_ID
 from models_library.projects import ProjectID
 from models_library.projects_state import RunningState
 from models_library.users import UserID
@@ -260,7 +260,6 @@ async def test_create(
     run_metadata: RunMetadataDict,
     faker: Faker,
     publish_project: Callable[[], Awaitable[PublishedProject]],
-    create_cluster: Callable[..., Awaitable[Cluster]],
 ):
     with pytest.raises(ProjectNotFoundError):
         await CompRunsRepository(aiopg_engine).create(
@@ -324,15 +323,6 @@ async def test_create(
             metadata=run_metadata,
             use_on_demand_clusters=faker.pybool(),
         )
-    cluster = await create_cluster(published_project.user)
-    await CompRunsRepository(aiopg_engine).create(
-        user_id=published_project.user["id"],
-        project_id=published_project.project.uuid,
-        cluster_id=cluster.id,
-        iteration=None,
-        metadata=run_metadata,
-        use_on_demand_clusters=faker.pybool(),
-    )


 async def test_update(
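With the second, cluster-bound creation removed, `test_create` exercises only the cluster-less path. A sketch of the surviving call shape, assembled from the keyword arguments visible in this file (whether `iteration=None` is still accepted is an assumption; the diff only shows `cluster_id` being removed):

await CompRunsRepository(aiopg_engine).create(
    user_id=published_project.user["id"],
    project_id=published_project.project.uuid,
    iteration=None,  # assumed still accepted: repository allocates the next iteration
    metadata=run_metadata,
    use_on_demand_clusters=faker.pybool(),
)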
@@ -156,7 +156,6 @@ async def test_schedule_all_pipelines(
         initialized_app,
         user_id=published_project.project.prj_owner,
         project_id=published_project.project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
@@ -260,7 +259,6 @@ async def test_schedule_all_pipelines_logs_error_if_it_find_old_pipelines(
         initialized_app,
         user_id=published_project.project.prj_owner,
         project_id=published_project.project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
@@ -345,7 +343,6 @@ async def test_empty_pipeline_is_not_scheduled(
         initialized_app,
         user_id=user["id"],
         project_id=empty_project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
@@ -361,7 +358,6 @@ async def test_empty_pipeline_is_not_scheduled(
         initialized_app,
         user_id=user["id"],
         project_id=empty_project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
@@ -14,7 +14,6 @@
 import pytest
 from _helpers import PublishedProject
 from fastapi import FastAPI
-from models_library.clusters import DEFAULT_CLUSTER_ID
 from pytest_mock import MockerFixture
 from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
 from pytest_simcore.helpers.typing_env import EnvVarsDict
@@ -66,7 +65,6 @@ async def test_worker_properly_autocalls_scheduler_api(
         initialized_app,
         user_id=published_project.project.prj_owner,
         project_id=published_project.project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
@@ -123,7 +121,6 @@ async def _project_pipeline_creation_workflow() -> None:
         initialized_app,
         user_id=published_project.project.prj_owner,
         project_id=published_project.project.uuid,
-        cluster_id=DEFAULT_CLUSTER_ID,
         run_metadata=run_metadata,
         use_on_demand_clusters=False,
     )
86 changes: 0 additions & 86 deletions services/director-v2/tests/unit/with_dbs/conftest.py
@@ -16,12 +16,9 @@
 from _helpers import PublishedProject, RunningProject
 from faker import Faker
 from fastapi.encoders import jsonable_encoder
-from models_library.clusters import Cluster
 from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from pydantic.main import BaseModel
-from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
-from simcore_postgres_database.models.clusters import clusters
 from simcore_postgres_database.models.comp_pipeline import StateType, comp_pipeline
 from simcore_postgres_database.models.comp_runs import comp_runs
 from simcore_postgres_database.models.comp_tasks import comp_tasks
@@ -34,8 +31,6 @@
 from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
 from simcore_service_director_v2.utils.computations import to_node_class
 from simcore_service_director_v2.utils.dask import generate_dask_job_id
-from simcore_service_director_v2.utils.db import to_clusters_db
-from sqlalchemy.dialects.postgresql import insert as pg_insert
 from sqlalchemy.ext.asyncio import AsyncEngine


@@ -223,87 +218,6 @@ async def _(
     )


-@pytest.fixture
-async def create_cluster(
-    sqlalchemy_async_engine: AsyncEngine,
-) -> AsyncIterator[Callable[..., Awaitable[Cluster]]]:
-    created_cluster_ids: list[str] = []
-
-    async def _(user: dict[str, Any], **cluster_kwargs) -> Cluster:
-        assert "json_schema_extra" in Cluster.model_config
-        assert isinstance(Cluster.model_config["json_schema_extra"], dict)
-        assert isinstance(Cluster.model_config["json_schema_extra"]["examples"], list)
-        assert isinstance(
-            Cluster.model_config["json_schema_extra"]["examples"][1], dict
-        )
-        cluster_config = Cluster.model_config["json_schema_extra"]["examples"][1]
-        cluster_config["owner"] = user["primary_gid"]
-        cluster_config.update(**cluster_kwargs)
-        new_cluster = Cluster.model_validate(cluster_config)
-        assert new_cluster
-
-        async with sqlalchemy_async_engine.begin() as conn:
-            # insert basic cluster
-            created_cluster = (
-                await conn.execute(
-                    sa.insert(clusters)
-                    .values(to_clusters_db(new_cluster, only_update=False))
-                    .returning(sa.literal_column("*"))
-                )
-            ).one()
-            created_cluster_ids.append(created_cluster.id)
-            if "access_rights" in cluster_kwargs:
-                for gid, rights in cluster_kwargs["access_rights"].items():
-                    await conn.execute(
-                        pg_insert(cluster_to_groups)
-                        .values(
-                            cluster_id=created_cluster.id,
-                            gid=gid,
-                            **rights.model_dump(),
-                        )
-                        .on_conflict_do_update(
-                            index_elements=["gid", "cluster_id"],
-                            set_=rights.model_dump(),
-                        )
-                    )
-            access_rights_in_db = {}
-            for row in await conn.execute(
-                sa.select(
-                    cluster_to_groups.c.gid,
-                    cluster_to_groups.c.read,
-                    cluster_to_groups.c.write,
-                    cluster_to_groups.c.delete,
-                )
-                .select_from(clusters.join(cluster_to_groups))
-                .where(clusters.c.id == created_cluster.id)
-            ):
-                access_rights_in_db[row.gid] = {
-                    "read": row.read,
-                    "write": row.write,
-                    "delete": row.delete,
-                }
-
-            return Cluster(
-                id=created_cluster.id,
-                name=created_cluster.name,
-                description=created_cluster.description,
-                type=created_cluster.type,
-                owner=created_cluster.owner,
-                endpoint=created_cluster.endpoint,
-                authentication=created_cluster.authentication,
-                access_rights=access_rights_in_db,
-                thumbnail=None,
-            )
-
-    yield _
-
-    # cleanup
-    async with sqlalchemy_async_engine.begin() as conn:
-        await conn.execute(
-            clusters.delete().where(clusters.c.id.in_(created_cluster_ids))
-        )
-
-
 @pytest.fixture
 async def publish_project(
     registered_user: Callable[..., dict[str, Any]],
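The deleted `create_cluster` fixture followed pytest's factory-with-teardown pattern: yield an async factory, record the primary key of every row the factory inserts, and delete them all after the test. For reference, the skeleton of that pattern looks roughly like this (a generic sketch; `db_engine`, `Thing`, `insert_row`, and `delete_rows` are hypothetical placeholders, not repository code):

from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any

import pytest


@pytest.fixture
async def create_thing(db_engine) -> AsyncIterator[Callable[..., Awaitable["Thing"]]]:
    created_ids: list[int] = []  # remember every row we insert

    async def _factory(**kwargs: Any) -> "Thing":
        row = await insert_row(db_engine, **kwargs)  # hypothetical helper
        created_ids.append(row.id)
        return row

    yield _factory  # the test body runs here

    # teardown: remove everything the factory created, even if the test failed
    await delete_rows(db_engine, created_ids)  # hypothetical helper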
