Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 expose service_run_id as an env var for both comp and new style dynamic services #6942

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@

from ..projects import ProjectID
from ..projects_nodes_io import NodeID
from ..resource_tracker import CreditTransactionStatus, ServiceRunId, ServiceRunStatus
from ..resource_tracker import CreditTransactionStatus, ServiceRunStatus
from ..services import ServiceKey, ServiceVersion
from ..services_types import ServiceRunID
from ..users import UserID
from ..wallets import WalletID


class ServiceRunGet(BaseModel):
service_run_id: ServiceRunId
service_run_id: ServiceRunID
wallet_id: WalletID | None
wallet_name: str | None
user_id: UserID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
PricingPlanId,
PricingUnitCostUpdate,
PricingUnitId,
ServiceRunId,
ServiceRunStatus,
SpecificInfo,
UnitExtraInfo,
)
from ..services import ServiceKey, ServiceVersion
from ..services_types import ServiceRunID
from ..users import UserID
from ..wallets import WalletID
from ._base import InputSchema, OutputSchema
Expand All @@ -27,7 +27,7 @@
class ServiceRunGet(
BaseModel
): # NOTE: this is already in use so I didnt modidy inheritance from OutputSchema
service_run_id: ServiceRunId
service_run_id: ServiceRunID
wallet_id: WalletID | None
wallet_name: str | None
user_id: UserID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .projects_state import RunningState
from .services import ServiceKey, ServiceType, ServiceVersion
from .services_resources import ServiceResourcesDict
from .services_types import ServiceRunID
from .users import UserID
from .utils.enums import StrAutoEnum
from .wallets import WalletID
Expand Down Expand Up @@ -178,7 +179,7 @@ class RabbitResourceTrackingMessageType(StrAutoEnum):
class RabbitResourceTrackingBaseMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.service.tracking"] = "io.simcore.service.tracking"

service_run_id: str = Field(
service_run_id: ServiceRunID = Field(
..., description="uniquely identitifies the service run"
)
created_at: datetime.datetime = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

_logger = logging.getLogger(__name__)

ServiceRunId: TypeAlias = str
PricingPlanId: TypeAlias = PositiveInt
PricingUnitId: TypeAlias = PositiveInt
PricingUnitCostId: TypeAlias = PositiveInt
Expand Down
4 changes: 2 additions & 2 deletions packages/models-library/src/models_library/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from .services_metadata_published import ServiceInputsDict, ServiceMetaDataPublished
from .services_types import (
DynamicServiceKey,
RunID,
ServiceKey,
ServicePortKey,
ServiceRunID,
ServiceVersion,
)

Expand All @@ -21,14 +21,14 @@
"BootOptions",
"DynamicServiceKey",
"LATEST_INTEGRATION_VERSION",
"RunID",
"ServiceInput",
"ServiceInputsDict",
"ServiceKey",
"ServiceKeyVersion",
"ServiceMetaDataPublished",
"ServiceOutput",
"ServicePortKey",
"ServiceRunID",
"ServiceType",
"ServiceVersion",
)
Expand Down
34 changes: 29 additions & 5 deletions packages/models-library/src/models_library/services_types.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from typing import Annotated, Any, TypeAlias
from typing import TYPE_CHECKING, Annotated, Any, Self, TypeAlias
from uuid import uuid4

import arrow
from pydantic import GetCoreSchemaHandler, StringConstraints, ValidationInfo
from pydantic import (
GetCoreSchemaHandler,
PositiveInt,
StringConstraints,
ValidationInfo,
)
from pydantic_core import CoreSchema, core_schema

from .basic_regex import PROPERTY_KEY_RE, SIMPLE_VERSION_RE
from .projects_nodes_io import NodeID
from .services_regex import (
COMPUTATIONAL_SERVICE_KEY_RE,
DYNAMIC_SERVICE_KEY_RE,
FILENAME_RE,
SERVICE_ENCODED_KEY_RE,
SERVICE_KEY_RE,
)
from .users import UserID

if TYPE_CHECKING:
from .projects import ProjectID

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this this necessary? Is it because of cyclic dependencies?

TIP: For things like ProjectID we can always move these to common_library. I already did that with user_enums and groups_enums (check how it is imported in the Groups and Users models)

ServicePortKey: TypeAlias = Annotated[str, StringConstraints(pattern=PROPERTY_KEY_RE)]

Expand All @@ -35,7 +45,7 @@
ServiceVersion: TypeAlias = Annotated[str, StringConstraints(pattern=SIMPLE_VERSION_RE)]


