Skip to content

Commit

Permalink
ensure test runs with whole metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Oct 15, 2024
1 parent 3481ec3 commit 561a633
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 36 deletions.
77 changes: 54 additions & 23 deletions services/director-v2/tests/unit/with_dbs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
import datetime
import json
from collections.abc import Awaitable, Callable, Iterator
from typing import Any
from typing import Any, cast
from uuid import uuid4

import pytest
import sqlalchemy as sa
from _helpers import PublishedProject, RunningProject
from faker import Faker
from fastapi.encoders import jsonable_encoder
from models_library.clusters import Cluster
from models_library.projects import ProjectAtDB
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from pydantic.main import BaseModel
from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
Expand All @@ -25,7 +26,11 @@
from simcore_postgres_database.models.comp_runs import comp_runs
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
from simcore_service_director_v2.models.comp_runs import (
CompRunsAtDB,
ProjectMetadataDict,
RunMetadataDict,
)
from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
from simcore_service_director_v2.utils.computations import to_node_class
from simcore_service_director_v2.utils.dask import generate_dask_job_id
Expand Down Expand Up @@ -84,28 +89,36 @@ def creator(
"project_id": f"{project.uuid}",
"node_id": f"{node_id}",
"schema": {"inputs": {}, "outputs": {}},
"inputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {},
"outputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {},
"inputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {}
),
"outputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {}
),
"image": Image(name=node_data.key, tag=node_data.version).dict( # type: ignore
by_alias=True, exclude_unset=True
), # type: ignore
"node_class": to_node_class(node_data.key),
"internal_id": internal_id + 1,
"submit": datetime.datetime.now(tz=datetime.timezone.utc),
"submit": datetime.datetime.now(tz=datetime.UTC),
"job_id": generate_dask_job_id(
service_key=node_data.key,
service_version=node_data.version,
Expand Down Expand Up @@ -135,9 +148,26 @@ def creator(
)


@pytest.fixture
def project_metadata(faker: Faker) -> ProjectMetadataDict:
    """Build a fully-populated ``ProjectMetadataDict`` with random fake values.

    Every parent/root-parent project and node reference is filled in so tests
    exercise the complete metadata shape rather than a partial one.
    """
    # Small helpers keep the faker->typed-ID conversion in one place.
    # NOTE: call order below matches field order, so the faker RNG stream
    # is consumed identically to a flat literal construction.
    def _fake_node_id() -> NodeID:
        return cast(NodeID, faker.uuid4(cast_to=None))

    def _fake_project_id() -> ProjectID:
        return cast(ProjectID, faker.uuid4(cast_to=None))

    return ProjectMetadataDict(
        parent_node_id=_fake_node_id(),
        parent_node_name=faker.pystr(),
        parent_project_id=_fake_project_id(),
        parent_project_name=faker.pystr(),
        root_parent_project_id=_fake_project_id(),
        root_parent_project_name=faker.pystr(),
        root_parent_node_id=_fake_node_id(),
        root_parent_node_name=faker.pystr(),
    )


@pytest.fixture
def run_metadata(
osparc_product_name: str, simcore_user_agent: str, faker: Faker
osparc_product_name: str,
simcore_user_agent: str,
project_metadata: ProjectMetadataDict,
faker: Faker,
) -> RunMetadataDict:
return RunMetadataDict(
node_id_names_map={},
Expand All @@ -147,6 +177,7 @@ def run_metadata(
user_email=faker.email(),
wallet_id=faker.pyint(min_value=1),
wallet_name=faker.name(),
project_metadata=project_metadata,
)


Expand All @@ -171,7 +202,7 @@ def creator(
with postgres_db.connect() as conn:
result = conn.execute(
comp_runs.insert()
.values(**run_config)
.values(**jsonable_encoder(run_config))
.returning(sa.literal_column("*"))
)
new_run = CompRunsAtDB.from_orm(result.first())
Expand Down Expand Up @@ -298,7 +329,7 @@ async def running_project(
project=created_project,
state=StateType.RUNNING,
progress=0.0,
start=datetime.datetime.now(tz=datetime.timezone.utc),
start=datetime.datetime.now(tz=datetime.UTC),
),
runs=runs(user=user, project=created_project, result=StateType.RUNNING),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,22 +381,17 @@ async def test_misconfigured_pipeline_is_not_scheduled(
)
run_entry = CompRunsAtDB.parse_obj(await result.first())
assert run_entry.result == RunningState.ABORTED
assert run_entry.metadata == run_metadata


async def _assert_start_pipeline(
aiopg_engine, published_project: PublishedProject, scheduler: BaseCompScheduler
aiopg_engine,
published_project: PublishedProject,
scheduler: BaseCompScheduler,
run_metadata: RunMetadataDict,
) -> list[CompTaskAtDB]:
exp_published_tasks = deepcopy(published_project.tasks)
assert published_project.project.prj_owner
run_metadata = RunMetadataDict(
node_id_names_map={},
project_name="",
product_name="",
simcore_user_agent="",
user_email="",
wallet_id=231,
wallet_name="",
)
await scheduler.run_new_pipeline(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
Expand Down Expand Up @@ -618,11 +613,12 @@ async def test_proper_pipeline_is_scheduled( # noqa: PLR0915
mocked_clean_task_output_and_log_files_if_invalid: None,
instrumentation_rabbit_client_parser: mock.AsyncMock,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)

expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)

# -------------------------------------------------------------------------------
Expand Down Expand Up @@ -990,10 +986,11 @@ async def test_task_progress_triggers(
published_project: PublishedProject,
mocked_parse_output_data_fct: None,
mocked_clean_task_output_and_log_files_if_invalid: None,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
Expand Down Expand Up @@ -1286,10 +1283,11 @@ async def test_running_pipeline_triggers_heartbeat(
aiopg_engine: aiopg.sa.engine.Engine,
published_project: PublishedProject,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
Expand Down

0 comments on commit 561a633

Please sign in to comment.