Skip to content

Commit

Permalink
✨🗃️ Is922/resource usage publish updated wallet credits (ITISFoundati…
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Sep 20, 2023
1 parent c0ba13d commit 2c48d13
Show file tree
Hide file tree
Showing 54 changed files with 1,041 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ..projects_nodes_io import NodeID
from ..projects_pipeline import ComputationTask
from ..users import UserID
from ..wallets import WalletInfo


class ComputationGet(ComputationTask):
Expand Down Expand Up @@ -40,6 +41,10 @@ class ComputationCreate(BaseModel):
default=False,
description="if True, a cluster will be created as necessary (wallet_id cannot be None, and cluster_id must be None)",
)
wallet_info: WalletInfo | None = Field(
default=None,
description="contains information about the wallet used to bill the running service",
)

@validator("product_name", always=True)
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ..services import ServicePortKey
from ..services_resources import ServiceResourcesDict, ServiceResourcesDictHelpers
from ..wallets import WalletInfo
from .dynamic_services_service import RunningDynamicServiceDetails, ServiceDetails


Expand Down Expand Up @@ -41,6 +42,10 @@ class DynamicServiceCreate(ServiceDetails):
can_save: bool = Field(
..., description="the service data must be saved when closing"
)
wallet_info: WalletInfo | None = Field(
default=None,
description="contains information about the wallet used to bill the running service",
)

class Config:
schema_extra: ClassVar[dict[str, Any]] = {
Expand All @@ -56,6 +61,7 @@ class Config:
"service_resources": ServiceResourcesDictHelpers.Config.schema_extra[
"examples"
][0],
"wallet_info": WalletInfo.Config.schema_extra["examples"][0],
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class WalletGet(OutputSchema):


class WalletGetWithAvailableCredits(WalletGet):
available_credits: float
available_credits: Decimal


class WalletGetPermissions(WalletGet):
Expand Down
16 changes: 16 additions & 0 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import logging
from abc import abstractmethod
from decimal import Decimal
from enum import Enum, auto
from typing import Any, Literal, TypeAlias

Expand Down Expand Up @@ -250,3 +251,18 @@ class RabbitResourceTrackingStoppedMessage(RabbitResourceTrackingBaseMessage):
| RabbitResourceTrackingStoppedMessage
| RabbitResourceTrackingHeartbeatMessage
)


class WalletCreditsMessage(RabbitMessageBase):
channel_name: Literal["io.simcore.service.wallets"] = Field(
default="io.simcore.service.wallets", const=True
)
created_at: datetime.datetime = Field(
default_factory=lambda: arrow.utcnow().datetime,
description="message creation datetime",
)
wallet_id: WalletID
credits: Decimal

def routing_key(self) -> str | None:
return f"{self.wallet_id}"
12 changes: 11 additions & 1 deletion packages/models-library/src/models_library/wallets.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from enum import auto
from typing import TypeAlias
from typing import Any, ClassVar, TypeAlias

from pydantic import BaseModel, Field, PositiveInt

Expand All @@ -14,6 +14,16 @@ class WalletStatus(StrAutoEnum):
INACTIVE = auto()


class WalletInfo(BaseModel):
wallet_id: WalletID
wallet_name: str

class Config:
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [{"wallet_id": 1, "wallet_name": "My Wallet"}]
}


