diff --git a/services/director-v2/tests/unit/with_dbs/conftest.py b/services/director-v2/tests/unit/with_dbs/conftest.py
index 09b727449f28..8dd5527f00a3 100644
--- a/services/director-v2/tests/unit/with_dbs/conftest.py
+++ b/services/director-v2/tests/unit/with_dbs/conftest.py
@@ -8,15 +8,16 @@
 import datetime
 import json
 from collections.abc import Awaitable, Callable, Iterator
-from typing import Any
+from typing import Any, cast
 from uuid import uuid4
 
 import pytest
 import sqlalchemy as sa
 from _helpers import PublishedProject, RunningProject
 from faker import Faker
+from fastapi.encoders import jsonable_encoder
 from models_library.clusters import Cluster
-from models_library.projects import ProjectAtDB
+from models_library.projects import ProjectAtDB, ProjectID
 from models_library.projects_nodes_io import NodeID
 from pydantic.main import BaseModel
 from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
@@ -25,7 +26,11 @@
 from simcore_postgres_database.models.comp_runs import comp_runs
 from simcore_postgres_database.models.comp_tasks import comp_tasks
 from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
-from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
+from simcore_service_director_v2.models.comp_runs import (
+    CompRunsAtDB,
+    ProjectMetadataDict,
+    RunMetadataDict,
+)
 from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
 from simcore_service_director_v2.utils.computations import to_node_class
 from simcore_service_director_v2.utils.dask import generate_dask_job_id
@@ -84,28 +89,36 @@ def creator(
             "project_id": f"{project.uuid}",
             "node_id": f"{node_id}",
             "schema": {"inputs": {}, "outputs": {}},
-            "inputs": {
-                key: json.loads(value.json(by_alias=True, exclude_unset=True))
-                if isinstance(value, BaseModel)
-                else value
-                for key, value in node_data.inputs.items()
-            }
-            if node_data.inputs
-            else {},
-            "outputs": {
-                key: json.loads(value.json(by_alias=True, exclude_unset=True))
-                if isinstance(value, BaseModel)
-                else value
-                for key, value in node_data.outputs.items()
-            }
-            if node_data.outputs
-            else {},
+            "inputs": (
+                {
+                    key: (
+                        json.loads(value.json(by_alias=True, exclude_unset=True))
+                        if isinstance(value, BaseModel)
+                        else value
+                    )
+                    for key, value in node_data.inputs.items()
+                }
+                if node_data.inputs
+                else {}
+            ),
+            "outputs": (
+                {
+                    key: (
+                        json.loads(value.json(by_alias=True, exclude_unset=True))
+                        if isinstance(value, BaseModel)
+                        else value
+                    )
+                    for key, value in node_data.outputs.items()
+                }
+                if node_data.outputs
+                else {}
+            ),
             "image": Image(name=node_data.key, tag=node_data.version).dict(  # type: ignore
                 by_alias=True, exclude_unset=True
             ),  # type: ignore
             "node_class": to_node_class(node_data.key),
             "internal_id": internal_id + 1,
-            "submit": datetime.datetime.now(tz=datetime.timezone.utc),
+            "submit": datetime.datetime.now(tz=datetime.UTC),
             "job_id": generate_dask_job_id(
                 service_key=node_data.key,
                 service_version=node_data.version,
@@ -135,9 +148,26 @@
         )
 
 
+@pytest.fixture
+def project_metadata(faker: Faker) -> ProjectMetadataDict:
+    return ProjectMetadataDict(
+        parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
+        parent_node_name=faker.pystr(),
+        parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
+        parent_project_name=faker.pystr(),
+        root_parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
+        root_parent_project_name=faker.pystr(),
+        root_parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
+        root_parent_node_name=faker.pystr(),
+    )
+
+
 @pytest.fixture
 def run_metadata(
-    osparc_product_name: str, simcore_user_agent: str, faker: Faker
+    osparc_product_name: str,
+    simcore_user_agent: str,
+    project_metadata: ProjectMetadataDict,
+    faker: Faker,
 ) -> RunMetadataDict:
     return RunMetadataDict(
         node_id_names_map={},
@@ -147,6 +177,7 @@
         user_email=faker.email(),
         wallet_id=faker.pyint(min_value=1),
         wallet_name=faker.name(),
+        project_metadata=project_metadata,
     )
 
 
@@ -171,7 +202,7 @@ def creator(
         with postgres_db.connect() as conn:
            result = conn.execute(
                comp_runs.insert()
-                .values(**run_config)
+                .values(**jsonable_encoder(run_config))
                .returning(sa.literal_column("*"))
            )
            new_run = CompRunsAtDB.from_orm(result.first())
@@ -298,7 +329,7 @@ async def running_project(
             project=created_project,
             state=StateType.RUNNING,
             progress=0.0,
-            start=datetime.datetime.now(tz=datetime.timezone.utc),
+            start=datetime.datetime.now(tz=datetime.UTC),
         ),
         runs=runs(user=user, project=created_project, result=StateType.RUNNING),
     )
diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
index 2968e96e5db6..d15ab46a4986 100644
--- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
+++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py
@@ -381,22 +381,17 @@ async def test_misconfigured_pipeline_is_not_scheduled(
         )
         run_entry = CompRunsAtDB.parse_obj(await result.first())
     assert run_entry.result == RunningState.ABORTED
+    assert run_entry.metadata == run_metadata
 
 
 async def _assert_start_pipeline(
-    aiopg_engine, published_project: PublishedProject, scheduler: BaseCompScheduler
+    aiopg_engine,
+    published_project: PublishedProject,
+    scheduler: BaseCompScheduler,
+    run_metadata: RunMetadataDict,
 ) -> list[CompTaskAtDB]:
     exp_published_tasks = deepcopy(published_project.tasks)
     assert published_project.project.prj_owner
-    run_metadata = RunMetadataDict(
-        node_id_names_map={},
-        project_name="",
-        product_name="",
-        simcore_user_agent="",
-        user_email="",
-        wallet_id=231,
-        wallet_name="",
-    )
     await scheduler.run_new_pipeline(
         user_id=published_project.project.prj_owner,
         project_id=published_project.project.uuid,
@@ -618,11 +613,12 @@ async def test_proper_pipeline_is_scheduled(  # noqa: PLR0915
     mocked_clean_task_output_and_log_files_if_invalid: None,
     instrumentation_rabbit_client_parser: mock.AsyncMock,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
 
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )
 
     # -------------------------------------------------------------------------------
@@ -990,10 +986,11 @@ async def test_task_progress_triggers(
     published_project: PublishedProject,
     mocked_parse_output_data_fct: None,
     mocked_clean_task_output_and_log_files_if_invalid: None,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the worker can take them
@@ -1286,10 +1283,11 @@ async def test_running_pipeline_triggers_heartbeat(
     aiopg_engine: aiopg.sa.engine.Engine,
     published_project: PublishedProject,
     resource_tracking_rabbit_client_parser: mock.AsyncMock,
+    run_metadata: RunMetadataDict,
 ):
     _mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
     expected_published_tasks = await _assert_start_pipeline(
-        aiopg_engine, published_project, scheduler
+        aiopg_engine, published_project, scheduler, run_metadata
     )
     # -------------------------------------------------------------------------------
     # 1. first run will move comp_tasks to PENDING so the worker can take them
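
Note on the conftest change above (illustrative, not part of the patch): comp_runs.insert().values(...) now receives jsonable_encoder(run_config) because the new project_metadata fixture puts UUID and datetime objects into the run metadata, which a JSON database column cannot store as-is. Below is a minimal standalone sketch of what that encoding does, assuming only that fastapi is installed; the dictionary keys are made up for illustration and do not mirror the real comp_runs row.

# Sketch: jsonable_encoder() converts UUIDs and timezone-aware datetimes
# into plain JSON-compatible strings before the row values are inserted.
import datetime
from uuid import uuid4

from fastapi.encoders import jsonable_encoder

run_config = {  # hypothetical payload, not the actual comp_runs columns
    "metadata": {
        "project_metadata": {
            "parent_project_id": uuid4(),  # UUID -> str
            "root_parent_node_id": uuid4(),  # UUID -> str
        }
    },
    "created": datetime.datetime.now(tz=datetime.UTC),  # datetime -> ISO 8601 str
}

print(jsonable_encoder(run_config))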