Skip to content

Commit

Permalink
♻️ refactor stop service command ⚠️🚨 (#5924)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Jun 7, 2024
1 parent 74d4870 commit d25082c
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 149 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from typing import Any, ClassVar

from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceCreate
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.resource_tracker import HardwareInfo, PricingInfo
from models_library.services_resources import ServiceResourcesDictHelpers
from models_library.users import UserID
from models_library.wallets import WalletInfo
from pydantic import BaseModel


class RPCDynamicServiceCreate(DynamicServiceCreate):
class DynamicServiceStart(DynamicServiceCreate):
request_dns: str
request_scheme: str
simcore_user_agent: str
Expand All @@ -32,3 +36,22 @@ class Config:
"hardware_info": HardwareInfo.Config.schema_extra["examples"][0],
}
}


class DynamicServiceStop(BaseModel):
user_id: UserID
project_id: ProjectID
node_id: NodeID
simcore_user_agent: str
save_state: bool

class Config:
schema_extra: ClassVar[dict[str, Any]] = {
"example": {
"user_id": 234,
"project_id": "dd1d04d9-d704-4f7e-8f0f-1ca60cc771fe",
"node_id": "75c7f3f4-18f9-4678-8610-54a2ade78eaa",
"simcore_user_agent": "",
"save_state": True,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
RPCDynamicServiceCreate,
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects_nodes_io import NodeID
Expand Down Expand Up @@ -44,12 +45,12 @@ async def get_service_status(
async def run_dynamic_service(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
rpc_dynamic_service_create: RPCDynamicServiceCreate,
dynamic_service_start: DynamicServiceStart,
) -> DynamicServiceGet | NodeGet:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
parse_obj_as(RPCMethodName, "run_dynamic_service"),
rpc_dynamic_service_create=rpc_dynamic_service_create,
dynamic_service_start=dynamic_service_start,
timeout_s=_RPC_DEFAULT_TIMEOUT_S,
)
assert isinstance(result, DynamicServiceGet | NodeGet) # nosec
Expand All @@ -60,17 +61,13 @@ async def run_dynamic_service(
async def stop_dynamic_service(
rabbitmq_rpc_client: RabbitMQRPCClient,
*,
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
dynamic_service_stop: DynamicServiceStop,
timeout_s: NonNegativeInt,
) -> None:
result = await rabbitmq_rpc_client.request(
DYNAMIC_SCHEDULER_RPC_NAMESPACE,
parse_obj_as(RPCMethodName, "stop_dynamic_service"),
node_id=node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
dynamic_service_stop=dynamic_service_stop,
timeout_s=timeout_s,
)
assert result is None # nosec
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from fastapi import FastAPI
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
RPCDynamicServiceCreate,
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects_nodes_io import NodeID
Expand All @@ -27,10 +28,10 @@ async def get_service_status(

@router.expose()
async def run_dynamic_service(
app: FastAPI, *, rpc_dynamic_service_create: RPCDynamicServiceCreate
app: FastAPI, *, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
director_v2_client = DirectorV2Client.get_from_app_state(app)
return await director_v2_client.run_dynamic_service(rpc_dynamic_service_create)
return await director_v2_client.run_dynamic_service(dynamic_service_start)


@router.expose(
Expand All @@ -40,13 +41,13 @@ async def run_dynamic_service(
)
)
async def stop_dynamic_service(
app: FastAPI, *, node_id: NodeID, simcore_user_agent: str, save_state: bool
app: FastAPI, *, dynamic_service_stop: DynamicServiceStop
) -> NodeGet | DynamicServiceGet:
director_v2_client = DirectorV2Client.get_from_app_state(app)
settings: ApplicationSettings = app.state.settings
return await director_v2_client.stop_dynamic_service(
node_id=node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
node_id=dynamic_service_stop.node_id,
simcore_user_agent=dynamic_service_stop.simcore_user_agent,
save_state=dynamic_service_stop.save_state,
timeout=settings.DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT,
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi import FastAPI, status
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
RPCDynamicServiceCreate,
DynamicServiceStart,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects_nodes_io import NodeID
Expand Down Expand Up @@ -55,11 +55,9 @@ async def get_status(
raise

async def run_dynamic_service(
self, rpc_dynamic_service_create: RPCDynamicServiceCreate
self, dynamic_service_start: DynamicServiceStart
) -> NodeGet | DynamicServiceGet:
response = await self.thin_client.post_dynamic_service(
rpc_dynamic_service_create
)
response = await self.thin_client.post_dynamic_service(dynamic_service_start)
dict_response: dict[str, Any] = response.json()

# legacy services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fastapi import FastAPI, status
from httpx import Response, Timeout
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
RPCDynamicServiceCreate,
DynamicServiceStart,
)
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import ServiceResourcesDictHelpers
Expand Down Expand Up @@ -48,29 +48,29 @@ async def get_status(self, node_id: NodeID) -> Response:
@retry_on_errors()
@expect_status(status.HTTP_201_CREATED)
async def post_dynamic_service(
self, rpc_dynamic_service_create: RPCDynamicServiceCreate
self, dynamic_service_start: DynamicServiceStart
) -> Response:
post_data = {
"product_name": rpc_dynamic_service_create.product_name,
"can_save": rpc_dynamic_service_create.can_save,
"user_id": rpc_dynamic_service_create.user_id,
"project_id": rpc_dynamic_service_create.project_id,
"key": rpc_dynamic_service_create.key,
"version": rpc_dynamic_service_create.version,
"node_uuid": rpc_dynamic_service_create.node_uuid,
"basepath": f"/x/{rpc_dynamic_service_create.node_uuid}",
"product_name": dynamic_service_start.product_name,
"can_save": dynamic_service_start.can_save,
"user_id": dynamic_service_start.user_id,
"project_id": dynamic_service_start.project_id,
"key": dynamic_service_start.key,
"version": dynamic_service_start.version,
"node_uuid": dynamic_service_start.node_uuid,
"basepath": f"/x/{dynamic_service_start.node_uuid}",
"service_resources": ServiceResourcesDictHelpers.create_jsonable(
rpc_dynamic_service_create.service_resources
dynamic_service_start.service_resources
),
"wallet_info": rpc_dynamic_service_create.wallet_info,
"pricing_info": rpc_dynamic_service_create.pricing_info,
"hardware_info": rpc_dynamic_service_create.hardware_info,
"wallet_info": dynamic_service_start.wallet_info,
"pricing_info": dynamic_service_start.pricing_info,
"hardware_info": dynamic_service_start.hardware_info,
}

headers = {
X_DYNAMIC_SIDECAR_REQUEST_DNS: rpc_dynamic_service_create.request_dns,
X_DYNAMIC_SIDECAR_REQUEST_SCHEME: rpc_dynamic_service_create.request_scheme,
X_SIMCORE_USER_AGENT: rpc_dynamic_service_create.simcore_user_agent,
X_DYNAMIC_SIDECAR_REQUEST_DNS: dynamic_service_start.request_dns,
X_DYNAMIC_SIDECAR_REQUEST_SCHEME: dynamic_service_start.request_scheme,
X_SIMCORE_USER_AGENT: dynamic_service_start.simcore_user_agent,
}

return await self.client.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
from fastapi.encoders import jsonable_encoder
from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
RPCDynamicServiceCreate,
DynamicServiceStart,
DynamicServiceStop,
)
from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.users import UserID
from pytest_mock import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.rabbitmq import RabbitMQRPCClient, RPCServerError
Expand All @@ -24,8 +27,10 @@
ServiceWasNotFoundError,
)
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings

pytest_simcore_core_services_selection = [
"redis",
"rabbit",
]

Expand Down Expand Up @@ -125,9 +130,9 @@ def mock_director_v2_service_state(

@pytest.fixture
def app_environment(
disable_redis_setup: None,
app_environment: EnvVarsDict,
rabbit_service: RabbitSettings,
redis_service: RedisSettings,
) -> EnvVarsDict:
return app_environment

Expand Down Expand Up @@ -166,10 +171,10 @@ async def test_get_state(


@pytest.fixture
def rpc_dynamic_service_create() -> RPCDynamicServiceCreate:
def dynamic_service_start() -> DynamicServiceStart:
# one for legacy and one for new style?
return RPCDynamicServiceCreate.parse_obj(
RPCDynamicServiceCreate.Config.schema_extra["example"]
return DynamicServiceStart.parse_obj(
DynamicServiceStart.Config.schema_extra["example"]
)


Expand Down Expand Up @@ -221,11 +226,11 @@ async def test_run_dynamic_service(
mock_director_v0_service_run: None,
mock_director_v2_service_run: None,
rpc_client: RabbitMQRPCClient,
rpc_dynamic_service_create: RPCDynamicServiceCreate,
dynamic_service_start: DynamicServiceStart,
is_legacy: bool,
):
result = await services.run_dynamic_service(
rpc_client, rpc_dynamic_service_create=rpc_dynamic_service_create
rpc_client, dynamic_service_start=dynamic_service_start
)

if is_legacy:
Expand Down Expand Up @@ -254,6 +259,16 @@ def node_id_manual_intervention(faker: Faker) -> NodeID:
return faker.uuid4(cast_to=None)


@pytest.fixture
def user_id() -> UserID:
return 42


@pytest.fixture
def project_id(faker: Faker) -> ProjectID:
return faker.uuid4(cast_to=None)


@pytest.fixture
def mock_director_v0_service_stop(
fake_director_v0_base_url: str,
Expand Down Expand Up @@ -344,18 +359,27 @@ async def test_stop_dynamic_service(
mock_director_v0_service_stop: None,
mock_director_v2_service_stop: None,
rpc_client: RabbitMQRPCClient,
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
node_id_not_found: NodeID,
node_id_manual_intervention: NodeID,
simcore_user_agent: str,
save_state: bool,
):
def _get_rpc_stop(with_node_id: NodeID) -> DynamicServiceStop:
return DynamicServiceStop(
user_id=user_id,
project_id=project_id,
node_id=with_node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
)

# service was stopped
result = await services.stop_dynamic_service(
rpc_client,
node_id=node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
dynamic_service_stop=_get_rpc_stop(node_id),
timeout_s=5,
)
assert result is None
Expand All @@ -364,19 +388,15 @@ async def test_stop_dynamic_service(
with pytest.raises(ServiceWasNotFoundError):
await services.stop_dynamic_service(
rpc_client,
node_id=node_id_not_found,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
dynamic_service_stop=_get_rpc_stop(node_id_not_found),
timeout_s=5,
)

# service awaits for manual intervention
with pytest.raises(ServiceWaitingForManualInterventionError):
await services.stop_dynamic_service(
rpc_client,
node_id=node_id_manual_intervention,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
dynamic_service_stop=_get_rpc_stop(node_id_manual_intervention),
timeout_s=5,
)

Expand All @@ -399,6 +419,8 @@ def mock_raise_generic_error(
async def test_stop_dynamic_service_serializes_generic_errors(
mock_raise_generic_error: None,
rpc_client: RabbitMQRPCClient,
user_id: UserID,
project_id: ProjectID,
node_id: NodeID,
simcore_user_agent: str,
save_state: bool,
Expand All @@ -408,8 +430,12 @@ async def test_stop_dynamic_service_serializes_generic_errors(
):
await services.stop_dynamic_service(
rpc_client,
node_id=node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
dynamic_service_stop=DynamicServiceStop(
user_id=user_id,
project_id=project_id,
node_id=node_id,
simcore_user_agent=simcore_user_agent,
save_state=save_state,
),
timeout_s=5,
)
Loading

0 comments on commit d25082c

Please sign in to comment.