### DB


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""resource tracker pricing plan to service remo unique constrain
Revision ID: ae72826e75fc
Revises: e3334cced752
Create Date: 2023-09-18 17:25:32.499378+00:00
"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "ae72826e75fc"
down_revision = "e3334cced752"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"rut_pricing_plan_to_service__service_product_unique_key",
"resource_tracker_pricing_plan_to_service",
type_="unique",
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_unique_constraint(
"rut_pricing_plan_to_service__service_product_unique_key",
"resource_tracker_pricing_plan_to_service",
["service_key", "service_version", "product"],
)
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,4 @@
column_created_datetime(timezone=True),
column_modified_datetime(timezone=True),
# ---------------------------
sa.UniqueConstraint(
"service_key",
"service_version",
"product",
name="rut_pricing_plan_to_service__service_product_unique_key",
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""
# pylint: disable=too-many-arguments
# pylint: disable=too-many-statements


import contextlib
Expand All @@ -29,7 +30,7 @@
from models_library.clusters import DEFAULT_CLUSTER_ID
from models_library.projects import ProjectAtDB, ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services import ServiceKeyVersion
from models_library.services import ServiceKey, ServiceKeyVersion, ServiceVersion
from models_library.users import UserID
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyHttpUrl, parse_obj_as
Expand Down Expand Up @@ -61,6 +62,7 @@
from ...modules.db.repositories.projects import ProjectsRepository
from ...modules.db.repositories.users import UsersRepository
from ...modules.director_v0 import DirectorV0Client
from ...modules.resource_usage_client import ResourceUsageApi
from ...utils.computations import (
find_deprecated_tasks,
get_pipeline_state_from_task_states,
Expand Down Expand Up @@ -219,6 +221,26 @@ async def create_computation( # noqa: C901, PLR0912
detail=f"Project {computation.project_id} has no computational services",
)

# Billing info
wallet_id = None
wallet_name = None
pricing_plan_id = None
pricing_detail_id = None
if computation.wallet_info:
wallet_id = computation.wallet_info.wallet_id
wallet_name = computation.wallet_info.wallet_name

resource_usage_api = ResourceUsageApi.get_from_state(request.app)
# NOTE: MD/SAN -> add real service version/key and store in DB, issue: https://github.com/ITISFoundation/osparc-issues/issues/1131
(
pricing_plan_id,
pricing_detail_id,
) = await resource_usage_api.get_default_pricing_plan_and_pricing_detail_for_service(
computation.product_name,
ServiceKey("simcore/services/comp/itis/sleeper"),
ServiceVersion("2.1.6"),
)

await scheduler.run_new_pipeline(
computation.user_id,
computation.project_id,
Expand All @@ -232,10 +254,10 @@ async def create_computation( # noqa: C901, PLR0912
project_name=project.name,
simcore_user_agent=computation.simcore_user_agent,
user_email=await users_repo.get_user_email(computation.user_id),
wallet_id=None,
wallet_name=None,
pricing_plan_id=None,
pricing_detail_id=None,
wallet_id=wallet_id,
wallet_name=wallet_name,
pricing_plan_id=pricing_plan_id,
pricing_detail_id=pricing_detail_id,
),
use_on_demand_clusters=computation.use_on_demand_clusters,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
osparc_variables_substitutions,
rabbitmq,
remote_debug,
resource_usage_client,
storage,
)
from .errors import (
Expand Down Expand Up @@ -174,6 +175,9 @@ def init_app(settings: AppSettings | None = None) -> FastAPI:
if computational_backend_enabled:
comp_scheduler.setup(app)

if settings.DIRECTOR_V2_RESOURCE_USAGE_TRACKER:
resource_usage_client.setup(app)

node_rights.setup(app)

# setup app --
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from settings_library.redis import RedisSettings
from settings_library.resource_usage_tracker import (
DEFAULT_RESOURCE_USAGE_HEARTBEAT_INTERVAL,
ResourceUsageTrackerSettings,
)
from settings_library.utils_logging import MixinLoggingSettings
from settings_library.utils_service import DEFAULT_FASTAPI_PORT
Expand Down Expand Up @@ -590,6 +591,11 @@ class AppSettings(BaseCustomSettings, MixinLoggingSettings):

DIRECTOR_V2_DOCKER_REGISTRY: RegistrySettings = Field(auto_default_from_env=True)

DIRECTOR_V2_RESOURCE_USAGE_TRACKER: ResourceUsageTrackerSettings = Field(
auto_default_from_env=True,
description="resource usage tracker service client's plugin",
)

# This is just a service placement constraint, see
# https://docs.docker.com/engine/swarm/services/#control-service-placement.
DIRECTOR_V2_SERVICES_CUSTOM_CONSTRAINTS: list[PlacementConstraintStr] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from models_library.services import RunID
from models_library.services_resources import ServiceResourcesDict
from models_library.wallets import WalletInfo
from pydantic import AnyHttpUrl, BaseModel, ConstrainedStr, Extra, Field, parse_obj_as
from servicelib.error_codes import ErrorCodeStr
from servicelib.exception_utils import DelayedExceptionHandler
Expand Down Expand Up @@ -412,6 +413,10 @@ def endpoint(self) -> AnyHttpUrl:
proxy_admin_api_port: PortInt | None = Field(
default=None, description="used as the admin endpoint API port"
)
wallet_info: WalletInfo | None = Field(
default=None,
description="contains information about the wallet used to bill the running service",
)

@property
def get_proxy_endpoint(self) -> AnyHttpUrl:
Expand Down Expand Up @@ -468,6 +473,7 @@ def from_http_request(
"proxy_service_name": names_helper.proxy_service_name,
"request_simcore_user_agent": request_simcore_user_agent,
"dynamic_sidecar": {"service_removal_state": {"can_save": can_save}},
"wallet_info": service.wallet_info,
}
if run_id:
obj_dict["run_id"] = run_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .....core.settings import DynamicSidecarSettings
from .....models.dynamic_services_scheduler import SchedulerData
from .....modules.resource_usage_client import ResourceUsageApi
from .....utils.db import get_repository
from ....db.repositories.groups_extra_properties import GroupsExtraPropertiesRepository
from ....db.repositories.projects import ProjectsRepository
Expand Down Expand Up @@ -122,11 +123,27 @@ async def progress_create_containers(
users_repository = get_repository(app, UsersRepository)
user_email = await users_repository.get_user_email(scheduler_data.user_id)

# Billing info
wallet_id = None
wallet_name = None
pricing_plan_id = None
pricing_detail_id = None
if scheduler_data.wallet_info:
wallet_id = scheduler_data.wallet_info.wallet_id
wallet_name = scheduler_data.wallet_info.wallet_name
resource_usage_api = ResourceUsageApi.get_from_state(app)
(
pricing_plan_id,
pricing_detail_id,
) = await resource_usage_api.get_default_pricing_plan_and_pricing_detail_for_service(
scheduler_data.product_name, scheduler_data.key, scheduler_data.version
)

metrics_params = CreateServiceMetricsAdditionalParams(
wallet_id=None,
wallet_name=None,
pricing_plan_id=None,
pricing_detail_id=None,
wallet_id=wallet_id,
wallet_name=wallet_name,
pricing_plan_id=pricing_plan_id,
pricing_detail_id=pricing_detail_id,
product_name=scheduler_data.product_name,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
user_email=user_email,
Expand Down
Loading

0 comments on commit 2c48d13

Please sign in to comment.