class RunID(str):
class ServiceRunID(str):
"""
Used to assign a unique identifier to the run of a service.

Expand All @@ -44,12 +54,15 @@ class RunID(str):
and old volumes for different runs.
Avoids overwriting data that left dropped on the node (due to an error)
and gives the osparc-agent an opportunity to back it up.
The resource-usage-tracker tracker uses these RunIDs to keep track of
resource usage from comp and dynamic services.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resource usage from comp and dynamic services.
resource usage from computational and dynamic services.


__slots__ = ()

@classmethod
def create(cls) -> "RunID":
def get_resource_tracking_run_id_for_dynamic(cls) -> Self:
"""used for dynamic services"""
# NOTE: there was a legacy version of this RunID
# legacy version:
# '0ac3ed64-665b-42d2-95f7-e59e0db34242'
Expand All @@ -59,6 +72,17 @@ def create(cls) -> "RunID":
run_id_format = f"{utc_int_timestamp}_{uuid4()}"
return cls(run_id_format)

@classmethod
def get_resource_tracking_run_id_for_computational(
cls,
user_id: UserID,
project_id: "ProjectID",
node_id: NodeID,
iteration: PositiveInt,
) -> Self:
"""used by computational services"""
return cls(f"comp_{user_id}_{project_id}_{node_id}_{iteration}")

@classmethod
def __get_pydantic_core_schema__(
cls,
Expand All @@ -68,7 +92,7 @@ def __get_pydantic_core_schema__(
return core_schema.no_info_after_validator_function(cls, handler(str))

@classmethod
def validate(cls, v: "RunID | str", _: ValidationInfo) -> "RunID":
def validate(cls, v: "ServiceRunID | str", _: ValidationInfo) -> "ServiceRunID":
if isinstance(v, cls):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: should this not raise a ValueError instead of TypeError?

return v
if isinstance(v, str):
Expand Down
40 changes: 40 additions & 0 deletions packages/models-library/tests/test_services_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest
from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
from models_library.services_types import ServiceRunID
from models_library.users import UserID
from pydantic import PositiveInt


@pytest.mark.parametrize(
"user_id, project_id, node_id, iteration, expected_result",
[
(
2,
ProjectID("e08356e4-eb74-49e9-b769-2c26e34c61d9"),
NodeID("a08356e4-eb74-49e9-b769-2c26e34c61d1"),
5,
"comp_2_e08356e4-eb74-49e9-b769-2c26e34c61d9_a08356e4-eb74-49e9-b769-2c26e34c61d1_5",
)
],
)
def test_run_id_get_resource_tracking_run_id(
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
iteration: PositiveInt,
expected_result: str,
):
resource_tracking_service_run_id = (
ServiceRunID.get_resource_tracking_run_id_for_computational(
user_id, project_id, node_id, iteration
)
)
assert isinstance(resource_tracking_service_run_id, ServiceRunID)
assert resource_tracking_service_run_id == expected_result


def test_get_resource_tracking_run_id_for_dynamic():
assert isinstance(
ServiceRunID.get_resource_tracking_run_id_for_dynamic(), ServiceRunID
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from models_library.licensed_items import LicensedItemID
from models_library.products import ProductName
from models_library.rabbitmq_basic_types import RPCMethodName
from models_library.resource_tracker import ServiceRunId
from models_library.services_types import ServiceRunID
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import TypeAdapter
Expand Down Expand Up @@ -65,7 +65,7 @@ async def checkout_licensed_item_for_wallet(
wallet_id: WalletID,
licensed_item_id: LicensedItemID,
num_of_seats: int,
service_run_id: ServiceRunId,
service_run_id: ServiceRunID,
) -> None:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
Expand All @@ -89,7 +89,7 @@ async def release_licensed_item_for_wallet(
wallet_id: WalletID,
licensed_item_id: LicensedItemID,
num_of_seats: int,
service_run_id: ServiceRunId,
service_run_id: ServiceRunID,
) -> None:
result = await rabbitmq_rpc_client.request(
WEBSERVER_RPC_NAMESPACE,
Expand Down
4 changes: 2 additions & 2 deletions services/agent/src/simcore_service_agent/models/volumes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
)
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import RunID
from models_library.services_types import ServiceRunID
from models_library.users import UserID
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter


class DynamicServiceVolumeLabels(BaseModel):
node_uuid: NodeID
run_id: RunID
run_id: ServiceRunID
source: str
study_id: ProjectID
swarm_stack_name: str
Expand Down
12 changes: 6 additions & 6 deletions services/agent/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from fastapi.testclient import TestClient
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import RunID
from models_library.services_types import ServiceRunID
from models_library.users import UserID
from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict
from settings_library.rabbit import RabbitSettings
Expand Down Expand Up @@ -56,8 +56,8 @@ def test_client(initialized_app: FastAPI) -> TestClient:


@pytest.fixture
def run_id() -> RunID:
return RunID.create()
def service_run_id() -> ServiceRunID:
return ServiceRunID.get_resource_tracking_run_id_for_dynamic()


@pytest.fixture
Expand All @@ -77,7 +77,7 @@ def volumes_path(tmp_path: Path) -> Path:

@pytest.fixture
async def create_dynamic_sidecar_volume(
run_id: RunID,
service_run_id: ServiceRunID,
project_id: ProjectID,
swarm_stack_name: str,
user_id: UserID,
Expand All @@ -89,13 +89,13 @@ async def create_dynamic_sidecar_volume(
async with aiodocker.Docker() as docker_client:

async def _(node_id: NodeID, in_use: bool, volume_name: str) -> str:
source = get_source(run_id, node_id, volumes_path / volume_name)
source = get_source(service_run_id, node_id, volumes_path / volume_name)
volume = await docker_client.volumes.create(
{
"Name": source,
"Labels": {
"node_uuid": f"{node_id}",
"run_id": run_id,
"run_id": service_run_id,
"source": source,
"study_id": f"{project_id}",
"swarm_stack_name": swarm_stack_name,
Expand Down
6 changes: 3 additions & 3 deletions services/agent/tests/unit/test_services_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from fastapi import FastAPI
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_types import RunID
from models_library.services_types import ServiceRunID
from pydantic import NonNegativeInt
from simcore_service_agent.core.settings import ApplicationSettings
from simcore_service_agent.services.backup import backup_volume
Expand Down Expand Up @@ -48,7 +48,7 @@ async def test_backup_volume(
volume_content: Path,
project_id: ProjectID,
swarm_stack_name: str,
run_id: RunID,
service_run_id: ServiceRunID,
downlaoded_from_s3: Path,
create_dynamic_sidecar_volumes: Callable[[NodeID, bool], Awaitable[set[str]]],
initialized_app: FastAPI,
Expand Down Expand Up @@ -80,7 +80,7 @@ async def test_backup_volume(
async with session.client("s3", endpoint_url=f"{settings.AGENT_VOLUMES_CLEANUP_S3_ENDPOINT}") as s3_client: # type: ignore
list_response = await s3_client.list_objects_v2(
Bucket=settings.AGENT_VOLUMES_CLEANUP_S3_BUCKET,
Prefix=f"{swarm_stack_name}/{project_id}/{node_id}/{run_id}",
Prefix=f"{swarm_stack_name}/{project_id}/{node_id}/{service_run_id}",
)
synced_keys: list[str] = [o["Key"] for o in list_response["Contents"]]

Expand Down
6 changes: 3 additions & 3 deletions services/agent/tests/unit/test_services_docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from aiodocker.docker import Docker
from fastapi import FastAPI
from models_library.projects_nodes_io import NodeID
from models_library.services_types import RunID
from models_library.services_types import ServiceRunID
from pytest_mock import MockerFixture
from servicelib.docker_constants import PREFIX_DYNAMIC_SIDECAR_VOLUMES
from simcore_service_agent.services.docker_utils import (
Expand Down Expand Up @@ -43,9 +43,9 @@ def test__reverse_string():
],
)
def test__does_volume_require_backup(
run_id: RunID, volume_path_part: str, expected: bool
service_run_id: ServiceRunID, volume_path_part: str, expected: bool
) -> None:
volume_name = get_source(run_id, uuid4(), Path("/apath") / volume_path_part)
volume_name = get_source(service_run_id, uuid4(), Path("/apath") / volume_path_part)
print(volume_name)
assert _does_volume_require_backup(volume_name) is expected

Expand Down
12 changes: 7 additions & 5 deletions services/agent/tests/unit/test_services_volumes_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from aiodocker.docker import Docker
from fastapi import FastAPI
from models_library.projects_nodes_io import NodeID
from models_library.services_types import RunID
from models_library.services_types import ServiceRunID
from servicelib.rabbitmq.rpc_interfaces.agent.errors import (
NoServiceVolumesFoundRPCError,
)
Expand All @@ -30,12 +30,14 @@

@dataclass
class MockedVolumesProxy:
run_id: RunID
service_run_id: ServiceRunID
volumes: set[str] = field(default_factory=set)

def add_unused_volumes_for_service(self, node_id: NodeID) -> None:
for folder_name in VOLUMES_TO_CREATE:
volume_name = get_source(self.run_id, node_id, Path("/apath") / folder_name)
volume_name = get_source(
self.service_run_id, node_id, Path("/apath") / folder_name
)
self.volumes.add(volume_name)

def remove_volume(self, volume_name: str) -> None:
Expand All @@ -47,9 +49,9 @@ def get_unused_dynamc_sidecar_volumes(self) -> set[str]:

@pytest.fixture
async def mock_docker_utils(
mocker: pytest_mock.MockerFixture, run_id: RunID
mocker: pytest_mock.MockerFixture, service_run_id: ServiceRunID
) -> MockedVolumesProxy:
proxy = MockedVolumesProxy(run_id)
proxy = MockedVolumesProxy(service_run_id)

async def _remove_volume(
app: FastAPI, docker: Docker, *, volume_name: str, requires_backup: bool
Expand Down
Loading
Loading