♻️Sim4Life computational jobs are not parented correctly (#6542)
sanderegg authored Oct 17, 2024
1 parent 268298d · commit 380f606
Showing 4 changed files with 76 additions and 45 deletions.
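
In short: computational runs now carry their project ancestry, so Sim4Life jobs created from a parent project are parented correctly. `_get_project_metadata` gains a `log_decorator` trace and an explicit debug message when no parent exists; `CompRunsRepository.create` now requires a `RunMetadataDict` (previously `RunMetadataDict | None`) and always runs it through `jsonable_encoder`; the test suite gains a `project_metadata` fixture (parent and root-parent project/node ids and names) that is nested into `run_metadata`; and the scheduler tests thread that shared fixture through `_assert_start_pipeline` instead of building an ad-hoc dict.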
@@ -37,6 +37,7 @@
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyHttpUrl, parse_obj_as
from servicelib.async_utils import run_sequentially_in_context
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQRPCClient
from simcore_postgres_database.utils_projects_metadata import DBProjectNotFoundError
from starlette import status
@@ -150,6 +151,7 @@ async def _check_pipeline_startable(
_UNKNOWN_NODE: Final[str] = "unknown node"


@log_decorator(_logger)
async def _get_project_metadata(
project_id: ProjectID,
project_repo: ProjectsRepository,
@@ -160,7 +162,7 @@ async def _get_project_metadata(
project_id
)
if project_ancestors.parent_project_uuid is None:
# no parents here
_logger.debug("no parent found for project %s", project_id)
return {}

assert project_ancestors.parent_node_id is not None # nosec
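The newly imported `log_decorator` comes from `servicelib.logging_utils` and is applied to `_get_project_metadata` above. As an illustration only — the real decorator lives in servicelib and may differ in detail — here is a minimal sketch of the pattern, assuming it logs entry and exit of the wrapped coroutine at DEBUG level:

```python
import functools
import logging
from typing import Any, Callable


def log_decorator(logger: logging.Logger) -> Callable:
    """Illustrative stand-in: trace an async function's call and return value."""

    def _decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def _wrapper(*args: Any, **kwargs: Any) -> Any:
            logger.debug("calling %s(args=%s, kwargs=%s)", func.__name__, args, kwargs)
            result = await func(*args, **kwargs)
            logger.debug("%s returned %r", func.__name__, result)
            return result

        return _wrapper

    return _decorator
```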
@@ -50,7 +50,7 @@ async def get(
)
row: RowProxy | None = await result.first()
if not row:
raise ComputationalRunNotFoundError()
raise ComputationalRunNotFoundError
return CompRunsAtDB.from_orm(row)

async def list(
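
In `get`, the exception is now raised as a bare class. This is behavior-preserving: when the object after `raise` is a class, Python instantiates it with no arguments, so the two spellings are equivalent (and the bare form is what linters such as ruff's RSE102 prefer). A self-contained demonstration:

```python
class ComputationalRunNotFoundError(RuntimeError):
    """Stand-in for the project's exception type (assumed here for the demo)."""


try:
    # Equivalent to `raise ComputationalRunNotFoundError()`: Python
    # instantiates an exception class raised without arguments.
    raise ComputationalRunNotFoundError
except ComputationalRunNotFoundError as err:
    assert isinstance(err, ComputationalRunNotFoundError)
```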
@@ -80,7 +80,7 @@ async def create(
project_id: ProjectID,
cluster_id: ClusterID,
iteration: PositiveInt | None = None,
metadata: RunMetadataDict | None,
metadata: RunMetadataDict,
use_on_demand_clusters: bool,
) -> CompRunsAtDB:
try:
@@ -102,13 +102,13 @@
.values(
user_id=user_id,
project_uuid=f"{project_id}",
cluster_id=cluster_id
if cluster_id != DEFAULT_CLUSTER_ID
else None,
cluster_id=(
cluster_id if cluster_id != DEFAULT_CLUSTER_ID else None
),
iteration=iteration,
result=RUNNING_STATE_TO_DB[RunningState.PUBLISHED],
started=datetime.datetime.now(tz=datetime.timezone.utc),
metadata=jsonable_encoder(metadata) if metadata else None,
started=datetime.datetime.now(tz=datetime.UTC),
metadata=jsonable_encoder(metadata),
use_on_demand_clusters=use_on_demand_clusters,
)
.returning(literal_column("*"))
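
Two notes on the `create` hunk above. First, `datetime.UTC` (added in Python 3.11) is an alias for `datetime.timezone.utc`, so the timestamp change is cosmetic but pins the service to Python 3.11+. Second, with `metadata` now required rather than `RunMetadataDict | None`, the `if metadata else None` guard is gone: an empty metadata dict is stored as `{}` instead of SQL `NULL`. A quick check of the alias:

```python
import datetime

# datetime.UTC is the very same singleton as datetime.timezone.utc (3.11+),
# so both spellings produce identical timezone-aware timestamps.
assert datetime.UTC is datetime.timezone.utc
now = datetime.datetime.now(tz=datetime.UTC)
assert now.tzinfo is datetime.UTC
```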
@@ -146,7 +146,7 @@ async def set_run_result(
) -> CompRunsAtDB | None:
values: dict[str, Any] = {"result": RUNNING_STATE_TO_DB[result_state]}
if final_state:
values.update({"ended": datetime.datetime.now(tz=datetime.timezone.utc)})
values.update({"ended": datetime.datetime.now(tz=datetime.UTC)})
return await self.update(
user_id,
project_id,
77 changes: 54 additions & 23 deletions services/director-v2/tests/unit/with_dbs/conftest.py
@@ -8,15 +8,16 @@
import datetime
import json
from collections.abc import Awaitable, Callable, Iterator
from typing import Any
from typing import Any, cast
from uuid import uuid4

import pytest
import sqlalchemy as sa
from _helpers import PublishedProject, RunningProject
from faker import Faker
from fastapi.encoders import jsonable_encoder
from models_library.clusters import Cluster
from models_library.projects import ProjectAtDB
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from pydantic.main import BaseModel
from simcore_postgres_database.models.cluster_to_groups import cluster_to_groups
@@ -25,7 +26,11 @@
from simcore_postgres_database.models.comp_runs import comp_runs
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_service_director_v2.models.comp_pipelines import CompPipelineAtDB
from simcore_service_director_v2.models.comp_runs import CompRunsAtDB, RunMetadataDict
from simcore_service_director_v2.models.comp_runs import (
CompRunsAtDB,
ProjectMetadataDict,
RunMetadataDict,
)
from simcore_service_director_v2.models.comp_tasks import CompTaskAtDB, Image
from simcore_service_director_v2.utils.computations import to_node_class
from simcore_service_director_v2.utils.dask import generate_dask_job_id
@@ -84,28 +89,36 @@ def creator(
"project_id": f"{project.uuid}",
"node_id": f"{node_id}",
"schema": {"inputs": {}, "outputs": {}},
"inputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {},
"outputs": {
key: json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {},
"inputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.inputs.items()
}
if node_data.inputs
else {}
),
"outputs": (
{
key: (
json.loads(value.json(by_alias=True, exclude_unset=True))
if isinstance(value, BaseModel)
else value
)
for key, value in node_data.outputs.items()
}
if node_data.outputs
else {}
),
"image": Image(name=node_data.key, tag=node_data.version).dict( # type: ignore
by_alias=True, exclude_unset=True
), # type: ignore
"node_class": to_node_class(node_data.key),
"internal_id": internal_id + 1,
"submit": datetime.datetime.now(tz=datetime.timezone.utc),
"submit": datetime.datetime.now(tz=datetime.UTC),
"job_id": generate_dask_job_id(
service_key=node_data.key,
service_version=node_data.version,
@@ -135,9 +148,26 @@ def creator(
)


@pytest.fixture
def project_metadata(faker: Faker) -> ProjectMetadataDict:
return ProjectMetadataDict(
parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
parent_node_name=faker.pystr(),
parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
parent_project_name=faker.pystr(),
root_parent_project_id=cast(ProjectID, faker.uuid4(cast_to=None)),
root_parent_project_name=faker.pystr(),
root_parent_node_id=cast(NodeID, faker.uuid4(cast_to=None)),
root_parent_node_name=faker.pystr(),
)
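
A note on this new fixture: `faker.uuid4(cast_to=None)` returns a real `uuid.UUID` instead of Faker's default string form, and `typing.cast` only relabels the value for the type checker — `NodeID` and `ProjectID` are UUID-based types in models_library, so no runtime conversion happens. For example:

```python
from uuid import UUID

from faker import Faker

fake = Faker()
# cast_to=None disables Faker's default str() conversion, yielding uuid.UUID.
value = fake.uuid4(cast_to=None)
assert isinstance(value, UUID)
```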


@pytest.fixture
def run_metadata(
osparc_product_name: str, simcore_user_agent: str, faker: Faker
osparc_product_name: str,
simcore_user_agent: str,
project_metadata: ProjectMetadataDict,
faker: Faker,
) -> RunMetadataDict:
return RunMetadataDict(
node_id_names_map={},
Expand All @@ -147,6 +177,7 @@ def run_metadata(
user_email=faker.email(),
wallet_id=faker.pyint(min_value=1),
wallet_name=faker.name(),
project_metadata=project_metadata,
)
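
`run_metadata` now requests the `project_metadata` fixture as a parameter, so pytest builds the ancestry dict first and nests it into the run metadata — plain fixture composition. A minimal sketch of the same pattern:

```python
import pytest


@pytest.fixture
def child() -> dict:
    return {"name": "child"}


@pytest.fixture
def parent(child: dict) -> dict:
    # pytest resolves `child` before building `parent`.
    return {"name": "parent", "child": child}


def test_nesting(parent: dict) -> None:
    assert parent["child"]["name"] == "child"
```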


Expand All @@ -171,7 +202,7 @@ def creator(
with postgres_db.connect() as conn:
result = conn.execute(
comp_runs.insert()
.values(**run_config)
.values(**jsonable_encoder(run_config))
.returning(sa.literal_column("*"))
)
new_run = CompRunsAtDB.from_orm(result.first())
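
The `.values(**jsonable_encoder(run_config))` change matters because `run_config` now nests `project_metadata`, which carries `uuid.UUID` values; encoding the whole payload first converts UUIDs (and datetimes) to JSON-safe primitives that the JSON column accepts. For example:

```python
from uuid import uuid4

from fastapi.encoders import jsonable_encoder

payload = {"metadata": {"parent_node_id": uuid4()}}
encoded = jsonable_encoder(payload)
# Nested UUIDs become plain strings, ready for a JSON database column.
assert isinstance(encoded["metadata"]["parent_node_id"], str)
```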
@@ -298,7 +329,7 @@ async def running_project(
project=created_project,
state=StateType.RUNNING,
progress=0.0,
start=datetime.datetime.now(tz=datetime.timezone.utc),
start=datetime.datetime.now(tz=datetime.UTC),
),
runs=runs(user=user, project=created_project, result=StateType.RUNNING),
)
@@ -381,22 +381,17 @@ async def test_misconfigured_pipeline_is_not_scheduled(
)
run_entry = CompRunsAtDB.parse_obj(await result.first())
assert run_entry.result == RunningState.ABORTED
assert run_entry.metadata == run_metadata


async def _assert_start_pipeline(
aiopg_engine, published_project: PublishedProject, scheduler: BaseCompScheduler
aiopg_engine,
published_project: PublishedProject,
scheduler: BaseCompScheduler,
run_metadata: RunMetadataDict,
) -> list[CompTaskAtDB]:
exp_published_tasks = deepcopy(published_project.tasks)
assert published_project.project.prj_owner
run_metadata = RunMetadataDict(
node_id_names_map={},
project_name="",
product_name="",
simcore_user_agent="",
user_email="",
wallet_id=231,
wallet_name="",
)
await scheduler.run_new_pipeline(
user_id=published_project.project.prj_owner,
project_id=published_project.project.uuid,
@@ -618,11 +613,12 @@ async def test_proper_pipeline_is_scheduled(  # noqa: PLR0915
mocked_clean_task_output_and_log_files_if_invalid: None,
instrumentation_rabbit_client_parser: mock.AsyncMock,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)

expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)

# -------------------------------------------------------------------------------
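With `run_metadata` threaded through `_assert_start_pipeline`, each test now starts the pipeline with the same fixture-provided metadata that later assertions (such as `run_entry.metadata == run_metadata` above) compare against the persisted run, rather than an ad-hoc dict that could drift from the fixtures.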
@@ -990,10 +986,11 @@ async def test_task_progress_triggers(
published_project: PublishedProject,
mocked_parse_output_data_fct: None,
mocked_clean_task_output_and_log_files_if_invalid: None,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
@@ -1286,10 +1283,11 @@ async def test_running_pipeline_triggers_heartbeat(
aiopg_engine: aiopg.sa.engine.Engine,
published_project: PublishedProject,
resource_tracking_rabbit_client_parser: mock.AsyncMock,
run_metadata: RunMetadataDict,
):
_mock_send_computation_tasks(published_project.tasks, mocked_dask_client)
expected_published_tasks = await _assert_start_pipeline(
aiopg_engine, published_project, scheduler
aiopg_engine, published_project, scheduler, run_metadata
)
# -------------------------------------------------------------------------------
# 1. first run will move comp_tasks to PENDING so the worker can take them
