Skip to content

Commit

Permalink
♻️Computation backend: clean comp_tasks DB (🗃️) (#6968)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Dec 17, 2024
1 parent 38866c1 commit 8f4c1b2
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 74 deletions.
10 changes: 5 additions & 5 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ flag_management:
statuses:
- type: project
target: auto
threshold: 2%
threshold: 5%
- type: patch
target: auto
threshold: 2%
threshold: 5%


component_management:
Expand All @@ -22,7 +22,7 @@ component_management:
statuses:
- type: project
target: auto
threshold: 2%
threshold: 5%
branches:
- "!master"
individual_components:
Expand Down Expand Up @@ -116,12 +116,12 @@ coverage:
project:
default:
informational: true
threshold: 2%
threshold: 5%

patch:
default:
informational: true
threshold: 2%
threshold: 5%

comment:
layout: "header,diff,flags,components,footer"
Expand Down
1 change: 1 addition & 0 deletions packages/postgres-database/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
--constraint _migration.txt

aiopg[sa]
arrow
coverage
faker
pytest
Expand Down
8 changes: 7 additions & 1 deletion packages/postgres-database/requirements/_test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
aiopg==1.4.0
# via -r requirements/_test.in
arrow==1.3.0
# via -r requirements/_test.in
async-timeout==4.0.3
# via
# -c requirements/_base.txt
Expand Down Expand Up @@ -52,7 +54,9 @@ pytest-instafail==0.5.0
pytest-runner==6.0.1
# via -r requirements/_test.in
python-dateutil==2.9.0.post0
# via faker
# via
# arrow
# faker
pyyaml==6.0.2
# via
# -c requirements/../../../requirements/constraints.txt
Expand All @@ -72,6 +76,8 @@ types-docker==7.1.0.20240827
# via -r requirements/_test.in
types-psycopg2==2.9.21.20241019
# via -r requirements/_test.in
types-python-dateutil==2.9.0.20241206
# via arrow
types-requests==2.32.0.20241016
# via types-docker
typing-extensions==4.12.2
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""remove submit timestamp
Revision ID: 52a0e8148dd5
Revises: 77ac824a77ff
Create Date: 2024-12-16 14:55:15.114923+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the migration it
# revises (see the "Revision ID" / "Revises" lines in the module docstring).
revision = '52a0e8148dd5'
down_revision = '77ac824a77ff'
# No branch labels and no cross-migration dependencies are declared.
branch_labels = None
depends_on = None


def upgrade():
    """Drop the ``submit`` column from the ``comp_tasks`` table.

    The column is removed outright; ``downgrade`` re-creates it as a nullable
    column and does not restore any previously stored values.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    table_name, column_name = "comp_tasks", "submit"
    op.drop_column(table_name, column_name)
    # ### end Alembic commands ###


def downgrade():
    """Re-add the ``submit`` column to the ``comp_tasks`` table.

    The column is created as a nullable, timezone-aware PostgreSQL timestamp;
    this function does not back-fill it with any values.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    submit_column = sa.Column(
        "submit",
        postgresql.TIMESTAMP(timezone=True),
        autoincrement=False,
        nullable=True,
    )
    op.add_column("comp_tasks", submit_column)
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ class NodeClass(enum.Enum):
nullable=True,
doc="current progress of the task if available",
),
# utc timestamps for submission/start/end
sa.Column(
"submit", sa.DateTime(timezone=True), doc="UTC timestamp for task submission"
),
sa.Column(
"start", sa.DateTime(timezone=True), doc="UTC timestamp when task started"
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import json
import random
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from datetime import UTC, datetime, timedelta
from typing import Any, Final
from uuid import uuid4

import arrow
import faker
from faker import Faker

Expand Down Expand Up @@ -182,7 +183,7 @@ def fake_task_factory(first_internal_id=1) -> Callable:
_index_in_sequence = itertools.count(start=first_internal_id)

def fake_task(**overrides) -> dict[str, Any]:
t0 = datetime.utcnow()
t0 = arrow.utcnow().datetime
data = {
"project_id": uuid4(),
"node_id": uuid4(),
Expand All @@ -193,7 +194,6 @@ def fake_task(**overrides) -> dict[str, Any]:
"outputs": json.dumps({}),
"image": json.dumps({}),
"state": random.choice(_get_comp_pipeline_test_states()),
"submit": t0,
"start": t0 + timedelta(seconds=1),
"end": t0 + timedelta(minutes=5),
}
Expand Down Expand Up @@ -251,7 +251,7 @@ def random_product(


def utcnow() -> datetime:
return datetime.now(tz=timezone.utc)
return datetime.now(tz=UTC)


def random_payment_method(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" CRUD operations on a "computation" resource
"""CRUD operations on a "computation" resource
A computation is a resource that represents a running pipeline of computational services in a given project
Therefore,
Expand All @@ -15,7 +15,6 @@
# pylint: disable=too-many-arguments
# pylint: disable=too-many-statements


import contextlib
import logging
from typing import Annotated, Any, Final
Expand Down Expand Up @@ -75,7 +74,6 @@
compute_pipeline_details,
compute_pipeline_started_timestamp,
compute_pipeline_stopped_timestamp,
compute_pipeline_submitted_timestamp,
create_complete_dag,
create_complete_dag_from_tasks,
create_minimal_computational_graph_based_on_selection,
Expand Down Expand Up @@ -396,9 +394,7 @@ async def create_computation( # noqa: PLR0913 # pylint: disable=too-many-positi
stopped=compute_pipeline_stopped_timestamp(
minimal_computational_dag, comp_tasks
),
submitted=compute_pipeline_submitted_timestamp(
minimal_computational_dag, comp_tasks
),
submitted=last_run.created if last_run else None,
)

except ProjectNotFoundError as e:
Expand Down Expand Up @@ -498,7 +494,7 @@ async def get_computation(
result=None,
started=compute_pipeline_started_timestamp(pipeline_dag, all_tasks),
stopped=compute_pipeline_stopped_timestamp(pipeline_dag, all_tasks),
submitted=compute_pipeline_submitted_timestamp(pipeline_dag, all_tasks),
submitted=last_run.created if last_run else None,
)


Expand Down Expand Up @@ -572,7 +568,7 @@ async def stop_computation(
result=None,
started=compute_pipeline_started_timestamp(pipeline_dag, tasks),
stopped=compute_pipeline_stopped_timestamp(pipeline_dag, tasks),
submitted=compute_pipeline_submitted_timestamp(pipeline_dag, tasks),
submitted=last_run.created if last_run else None,
)

except ProjectNotFoundError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ class CompTaskAtDB(BaseModel):
description="the hex digest of the resolved inputs +outputs hash at the time when the last outputs were generated",
)
image: Image
submit: dt.datetime
start: dt.datetime | None = None
end: dt.datetime | None = None
state: RunningState
Expand Down Expand Up @@ -163,7 +162,7 @@ def _convert_state_from_state_type_enum_if_needed(cls, v):
return RunningState(DB_TO_RUNNING_STATE[StateType(v)])
return v

@field_validator("start", "end", "submit")
@field_validator("start", "end")
@classmethod
def _ensure_utc(cls, v: dt.datetime | None) -> dt.datetime | None:
if v is not None and v.tzinfo is None:
Expand Down Expand Up @@ -228,7 +227,6 @@ def to_db_model(self, **exclusion_rules) -> dict[str, Any]:
}
},
"image": image_example,
"submit": "2021-03-01 13:07:34.19161",
"node_class": "INTERACTIVE",
"state": "NOT_STARTED",
"progress": 0.44,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ async def _get_node_infos(
None,
)

result: tuple[ServiceMetaDataPublished, ServiceExtras, SimcoreServiceLabels] = (
await asyncio.gather(
_get_service_details(catalog_client, user_id, product_name, node),
director_client.get_service_extras(node.key, node.version),
director_client.get_service_labels(node),
)
result: tuple[
ServiceMetaDataPublished, ServiceExtras, SimcoreServiceLabels
] = await asyncio.gather(
_get_service_details(catalog_client, user_id, product_name, node),
director_client.get_service_extras(node.key, node.version),
director_client.get_service_labels(node),
)
return result

Expand Down Expand Up @@ -246,9 +246,9 @@ async def _get_pricing_and_hardware_infos(
return pricing_info, hardware_info


_RAM_SAFE_MARGIN_RATIO: Final[float] = (
0.1 # NOTE: machines always have less available RAM than advertised
)
_RAM_SAFE_MARGIN_RATIO: Final[
float
] = 0.1 # NOTE: machines always have less available RAM than advertised
_CPUS_SAFE_MARGIN: Final[float] = 0.1


Expand All @@ -266,11 +266,11 @@ async def _update_project_node_resources_from_hardware_info(
if not hardware_info.aws_ec2_instances:
return
try:
unordered_list_ec2_instance_types: list[EC2InstanceTypeGet] = (
await get_instance_type_details(
rabbitmq_rpc_client,
instance_type_names=set(hardware_info.aws_ec2_instances),
)
unordered_list_ec2_instance_types: list[
EC2InstanceTypeGet
] = await get_instance_type_details(
rabbitmq_rpc_client,
instance_type_names=set(hardware_info.aws_ec2_instances),
)

assert unordered_list_ec2_instance_types # nosec
Expand Down Expand Up @@ -439,7 +439,6 @@ async def generate_tasks_list_from_project(
inputs=node.inputs,
outputs=node.outputs,
image=image,
submit=arrow.utcnow().datetime,
state=task_state,
internal_id=internal_id,
node_class=to_node_class(node.key),
Expand Down
11 changes: 0 additions & 11 deletions services/director-v2/src/simcore_service_director_v2/utils/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,6 @@ def compute_pipeline_stopped_timestamp(
return pipeline_stopped_at


def compute_pipeline_submitted_timestamp(
    pipeline_dag: nx.DiGraph, comp_tasks: list[CompTaskAtDB]
) -> datetime.datetime | None:
    """Return the latest ``submit`` timestamp among the tasks of the pipeline.

    Each node of ``pipeline_dag`` is looked up by its node ID (as a string) in
    ``comp_tasks`` and the maximum of the matching ``submit`` values is
    returned.

    Returns:
        ``None`` when the DAG has no nodes, otherwise the most recent
        ``submit`` timestamp.

    Raises:
        KeyError: if a node of the DAG has no corresponding entry in
            ``comp_tasks``.
    """
    if not pipeline_dag.nodes:
        return None

    # Index the tasks by their stringified node ID so DAG nodes can be resolved.
    tasks_by_node_id: dict[NodeIDStr, CompTaskAtDB] = {}
    for comp_task in comp_tasks:
        tasks_by_node_id[NodeIDStr(f"{comp_task.node_id}")] = comp_task

    submit_times = [
        tasks_by_node_id[dag_node_id].submit for dag_node_id in pipeline_dag.nodes
    ]
    return max(submit_times)


async def compute_pipeline_details(
complete_dag: nx.DiGraph, pipeline_dag: nx.DiGraph, comp_tasks: list[CompTaskAtDB]
) -> PipelineDetails:
Expand Down
1 change: 0 additions & 1 deletion services/director-v2/tests/mocks/fake_task.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
"requires_gpu": false,
"requires_mpi": true
},
"submit": "1994-11-10T19:23:02.115Z",
"state": "PUBLISHED",
"internal_id": 21107840,
"node_class": "COMPUTATIONAL",
Expand Down
20 changes: 8 additions & 12 deletions services/director-v2/tests/unit/test_utils_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,8 @@ def pipeline_test_params(
state=RunningState.NOT_STARTED,
internal_id=3,
node_class=NodeClass.COMPUTATIONAL,
submit=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.timezone.utc),
modified=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.UTC),
modified=datetime.datetime.now(tz=datetime.UTC),
last_heartbeat=None,
progress=1.00,
)
Expand Down Expand Up @@ -536,9 +535,8 @@ def pipeline_test_params(
state=RunningState.NOT_STARTED,
internal_id=3,
node_class=NodeClass.COMPUTATIONAL,
submit=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.timezone.utc),
modified=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.UTC),
modified=datetime.datetime.now(tz=datetime.UTC),
last_heartbeat=None,
),
CompTaskAtDB.model_construct(
Expand All @@ -550,9 +548,8 @@ def pipeline_test_params(
state=RunningState.NOT_STARTED,
internal_id=3,
node_class=NodeClass.COMPUTATIONAL,
submit=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.timezone.utc),
modified=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.UTC),
modified=datetime.datetime.now(tz=datetime.UTC),
last_heartbeat=None,
),
CompTaskAtDB.model_construct(
Expand All @@ -564,9 +561,8 @@ def pipeline_test_params(
state=RunningState.NOT_STARTED,
internal_id=3,
node_class=NodeClass.COMPUTATIONAL,
submit=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.timezone.utc),
modified=datetime.datetime.now(tz=datetime.timezone.utc),
created=datetime.datetime.now(tz=datetime.UTC),
modified=datetime.datetime.now(tz=datetime.UTC),
last_heartbeat=None,
progress=1.00,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,11 @@ def _mocked_services_details(
).isoformat()
}

data = {**ServiceGet.model_config["json_schema_extra"]["examples"][0], **data_published, **deprecated} # type: ignore
data = {
**ServiceGet.model_config["json_schema_extra"]["examples"][0],
**data_published,
**deprecated,
} # type: ignore

payload = ServiceGet.model_validate(data)

Expand Down Expand Up @@ -354,7 +358,6 @@ def _mocked_get_pricing_unit(request, pricing_plan_id: int) -> httpx.Response:
assert_all_called=False,
assert_all_mocked=True,
) as respx_mock:

respx_mock.get(
re.compile(
r"services/(?P<service_key>simcore/services/(comp|dynamic|frontend)/[^/]+)/(?P<service_version>[^\.]+.[^\.]+.[^/\?]+)/pricing-plan.+"
Expand Down Expand Up @@ -915,13 +918,7 @@ async def test_get_computation_from_not_started_computation_task(
stopped=None,
submitted=None,
)
_CHANGED_FIELDS = {"submitted"}
assert returned_computation.model_dump(
exclude=_CHANGED_FIELDS
) == expected_computation.model_dump(exclude=_CHANGED_FIELDS)
assert returned_computation.model_dump(
include=_CHANGED_FIELDS
) != expected_computation.model_dump(include=_CHANGED_FIELDS)
assert returned_computation == expected_computation


async def test_get_computation_from_published_computation_task(
Expand Down
Loading

0 comments on commit 8f4c1b2

Please sign in to comment.