diff --git a/.env-devel b/.env-devel index 4a28dc33cf1..af4ba2efdcc 100644 --- a/.env-devel +++ b/.env-devel @@ -50,6 +50,7 @@ CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest CLUSTERS_KEEPER_DASK_NTHREADS=0 CLUSTERS_KEEPER_DASK_WORKER_SATURATION=inf CLUSTERS_KEEPER_EC2_ACCESS=null +CLUSTERS_KEEPER_SSM_ACCESS=null CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX="" CLUSTERS_KEEPER_LOGLEVEL=WARNING CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5 diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index 76c478addd9..89b0620085d 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -590,7 +590,7 @@ jobs: unit-test-autoscaling: needs: changes if: ${{ needs.changes.outputs.autoscaling == 'true' || github.event_name == 'push' }} - timeout-minutes: 19 # if this timeout gets too small, then split the tests + timeout-minutes: 22 # temporary: mypy takes a huge amount of time to run here, maybe we should cache it name: "[unit] autoscaling" runs-on: ${{ matrix.os }} strategy: diff --git a/packages/models-library/src/models_library/clusters.py b/packages/models-library/src/models_library/clusters.py index c51598b06ee..1856dc5c287 100644 --- a/packages/models-library/src/models_library/clusters.py +++ b/packages/models-library/src/models_library/clusters.py @@ -96,6 +96,9 @@ class Config(BaseAuthentication.Config): class NoAuthentication(BaseAuthentication): type: Literal["none"] = "none" + class Config(BaseAuthentication.Config): + schema_extra: ClassVar[dict[str, Any]] = {"examples": [{"type": "none"}]} + class TLSAuthentication(BaseAuthentication): type: Literal["tls"] = "tls" diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py b/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py index 427117749aa..2bb29562d75 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/logging_tools.py @@ -133,14 +133,14 @@ def log_context( else: ctx_msg = msg - started_time = datetime.datetime.now(tz=datetime.timezone.utc) + started_time = datetime.datetime.now(tz=datetime.UTC) try: DynamicIndentFormatter.cls_increase_indent() logger.log(level, ctx_msg.starting, *args, **kwargs) with _increased_logger_indent(logger): yield SimpleNamespace(logger=logger, messages=ctx_msg) - elapsed_time = datetime.datetime.now(tz=datetime.timezone.utc) - started_time + elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time done_message = ( f"{ctx_msg.done} ({_timedelta_as_minute_second_ms(elapsed_time)})" ) @@ -152,7 +152,7 @@ def log_context( ) except: - elapsed_time = datetime.datetime.now(tz=datetime.timezone.utc) - started_time + elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time error_message = ( f"{ctx_msg.raised} ({_timedelta_as_minute_second_ms(elapsed_time)})" ) diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py index 6060b2d026f..0225642cc4f 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py @@ -3,10 +3,10 @@ import logging import re from collections import defaultdict -from contextlib import ExitStack +from collections.abc import Generator, Iterator from dataclasses import dataclass, field from enum import Enum, unique -from typing import Any, Final, Generator +from typing import Any, Final from playwright.sync_api import FrameLocator, Page, Request, WebSocket from pytest_simcore.helpers.logging_tools import log_context @@ -263,28 +263,37 @@ def wait_for_pipeline_state( return current_state -def on_web_socket_default_handler(ws) -> None: - """Usage - - from pytest_simcore.playwright_utils import on_web_socket_default_handler - - page.on("websocket", on_web_socket_default_handler) - - """ - stack = ExitStack() - ctx = stack.enter_context( - log_context( - logging.INFO, - ( - f"WebSocket opened: {ws.url}", - "WebSocket closed", - ), - ) - ) +@contextlib.contextmanager +def web_socket_default_log_handler(web_socket: WebSocket) -> Iterator[None]: - ws.on("framesent", lambda payload: ctx.logger.info("⬇️ %s", payload)) - ws.on("framereceived", lambda payload: ctx.logger.info("⬆️ %s", payload)) - ws.on("close", lambda payload: stack.close()) # noqa: ARG005 + try: + with log_context( + logging.DEBUG, + msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)", + ) as ctx: + + def on_framesent(payload: str | bytes) -> None: + ctx.logger.debug("⬇️ Frame sent: %s", payload) + + def on_framereceived(payload: str | bytes) -> None: + ctx.logger.debug("⬆️ Frame received: %s", payload) + + def on_close(payload: WebSocket) -> None: + ctx.logger.warning("⚠️ Websocket closed: %s", payload) + + def on_socketerror(error_msg: str) -> None: + ctx.logger.error("❌ Websocket error: %s", error_msg) + + web_socket.on("framesent", on_framesent) + web_socket.on("framereceived", on_framereceived) + web_socket.on("close", on_close) + web_socket.on("socketerror", on_socketerror) + yield + finally: + web_socket.remove_listener("framesent", on_framesent) + web_socket.remove_listener("framereceived", on_framereceived) + web_socket.remove_listener("close", on_close) + web_socket.remove_listener("socketerror", on_socketerror) def _node_started_predicate(request: Request) -> bool: diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py index 58c00e69597..ddbd444c5f6 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py @@ -6,6 +6,8 @@ import arrow from playwright.sync_api import FrameLocator, Page, WebSocket, expect +from pydantic import TypeAdapter # pylint: disable=no-name-in-module +from pydantic import ByteSize from .logging_tools import log_context from .playwright import ( @@ -17,7 +19,7 @@ wait_for_service_running, ) -_S4L_STREAMING_ESTABLISHMENT_MAX_TIME: Final[int] = 15 * SECOND +_S4L_STREAMING_ESTABLISHMENT_MAX_TIME: Final[int] = 30 * SECOND _S4L_SOCKETIO_REGEX: Final[re.Pattern] = re.compile( r"^(?P[^:]+)://(?P[^\.]+)\.services\.(?P[^\/]+)\/socket\.io\/.+$" ) @@ -63,7 +65,7 @@ def __call__(self, message: str) -> bool: self._initial_bit_rate_time = arrow.utcnow().datetime self.logger.info( "%s", - f"{self._initial_bit_rate=} at {self._initial_bit_rate_time.isoformat()}", + f"{TypeAdapter(ByteSize).validate_python(self._initial_bit_rate).human_readable()}/s at {self._initial_bit_rate_time.isoformat()}", ) return False @@ -78,7 +80,7 @@ def __call__(self, message: str) -> bool: bitrate_test = bool(self._initial_bit_rate != current_bitrate) self.logger.info( "%s", - f"{current_bitrate=} after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}", + f"{TypeAdapter(ByteSize).validate_python(current_bitrate).human_readable()}/s after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}", ) return bitrate_test diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py index ad2882da3c8..a971a551e4e 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/api/health.py @@ -21,7 +21,7 @@ @router.get("/", include_in_schema=True, response_class=PlainTextResponse) async def health_check(): # NOTE: sync url in docker/healthcheck.py with this entrypoint! - return f"{__name__}.health_check@{datetime.datetime.now(datetime.timezone.utc).isoformat()}" + return f"{__name__}.health_check@{datetime.datetime.now(datetime.UTC).isoformat()}" class _ComponentStatus(BaseModel): @@ -33,6 +33,7 @@ class _StatusGet(BaseModel): rabbitmq: _ComponentStatus ec2: _ComponentStatus redis_client_sdk: _ComponentStatus + ssm: _ComponentStatus @router.get("/status", include_in_schema=True, response_model=_StatusGet) @@ -40,18 +41,26 @@ async def get_status(app: Annotated[FastAPI, Depends(get_app)]) -> _StatusGet: return _StatusGet( rabbitmq=_ComponentStatus( is_enabled=is_rabbitmq_enabled(app), - is_responsive=await get_rabbitmq_client(app).ping() - if is_rabbitmq_enabled(app) - else False, + is_responsive=( + await get_rabbitmq_client(app).ping() + if is_rabbitmq_enabled(app) + else False + ), ), ec2=_ComponentStatus( is_enabled=bool(app.state.ec2_client), - is_responsive=await app.state.ec2_client.ping() - if app.state.ec2_client - else False, + is_responsive=( + await app.state.ec2_client.ping() if app.state.ec2_client else False + ), ), redis_client_sdk=_ComponentStatus( is_enabled=bool(app.state.redis_client_sdk), is_responsive=await get_redis_client(app).ping(), ), + ssm=_ComponentStatus( + is_enabled=(app.state.ssm_client is not None), + is_responsive=( + await app.state.ssm_client.ping() if app.state.ssm_client else False + ), + ), ) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/constants.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/constants.py new file mode 100644 index 00000000000..7f970665f25 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/constants.py @@ -0,0 +1,15 @@ +from typing import Final + +from aws_library.ec2._models import AWSTagKey, AWSTagValue +from pydantic import parse_obj_as + +DOCKER_STACK_DEPLOY_COMMAND_NAME: Final[str] = "private cluster docker deploy" +DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as( + AWSTagKey, "io.simcore.clusters-keeper.private_cluster_docker_deploy" +) + +USER_ID_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "user_id") +WALLET_ID_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "wallet_id") +ROLE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "role") +WORKER_ROLE_TAG_VALUE: Final[AWSTagValue] = parse_obj_as(AWSTagValue, "worker") +MANAGER_ROLE_TAG_VALUE: Final[AWSTagValue] = parse_obj_as(AWSTagValue, "manager") diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py index 14b3d344b70..5948715b081 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/application.py @@ -19,6 +19,7 @@ from ..modules.ec2 import setup as setup_ec2 from ..modules.rabbitmq import setup as setup_rabbitmq from ..modules.redis import setup as setup_redis +from ..modules.ssm import setup as setup_ssm from ..rpc.rpc_routes import setup_rpc_routes from .settings import ApplicationSettings @@ -55,6 +56,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI: setup_rabbitmq(app) setup_rpc_routes(app) setup_ec2(app) + setup_ssm(app) setup_redis(app) setup_clusters_management(app) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py index 17a8ffcaae8..07fd7deb8bf 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/core/settings.py @@ -25,6 +25,7 @@ from settings_library.ec2 import EC2Settings from settings_library.rabbit import RabbitSettings from settings_library.redis import RedisSettings +from settings_library.ssm import SSMSettings from settings_library.tracing import TracingSettings from settings_library.utils_logging import MixinLoggingSettings from types_aiobotocore_ec2.literals import InstanceTypeType @@ -50,6 +51,21 @@ class Config(EC2Settings.Config): } +class ClustersKeeperSSMSettings(SSMSettings): + class Config(SSMSettings.Config): + env_prefix = CLUSTERS_KEEPER_ENV_PREFIX + + schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc] + "examples": [ + { + f"{CLUSTERS_KEEPER_ENV_PREFIX}{key}": var + for key, var in example.items() + } + for example in SSMSettings.Config.schema_extra["examples"] + ], + } + + class WorkersEC2InstancesSettings(BaseCustomSettings): WORKERS_EC2_INSTANCES_ALLOWED_TYPES: dict[str, EC2InstanceBootSpecific] = Field( ..., @@ -183,6 +199,12 @@ class PrimaryEC2InstancesSettings(BaseCustomSettings): "that take longer than this time will be terminated as sometimes it happens that EC2 machine fail on start.", ) + PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL: str = Field( + default="172.20.0.0/14", + description="defines the docker swarm default address pool in CIDR format " + "(see https://docs.docker.com/reference/cli/docker/swarm/init/)", + ) + @validator("PRIMARY_EC2_INSTANCES_ALLOWED_TYPES") @classmethod def check_valid_instance_names( @@ -250,6 +272,10 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): auto_default_from_env=True ) + CLUSTERS_KEEPER_SSM_ACCESS: ClustersKeeperSSMSettings | None = Field( + auto_default_from_env=True + ) + CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES: PrimaryEC2InstancesSettings | None = Field( auto_default_from_env=True ) @@ -285,9 +311,11 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings): "(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)", ) - CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = Field( - default=5, - description="Max number of missed heartbeats before a cluster is terminated", + CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = ( + Field( + default=5, + description="Max number of missed heartbeats before a cluster is terminated", + ) ) CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG: str = Field( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py index 38246f3008a..89860549fd3 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters.py @@ -49,7 +49,7 @@ async def _get_primary_ec2_params( ec2_instance_types: list[ EC2InstanceType ] = await ec2_client.get_ec2_instance_capabilities( - instance_type_names=[ec2_type_name] + instance_type_names={ec2_type_name} ) assert ec2_instance_types # nosec assert len(ec2_instance_types) == 1 # nosec @@ -72,15 +72,7 @@ async def create_cluster( tags=creation_ec2_tags(app_settings, user_id=user_id, wallet_id=wallet_id), startup_script=create_startup_script( app_settings, - cluster_machines_name_prefix=get_cluster_name( - app_settings, user_id=user_id, wallet_id=wallet_id, is_manager=False - ), ec2_boot_specific=ec2_instance_boot_specs, - additional_custom_tags={ - AWSTagKey("user_id"): AWSTagValue(f"{user_id}"), - AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}"), - AWSTagKey("role"): AWSTagValue("worker"), - }, ), ami_id=ec2_instance_boot_specs.ami_id, key_name=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_KEY_NAME, diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py index a7c23143a0b..871ad8bd242 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_core.py @@ -5,12 +5,22 @@ import arrow from aws_library.ec2 import AWSTagKey, EC2InstanceData +from aws_library.ec2._models import AWSTagValue from fastapi import FastAPI from models_library.users import UserID from models_library.wallets import WalletID from pydantic import parse_obj_as from servicelib.logging_utils import log_catch - +from servicelib.utils import limited_gather + +from ..constants import ( + DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY, + DOCKER_STACK_DEPLOY_COMMAND_NAME, + ROLE_TAG_KEY, + USER_ID_TAG_KEY, + WALLET_ID_TAG_KEY, + WORKER_ROLE_TAG_VALUE, +) from ..core.settings import get_application_settings from ..modules.clusters import ( delete_clusters, @@ -18,9 +28,17 @@ get_cluster_workers, set_instance_heartbeat, ) +from ..utils.clusters import create_deploy_cluster_stack_script from ..utils.dask import get_scheduler_auth, get_scheduler_url -from ..utils.ec2 import HEARTBEAT_TAG_KEY +from ..utils.ec2 import ( + HEARTBEAT_TAG_KEY, + get_cluster_name, + user_id_from_instance_tags, + wallet_id_from_instance_tags, +) from .dask import is_scheduler_busy, ping_scheduler +from .ec2 import get_ec2_client +from .ssm import get_ssm_client _logger = logging.getLogger(__name__) @@ -42,8 +60,8 @@ def _get_instance_last_heartbeat(instance: EC2InstanceData) -> datetime.datetime async def _get_all_associated_worker_instances( app: FastAPI, primary_instances: Iterable[EC2InstanceData], -) -> list[EC2InstanceData]: - worker_instances = [] +) -> set[EC2InstanceData]: + worker_instances: set[EC2InstanceData] = set() for instance in primary_instances: assert "user_id" in instance.tags # nosec user_id = UserID(instance.tags[_USER_ID_TAG_KEY]) @@ -55,7 +73,7 @@ async def _get_all_associated_worker_instances( else None ) - worker_instances.extend( + worker_instances.update( await get_cluster_workers(app, user_id=user_id, wallet_id=wallet_id) ) return worker_instances @@ -63,12 +81,12 @@ async def _get_all_associated_worker_instances( async def _find_terminateable_instances( app: FastAPI, instances: Iterable[EC2InstanceData] -) -> list[EC2InstanceData]: +) -> set[EC2InstanceData]: app_settings = get_application_settings(app) assert app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec # get the corresponding ec2 instance data - terminateable_instances: list[EC2InstanceData] = [] + terminateable_instances: set[EC2InstanceData] = set() time_to_wait_before_termination = ( app_settings.CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION @@ -82,7 +100,7 @@ async def _find_terminateable_instances( elapsed_time_since_heartbeat = arrow.utcnow().datetime - last_heartbeat allowed_time_to_wait = time_to_wait_before_termination if elapsed_time_since_heartbeat >= allowed_time_to_wait: - terminateable_instances.append(instance) + terminateable_instances.add(instance) else: _logger.info( "%s has still %ss before being terminateable", @@ -93,14 +111,14 @@ async def _find_terminateable_instances( elapsed_time_since_startup = arrow.utcnow().datetime - instance.launch_time allowed_time_to_wait = startup_delay if elapsed_time_since_startup >= allowed_time_to_wait: - terminateable_instances.append(instance) + terminateable_instances.add(instance) # get all terminateable instances associated worker instances worker_instances = await _get_all_associated_worker_instances( app, terminateable_instances ) - return terminateable_instances + worker_instances + return terminateable_instances.union(worker_instances) async def check_clusters(app: FastAPI) -> None: @@ -112,6 +130,7 @@ async def check_clusters(app: FastAPI) -> None: if await ping_scheduler(get_scheduler_url(instance), get_scheduler_auth(app)) } + # set intance heartbeat if scheduler is busy for instance in connected_intances: with log_catch(_logger, reraise=False): # NOTE: some connected instance could in theory break between these 2 calls, therefore this is silenced and will @@ -124,6 +143,7 @@ async def check_clusters(app: FastAPI) -> None: f"{instance.id=} for {instance.tags=}", ) await set_instance_heartbeat(app, instance=instance) + # clean any cluster that is not doing anything if terminateable_instances := await _find_terminateable_instances( app, connected_intances ): @@ -138,7 +158,7 @@ async def check_clusters(app: FastAPI) -> None: for instance in disconnected_instances if _get_instance_last_heartbeat(instance) is None } - + # remove instances that were starting for too long if terminateable_instances := await _find_terminateable_instances( app, starting_instances ): @@ -149,7 +169,72 @@ async def check_clusters(app: FastAPI) -> None: ) await delete_clusters(app, instances=terminateable_instances) - # the other instances are broken (they were at some point connected but now not anymore) + # NOTE: transmit command to start docker swarm/stack if needed + # once the instance is connected to the SSM server, + # use ssm client to send the command to these instances, + # we send a command that contain: + # the docker-compose file in binary, + # the call to init the docker swarm and the call to deploy the stack + instances_in_need_of_deployment = { + i + for i in starting_instances - terminateable_instances + if DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY not in i.tags + } + + if instances_in_need_of_deployment: + app_settings = get_application_settings(app) + ssm_client = get_ssm_client(app) + ec2_client = get_ec2_client(app) + instances_in_need_of_deployment_ssm_connection_state = await limited_gather( + *[ + ssm_client.is_instance_connected_to_ssm_server(i.id) + for i in instances_in_need_of_deployment + ], + reraise=False, + log=_logger, + limit=20, + ) + ec2_connected_to_ssm_server = [ + i + for i, c in zip( + instances_in_need_of_deployment, + instances_in_need_of_deployment_ssm_connection_state, + strict=True, + ) + if c is True + ] + started_instances_ready_for_command = ec2_connected_to_ssm_server + if started_instances_ready_for_command: + # we need to send 1 command per machine here, as the user_id/wallet_id changes + for i in started_instances_ready_for_command: + ssm_command = await ssm_client.send_command( + [i.id], + command=create_deploy_cluster_stack_script( + app_settings, + cluster_machines_name_prefix=get_cluster_name( + app_settings, + user_id=user_id_from_instance_tags(i.tags), + wallet_id=wallet_id_from_instance_tags(i.tags), + is_manager=False, + ), + additional_custom_tags={ + USER_ID_TAG_KEY: i.tags[USER_ID_TAG_KEY], + WALLET_ID_TAG_KEY: i.tags[WALLET_ID_TAG_KEY], + ROLE_TAG_KEY: WORKER_ROLE_TAG_VALUE, + }, + ), + command_name=DOCKER_STACK_DEPLOY_COMMAND_NAME, + ) + await ec2_client.set_instances_tags( + started_instances_ready_for_command, + tags={ + DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY: AWSTagValue( + ssm_command.command_id + ), + }, + ) + + # the remaining instances are broken (they were at some point connected but now not anymore) broken_instances = disconnected_instances - starting_instances if terminateable_instances := await _find_terminateable_instances( app, broken_instances diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py index 806cb6d472c..410edba1efb 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/clusters_management_task.py @@ -47,6 +47,7 @@ def setup(app: FastAPI): for s in [ app_settings.CLUSTERS_KEEPER_EC2_ACCESS, app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES, + app_settings.CLUSTERS_KEEPER_SSM_ACCESS, ] ): logger.warning( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ssm.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ssm.py new file mode 100644 index 00000000000..218812d5523 --- /dev/null +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/modules/ssm.py @@ -0,0 +1,56 @@ +import logging +from typing import cast + +from aws_library.ssm import SimcoreSSMAPI +from aws_library.ssm._errors import SSMNotConnectedError +from fastapi import FastAPI +from settings_library.ssm import SSMSettings +from tenacity.asyncio import AsyncRetrying +from tenacity.before_sleep import before_sleep_log +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_random_exponential + +from ..core.errors import ConfigurationError +from ..core.settings import get_application_settings + +_logger = logging.getLogger(__name__) + + +def setup(app: FastAPI) -> None: + async def on_startup() -> None: + app.state.ssm_client = None + settings: SSMSettings | None = get_application_settings( + app + ).CLUSTERS_KEEPER_SSM_ACCESS + + if not settings: + _logger.warning("SSM client is de-activated in the settings") + return + + app.state.ssm_client = client = await SimcoreSSMAPI.create(settings) + + async for attempt in AsyncRetrying( + reraise=True, + stop=stop_after_delay(120), + wait=wait_random_exponential(max=30), + before_sleep=before_sleep_log(_logger, logging.WARNING), + ): + with attempt: + connected = await client.ping() + if not connected: + raise SSMNotConnectedError # pragma: no cover + + async def on_shutdown() -> None: + if app.state.ssm_client: + await cast(SimcoreSSMAPI, app.state.ssm_client).close() + + app.add_event_handler("startup", on_startup) + app.add_event_handler("shutdown", on_shutdown) + + +def get_ssm_client(app: FastAPI) -> SimcoreSSMAPI: + if not app.state.ssm_client: + raise ConfigurationError( + msg="SSM client is not available. Please check the configuration." + ) + return cast(SimcoreSSMAPI, app.state.ssm_client) diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py index 48eb4dee380..c9b4a32f4af 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/clusters.py @@ -8,6 +8,7 @@ import arrow import yaml from aws_library.ec2 import EC2InstanceBootSpecific, EC2InstanceData, EC2Tags +from aws_library.ec2._models import CommandStr from fastapi.encoders import jsonable_encoder from models_library.api_schemas_clusters_keeper.clusters import ( ClusterState, @@ -107,35 +108,43 @@ def _convert_to_env_dict(entries: dict[str, Any]) -> str: def create_startup_script( app_settings: ApplicationSettings, *, - cluster_machines_name_prefix: str, ec2_boot_specific: EC2InstanceBootSpecific, - additional_custom_tags: EC2Tags, ) -> str: assert app_settings.CLUSTERS_KEEPER_EC2_ACCESS # nosec assert app_settings.CLUSTERS_KEEPER_WORKERS_EC2_INSTANCES # nosec - environment_variables = _prepare_environment_variables( - app_settings, - cluster_machines_name_prefix=cluster_machines_name_prefix, - additional_custom_tags=additional_custom_tags, - ) - startup_commands = ec2_boot_specific.custom_boot_scripts.copy() + return "\n".join(startup_commands) + + +def create_deploy_cluster_stack_script( + app_settings: ApplicationSettings, + *, + cluster_machines_name_prefix: str, + additional_custom_tags: EC2Tags, +) -> str: + deploy_script: list[CommandStr] = [] assert app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES # nosec if isinstance( app_settings.CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH, TLSAuthentication, ): - + # get the dask certificates download_certificates_commands = [ f"mkdir --parents {_HOST_CERTIFICATES_BASE_PATH}", f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CA}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_CA_FILE_PATH}', f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_CERT}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_CERT_FILE_PATH}', f'aws ssm get-parameter --name "{app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY}" --region us-east-1 --with-decryption --query "Parameter.Value" --output text > {_HOST_TLS_KEY_FILE_PATH}', ] - startup_commands.extend(download_certificates_commands) + deploy_script.extend(download_certificates_commands) + + environment_variables = _prepare_environment_variables( + app_settings, + cluster_machines_name_prefix=cluster_machines_name_prefix, + additional_custom_tags=additional_custom_tags, + ) - startup_commands.extend( + deploy_script.extend( [ # NOTE: https://stackoverflow.com/questions/41203492/solving-redis-warnings-on-overcommit-memory-and-transparent-huge-pages-for-ubunt "sysctl vm.overcommit_memory=1", @@ -143,11 +152,11 @@ def create_startup_script( f"echo '{_prometheus_yml_base64_encoded()}' | base64 -d > {_HOST_PROMETHEUS_PATH}", f"echo '{_prometheus_basic_auth_yml_base64_encoded(app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME, app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD.get_secret_value())}' | base64 -d > {_HOST_PROMETHEUS_WEB_PATH}", # NOTE: --default-addr-pool is necessary in order to prevent conflicts with AWS node IPs - "docker swarm init --default-addr-pool 172.20.0.0/14", + f"docker swarm init --default-addr-pool {app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL}", f"{' '.join(environment_variables)} docker stack deploy --with-registry-auth --compose-file={_HOST_DOCKER_COMPOSE_PATH} dask_stack", ] ) - return "\n".join(startup_commands) + return "\n".join(deploy_script) def _convert_ec2_state_to_cluster_state( diff --git a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py index c74bbc554d9..b48e1076e59 100644 --- a/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py +++ b/services/clusters-keeper/src/simcore_service_clusters_keeper/utils/ec2.py @@ -7,6 +7,12 @@ from pydantic import parse_obj_as from .._meta import VERSION +from ..constants import ( + MANAGER_ROLE_TAG_VALUE, + ROLE_TAG_KEY, + USER_ID_TAG_KEY, + WALLET_ID_TAG_KEY, +) from ..core.settings import ApplicationSettings _APPLICATION_TAG_KEY: Final[str] = "io.simcore.clusters-keeper" @@ -50,9 +56,9 @@ def creation_ec2_tags( app_settings, user_id=user_id, wallet_id=wallet_id, is_manager=True ) ), - AWSTagKey("user_id"): AWSTagValue(f"{user_id}"), - AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}"), - AWSTagKey("role"): AWSTagValue("manager"), + USER_ID_TAG_KEY: AWSTagValue(f"{user_id}"), + WALLET_ID_TAG_KEY: AWSTagValue(f"{wallet_id}"), + ROLE_TAG_KEY: MANAGER_ROLE_TAG_VALUE, } | app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_CUSTOM_TAGS ) @@ -67,8 +73,8 @@ def ec2_instances_for_user_wallet_filter( ) -> EC2Tags: return ( _minimal_identification_tag(app_settings) - | {AWSTagKey("user_id"): AWSTagValue(f"{user_id}")} - | {AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}")} + | {USER_ID_TAG_KEY: AWSTagValue(f"{user_id}")} + | {WALLET_ID_TAG_KEY: AWSTagValue(f"{wallet_id}")} ) @@ -81,3 +87,14 @@ def compose_user_data(bash_command: str) -> str: echo "completed user data bash script" """ ) + + +def wallet_id_from_instance_tags(tags: EC2Tags) -> WalletID | None: + wallet_id_str = tags[WALLET_ID_TAG_KEY] + if wallet_id_str == "None": + return None + return WalletID(wallet_id_str) + + +def user_id_from_instance_tags(tags: EC2Tags) -> UserID: + return UserID(tags[USER_ID_TAG_KEY]) diff --git a/services/clusters-keeper/tests/unit/conftest.py b/services/clusters-keeper/tests/unit/conftest.py index a8f4913d4bb..43805123c30 100644 --- a/services/clusters-keeper/tests/unit/conftest.py +++ b/services/clusters-keeper/tests/unit/conftest.py @@ -22,11 +22,13 @@ from fastapi import FastAPI from models_library.users import UserID from models_library.wallets import WalletID +from pydantic import SecretStr from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from servicelib.rabbitmq import RabbitMQRPCClient from settings_library.ec2 import EC2Settings from settings_library.rabbit import RabbitSettings +from settings_library.ssm import SSMSettings from simcore_service_clusters_keeper.core.application import create_app from simcore_service_clusters_keeper.core.settings import ( CLUSTERS_KEEPER_ENV_PREFIX, @@ -86,6 +88,21 @@ def mocked_ec2_server_envs( return setenvs_from_dict(monkeypatch, changed_envs) +@pytest.fixture +def mocked_ssm_server_envs( + mocked_ssm_server_settings: SSMSettings, + monkeypatch: pytest.MonkeyPatch, +) -> EnvVarsDict: + # NOTE: overrides the SSMSettings with what clusters-keeper expects + changed_envs: EnvVarsDict = { + f"{CLUSTERS_KEEPER_ENV_PREFIX}{k}": ( + v.get_secret_value() if isinstance(v, SecretStr) else v + ) + for k, v in mocked_ssm_server_settings.dict().items() + } + return setenvs_from_dict(monkeypatch, changed_envs) + + @pytest.fixture def ec2_settings(mocked_ec2_server_settings: EC2Settings) -> EC2Settings: return mocked_ec2_server_settings @@ -105,6 +122,9 @@ def app_environment( "CLUSTERS_KEEPER_EC2_ACCESS": "{}", "CLUSTERS_KEEPER_EC2_ACCESS_KEY_ID": faker.pystr(), "CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY": faker.pystr(), + "CLUSTERS_KEEPER_SSM_ACCESS": "{}", + "CLUSTERS_KEEPER_SSM_ACCESS_KEY_ID": faker.pystr(), + "CLUSTERS_KEEPER_SSM_SECRET_ACCESS_KEY": faker.pystr(), "CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES": "{}", "CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX": faker.pystr(), "CLUSTERS_KEEPER_DASK_NTHREADS": f"{faker.pyint(min_value=0)}", @@ -206,6 +226,11 @@ def disabled_ec2(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): monkeypatch.setenv("CLUSTERS_KEEPER_EC2_ACCESS", "null") +@pytest.fixture +def disabled_ssm(app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch): + monkeypatch.setenv("CLUSTERS_KEEPER_SSM_ACCESS", "null") + + @pytest.fixture def enabled_rabbitmq( app_environment: EnvVarsDict, rabbit_service: RabbitSettings diff --git a/services/clusters-keeper/tests/unit/test_api_health.py b/services/clusters-keeper/tests/unit/test_api_health.py index 734620afa1b..5bf72ccae8e 100644 --- a/services/clusters-keeper/tests/unit/test_api_health.py +++ b/services/clusters-keeper/tests/unit/test_api_health.py @@ -21,6 +21,7 @@ def app_environment( app_environment: EnvVarsDict, enabled_rabbitmq: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, ) -> EnvVarsDict: return app_environment @@ -69,6 +70,9 @@ async def test_status( assert status_response.ec2.is_enabled is True assert status_response.ec2.is_responsive is False + assert status_response.ssm.is_enabled is True + assert status_response.ssm.is_responsive is False + # restart the server mocked_aws_server.start() @@ -83,3 +87,6 @@ async def test_status( assert status_response.ec2.is_enabled is True assert status_response.ec2.is_responsive is True + + assert status_response.ssm.is_enabled is True + assert status_response.ssm.is_responsive is True diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters.py b/services/clusters-keeper/tests/unit/test_modules_clusters.py index 16cfbde04b2..497b9e447e7 100644 --- a/services/clusters-keeper/tests/unit/test_modules_clusters.py +++ b/services/clusters-keeper/tests/unit/test_modules_clusters.py @@ -49,6 +49,7 @@ def _base_configuration( mocked_redis_server: None, mocked_ec2_server_envs: EnvVarsDict, mocked_primary_ec2_instances_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, ) -> None: ... diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py index 09720632fd4..438e69ee72e 100644 --- a/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py +++ b/services/clusters-keeper/tests/unit/test_modules_clusters_management_core.py @@ -60,6 +60,7 @@ def _base_configuration( mocked_redis_server: None, mocked_ec2_server_envs: EnvVarsDict, mocked_primary_ec2_instances_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, ) -> None: ... diff --git a/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py b/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py index 0c9c52eab4c..d22bdce1f76 100644 --- a/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py +++ b/services/clusters-keeper/tests/unit/test_modules_clusters_management_task.py @@ -37,6 +37,7 @@ def mock_background_task(mocker: MockerFixture) -> mock.Mock: async def test_clusters_management_task_created_and_deleted( disabled_rabbitmq: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, mock_background_task: mock.Mock, initialized_app: FastAPI, diff --git a/services/clusters-keeper/tests/unit/test_modules_ec2.py b/services/clusters-keeper/tests/unit/test_modules_ec2.py index 0820ada5818..439e54aaa2d 100644 --- a/services/clusters-keeper/tests/unit/test_modules_ec2.py +++ b/services/clusters-keeper/tests/unit/test_modules_ec2.py @@ -5,13 +5,16 @@ import pytest from fastapi import FastAPI +from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict from simcore_service_clusters_keeper.core.errors import ConfigurationError from simcore_service_clusters_keeper.modules.ec2 import get_ec2_client +from simcore_service_clusters_keeper.modules.ssm import get_ssm_client -async def test_ec2_does_not_initialize_if_deactivated( +async def test_ec2_does_not_initialize_if_ec2_deactivated( disabled_rabbitmq: None, disabled_ec2: None, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, initialized_app: FastAPI, ): @@ -19,3 +22,5 @@ async def test_ec2_does_not_initialize_if_deactivated( assert initialized_app.state.ec2_client is None with pytest.raises(ConfigurationError): get_ec2_client(initialized_app) + + assert get_ssm_client(initialized_app) diff --git a/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py b/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py index a2c23ac0602..1bbd5683c76 100644 --- a/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py +++ b/services/clusters-keeper/tests/unit/test_modules_rabbitmq.py @@ -43,8 +43,8 @@ def rabbit_log_message(faker: Faker) -> LoggerRabbitMessage: return LoggerRabbitMessage( user_id=faker.pyint(min_value=1), - project_id=faker.uuid4(), - node_id=faker.uuid4(), + project_id=faker.uuid4(cast_to=None), + node_id=faker.uuid4(cast_to=None), messages=faker.pylist(allowed_types=(str,)), ) @@ -62,6 +62,7 @@ def rabbit_message( def test_rabbitmq_does_not_initialize_if_deactivated( disabled_rabbitmq: None, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, ): @@ -78,6 +79,7 @@ def test_rabbitmq_does_not_initialize_if_deactivated( def test_rabbitmq_initializes( enabled_rabbitmq: RabbitSettings, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, ): @@ -95,6 +97,7 @@ def test_rabbitmq_initializes( async def test_post_message( enabled_rabbitmq: RabbitSettings, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, rabbit_message: RabbitMessageBase, @@ -124,6 +127,7 @@ async def test_post_message( async def test_post_message_with_disabled_rabbit_does_not_raise( disabled_rabbitmq: None, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, rabbit_message: RabbitMessageBase, @@ -135,6 +139,7 @@ async def test_post_message_when_rabbit_disconnected_does_not_raise( paused_container: Callable[[str], AbstractAsyncContextManager[None]], enabled_rabbitmq: RabbitSettings, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, rabbit_log_message: LoggerRabbitMessage, diff --git a/services/clusters-keeper/tests/unit/test_modules_redis.py b/services/clusters-keeper/tests/unit/test_modules_redis.py index f6b760f27fb..44fb9a9f6ac 100644 --- a/services/clusters-keeper/tests/unit/test_modules_redis.py +++ b/services/clusters-keeper/tests/unit/test_modules_redis.py @@ -10,6 +10,7 @@ async def test_redis_raises_if_missing( disabled_rabbitmq: None, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, ): diff --git a/services/clusters-keeper/tests/unit/test_modules_remote_debug.py b/services/clusters-keeper/tests/unit/test_modules_remote_debug.py index dbb5a91922e..3fe8b823d13 100644 --- a/services/clusters-keeper/tests/unit/test_modules_remote_debug.py +++ b/services/clusters-keeper/tests/unit/test_modules_remote_debug.py @@ -23,6 +23,7 @@ def app_environment( def test_application_with_debug_enabled( disabled_rabbitmq: None, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, initialized_app: FastAPI, ): diff --git a/services/clusters-keeper/tests/unit/test_modules_ssm.py b/services/clusters-keeper/tests/unit/test_modules_ssm.py new file mode 100644 index 00000000000..3bcffb72661 --- /dev/null +++ b/services/clusters-keeper/tests/unit/test_modules_ssm.py @@ -0,0 +1,22 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable + + +import pytest +from fastapi import FastAPI +from simcore_service_clusters_keeper.core.errors import ConfigurationError +from simcore_service_clusters_keeper.modules.ssm import get_ssm_client + + +async def test_ssm_does_not_initialize_if_ssm_deactivated( + disabled_rabbitmq: None, + disabled_ec2: None, + disabled_ssm: None, + mocked_redis_server: None, + initialized_app: FastAPI, +): + assert hasattr(initialized_app.state, "ssm_client") + assert initialized_app.state.ssm_client is None + with pytest.raises(ConfigurationError): + get_ssm_client(initialized_app) diff --git a/services/clusters-keeper/tests/unit/test_rpc_clusters.py b/services/clusters-keeper/tests/unit/test_rpc_clusters.py index 41146c827bd..a280cbb5338 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_clusters.py +++ b/services/clusters-keeper/tests/unit/test_rpc_clusters.py @@ -43,6 +43,7 @@ def _base_configuration( mocked_redis_server: None, mocked_ec2_server_envs: EnvVarsDict, mocked_primary_ec2_instances_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, initialized_app: FastAPI, ensure_run_in_sequence_context_is_empty: None, ) -> None: diff --git a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py index d03b6b74502..f4eea132cdf 100644 --- a/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py +++ b/services/clusters-keeper/tests/unit/test_rpc_ec2_instances.py @@ -24,6 +24,7 @@ def _base_configuration( enabled_rabbitmq: None, mocked_redis_server: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, initialized_app: FastAPI, ) -> None: ... diff --git a/services/clusters-keeper/tests/unit/test_utils_clusters.py b/services/clusters-keeper/tests/unit/test_utils_clusters.py index a6592ed1fa4..1c4a7760d5f 100644 --- a/services/clusters-keeper/tests/unit/test_utils_clusters.py +++ b/services/clusters-keeper/tests/unit/test_utils_clusters.py @@ -29,6 +29,7 @@ from simcore_service_clusters_keeper.utils.clusters import ( _prepare_environment_variables, create_cluster_from_ec2_instance, + create_deploy_cluster_stack_script, create_startup_script, ) from types_aiobotocore_ec2.literals import InstanceStateNameType @@ -51,16 +52,26 @@ def ec2_boot_specs(app_settings: ApplicationSettings) -> EC2InstanceBootSpecific return ec2_boot_specs +@pytest.fixture(params=[TLSAuthentication, NoAuthentication]) +def backend_cluster_auth( + request: pytest.FixtureRequest, +) -> InternalClusterAuthentication: + return request.param + + @pytest.fixture def app_environment( app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch, + backend_cluster_auth: InternalClusterAuthentication, ) -> EnvVarsDict: return app_environment | setenvs_from_dict( monkeypatch, { "CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DEFAULT_CLUSTER_AUTH": json_dumps( TLSAuthentication.Config.schema_extra["examples"][0] + if isinstance(backend_cluster_auth, TLSAuthentication) + else NoAuthentication.Config.schema_extra["examples"][0] ) }, ) @@ -69,38 +80,52 @@ def app_environment( def test_create_startup_script( disabled_rabbitmq: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, app_settings: ApplicationSettings, - cluster_machines_name_prefix: str, - clusters_keeper_docker_compose: dict[str, Any], ec2_boot_specs: EC2InstanceBootSpecific, ): - additional_custom_tags = { - AWSTagKey("pytest-tag-key"): AWSTagValue("pytest-tag-value") - } startup_script = create_startup_script( app_settings, - cluster_machines_name_prefix=cluster_machines_name_prefix, ec2_boot_specific=ec2_boot_specs, - additional_custom_tags=additional_custom_tags, ) assert isinstance(startup_script, str) assert len(ec2_boot_specs.custom_boot_scripts) > 0 for boot_script in ec2_boot_specs.custom_boot_scripts: assert boot_script in startup_script + + +def test_create_deploy_cluster_stack_script( + disabled_rabbitmq: None, + mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, + mocked_redis_server: None, + app_settings: ApplicationSettings, + cluster_machines_name_prefix: str, + clusters_keeper_docker_compose: dict[str, Any], +): + additional_custom_tags = { + AWSTagKey("pytest-tag-key"): AWSTagValue("pytest-tag-value") + } + deploy_script = create_deploy_cluster_stack_script( + app_settings, + cluster_machines_name_prefix=cluster_machines_name_prefix, + additional_custom_tags=additional_custom_tags, + ) + assert isinstance(deploy_script, str) # we have commands to pipe into a docker-compose file - assert " | base64 -d > /docker-compose.yml" in startup_script + assert " | base64 -d > /docker-compose.yml" in deploy_script # we have commands to init a docker-swarm - assert "docker swarm init" in startup_script + assert "docker swarm init --default-addr-pool" in deploy_script # we have commands to deploy a stack assert ( "docker stack deploy --with-registry-auth --compose-file=/docker-compose.yml dask_stack" - in startup_script + in deploy_script ) # before that we have commands that setup ENV variables, let's check we have all of them as defined in the docker-compose # let's get what was set in the startup script and compare with the expected one of the docker-compose startup_script_envs_definition = ( - startup_script.splitlines()[-1].split("docker stack deploy")[0].strip() + deploy_script.splitlines()[-1].split("docker stack deploy")[0].strip() ) assert startup_script_envs_definition # Use regular expression to split the string into key-value pairs (courtesy of chatGPT) @@ -137,7 +162,7 @@ def test_create_startup_script( "WORKERS_EC2_INSTANCES_SECURITY_GROUP_IDS", ] assert all( - re.search(rf"{i}=\[(\\\".+\\\")*\]", startup_script) for i in list_settings + re.search(rf"{i}=\[(\\\".+\\\")*\]", deploy_script) for i in list_settings ) # check dicts have \' in front @@ -146,35 +171,55 @@ def test_create_startup_script( "WORKERS_EC2_INSTANCES_CUSTOM_TAGS", ] assert all( - re.search(rf"{i}=\'{{(\".+\":\s\".*\")+}}\'", startup_script) + re.search(rf"{i}=\'{{(\".+\":\s\".*\")+}}\'", deploy_script) for i in dict_settings ) # check the additional tags are in assert all( - f'"{key}": "{value}"' in startup_script + f'"{key}": "{value}"' in deploy_script for key, value in additional_custom_tags.items() ) -def test_create_startup_script_script_size_below_16kb( +def test_create_deploy_cluster_stack_script_below_64kb( disabled_rabbitmq: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, app_settings: ApplicationSettings, cluster_machines_name_prefix: str, clusters_keeper_docker_compose: dict[str, Any], - ec2_boot_specs: EC2InstanceBootSpecific, ): additional_custom_tags = { AWSTagKey("pytest-tag-key"): AWSTagValue("pytest-tag-value") } - startup_script = create_startup_script( + deploy_script = create_deploy_cluster_stack_script( app_settings, cluster_machines_name_prefix=cluster_machines_name_prefix, - ec2_boot_specific=ec2_boot_specs, additional_custom_tags=additional_custom_tags, ) + deploy_script_size_in_bytes = len(deploy_script.encode("utf-8")) + assert deploy_script_size_in_bytes < 64000, ( + f"script size is {deploy_script_size_in_bytes} bytes that exceeds the SSM command of 64KB. " + "TIP: split commands or reduce size." + ) + + +def test_create_startup_script_script_size_below_16kb( + disabled_rabbitmq: None, + mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, + mocked_redis_server: None, + app_settings: ApplicationSettings, + cluster_machines_name_prefix: str, + clusters_keeper_docker_compose: dict[str, Any], + ec2_boot_specs: EC2InstanceBootSpecific, +): + startup_script = create_startup_script( + app_settings, + ec2_boot_specific=ec2_boot_specs, + ) script_size_in_bytes = len(startup_script.encode("utf-8")) print( @@ -184,13 +229,13 @@ def test_create_startup_script_script_size_below_16kb( assert script_size_in_bytes < 15 * 1024 -def test_startup_script_defines_all_envs_for_docker_compose( +def test__prepare_environment_variables_defines_all_envs_for_docker_compose( disabled_rabbitmq: None, mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, mocked_redis_server: None, app_settings: ApplicationSettings, cluster_machines_name_prefix: str, - ec2_boot_specs: EC2InstanceBootSpecific, clusters_keeper_docker_compose_file: Path, ): additional_custom_tags = { @@ -202,8 +247,8 @@ def test_startup_script_defines_all_envs_for_docker_compose( additional_custom_tags=additional_custom_tags, ) assert environment_variables - process = subprocess.run( - [ # noqa: S603, S607 + process = subprocess.run( # noqa: S603 + [ # noqa: S607 "docker", "compose", "--dry-run", diff --git a/services/clusters-keeper/tests/unit/test_utils_ec2.py b/services/clusters-keeper/tests/unit/test_utils_ec2.py index cc466d113ac..125670475db 100644 --- a/services/clusters-keeper/tests/unit/test_utils_ec2.py +++ b/services/clusters-keeper/tests/unit/test_utils_ec2.py @@ -25,6 +25,7 @@ def wallet_id(faker: Faker) -> WalletID: def test_get_cluster_name( disabled_rabbitmq: None, disabled_ec2: None, + disabled_ssm: None, mocked_redis_server: None, app_settings: ApplicationSettings, user_id: UserID, @@ -46,9 +47,21 @@ def test_get_cluster_name( == f"{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX}osparc-computational-cluster-worker-{app_settings.SWARM_STACK_NAME}-user_id:{user_id}-wallet_id:{wallet_id}" ) + assert ( + get_cluster_name(app_settings, user_id=user_id, wallet_id=None, is_manager=True) + == f"{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX}osparc-computational-cluster-manager-{app_settings.SWARM_STACK_NAME}-user_id:{user_id}-wallet_id:None" + ) + assert ( + get_cluster_name( + app_settings, user_id=user_id, wallet_id=None, is_manager=False + ) + == f"{app_settings.CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX}osparc-computational-cluster-worker-{app_settings.SWARM_STACK_NAME}-user_id:{user_id}-wallet_id:None" + ) + def test_creation_ec2_tags( mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, disabled_rabbitmq: None, mocked_redis_server: None, app_settings: ApplicationSettings, @@ -78,6 +91,7 @@ def test_creation_ec2_tags( def test_all_created_ec2_instances_filter( mocked_ec2_server_envs: EnvVarsDict, + mocked_ssm_server_envs: EnvVarsDict, disabled_rabbitmq: None, mocked_redis_server: None, app_settings: ApplicationSettings, diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 60ce2c26b17..af73de611b4 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -189,6 +189,11 @@ services: CLUSTERS_KEEPER_EC2_ENDPOINT: ${CLUSTERS_KEEPER_EC2_ENDPOINT} CLUSTERS_KEEPER_EC2_REGION_NAME: ${CLUSTERS_KEEPER_EC2_REGION_NAME} CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY: ${CLUSTERS_KEEPER_EC2_SECRET_ACCESS_KEY} + CLUSTERS_KEEPER_SSM_ACCESS: ${CLUSTERS_KEEPER_SSM_ACCESS} + CLUSTERS_KEEPER_SSM_ACCESS_KEY_ID: ${CLUSTERS_KEEPER_SSM_ACCESS_KEY_ID} + CLUSTERS_KEEPER_SSM_ENDPOINT: ${CLUSTERS_KEEPER_SSM_ENDPOINT} + CLUSTERS_KEEPER_SSM_REGION_NAME: ${CLUSTERS_KEEPER_SSM_REGION_NAME} + CLUSTERS_KEEPER_SSM_SECRET_ACCESS_KEY: ${CLUSTERS_KEEPER_SSM_SECRET_ACCESS_KEY} CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX: ${CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX} LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED} CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES: ${CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES} @@ -204,6 +209,8 @@ services: PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY: ${PRIMARY_EC2_INSTANCES_SSM_TLS_DASK_KEY} PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME: ${PRIMARY_EC2_INSTANCES_PROMETHEUS_USERNAME} PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD: ${PRIMARY_EC2_INSTANCES_PROMETHEUS_PASSWORD} + PRIMARY_EC2_INSTANCES_MAX_START_TIME: ${PRIMARY_EC2_INSTANCES_MAX_START_TIME} + PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL: ${PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL} RABBIT_HOST: ${RABBIT_HOST} RABBIT_PASSWORD: ${RABBIT_PASSWORD} RABBIT_PORT: ${RABBIT_PORT} diff --git a/tests/e2e-playwright/tests/conftest.py b/tests/e2e-playwright/tests/conftest.py index d7104c6fe70..997ac6b7138 100644 --- a/tests/e2e-playwright/tests/conftest.py +++ b/tests/e2e-playwright/tests/conftest.py @@ -25,8 +25,8 @@ from pytest import Item from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.playwright import ( - SECOND, MINUTE, + SECOND, AutoRegisteredUser, RunningState, ServiceType, @@ -34,6 +34,7 @@ SocketIOProjectClosedWaiter, SocketIOProjectStateUpdatedWaiter, decode_socketio_42_message, + web_socket_default_log_handler, ) _PROJECT_CLOSING_TIMEOUT: Final[int] = 10 * MINUTE @@ -171,9 +172,11 @@ def pytest_runtest_makereport(item: Item, call): diagnostics["duration"] = str(end_time - start_time) # Print the diagnostics report - print(f"\nDiagnostics repoort for {test_name} ---") - print(json.dumps(diagnostics, indent=2)) - print("---") + with log_context( + logging.WARNING, + f"ℹ️ Diagnostics report for {test_name} ---", # noqa: RUF001 + ) as ctx: + ctx.logger.warning(json.dumps(diagnostics, indent=2)) @pytest.hookimpl(tryfirst=True) @@ -369,7 +372,8 @@ def log_in_and_out( if quickStartWindowCloseBtnLocator.is_visible(): quickStartWindowCloseBtnLocator.click() - yield ws + with web_socket_default_log_handler(ws): + yield ws with log_context( logging.INFO, @@ -410,12 +414,17 @@ def _( f"Open project in {product_url=} as {product_billable=}", ) as ctx: waiter = SocketIOProjectStateUpdatedWaiter(expected_states=expected_states) - timeout = _OPENING_TUTORIAL_MAX_WAIT_TIME if template_id is not None else _OPENING_NEW_EMPTY_PROJECT_MAX_WAIT_TIME + timeout = ( + _OPENING_TUTORIAL_MAX_WAIT_TIME + if template_id is not None + else _OPENING_NEW_EMPTY_PROJECT_MAX_WAIT_TIME + ) with ( - log_in_and_out.expect_event("framereceived", waiter, timeout=timeout + 10 * SECOND), + log_in_and_out.expect_event( + "framereceived", waiter, timeout=timeout + 10 * SECOND + ), page.expect_response( - re.compile(r"/projects/[^:]+:open"), - timeout=timeout + 5 * SECOND + re.compile(r"/projects/[^:]+:open"), timeout=timeout + 5 * SECOND ) as response_info, ): # Project detail view pop-ups shows @@ -436,8 +445,11 @@ def _( # From the long running tasks response's urls, only their path is relevant def url_to_path(url): return urllib.parse.urlparse(url).path + def wait_for_done(response): - if url_to_path(response.url) == url_to_path(lrt_data["status_href"]): + if url_to_path(response.url) == url_to_path( + lrt_data["status_href"] + ): resp_data = response.json() resp_data = resp_data["data"] assert "task_progress" in resp_data @@ -448,10 +460,13 @@ def wait_for_done(response): task_progress["message"], ) return False - if url_to_path(response.url) == url_to_path(lrt_data["result_href"]): + if url_to_path(response.url) == url_to_path( + lrt_data["result_href"] + ): copying_logger.logger.info("project created") return response.status == 201 return False + with page.expect_response(wait_for_done, timeout=timeout): # if the above calls go to fast, this test could fail # not expected in the sim4life context though diff --git a/tests/e2e-playwright/tests/sim4life/test_sim4life.py b/tests/e2e-playwright/tests/sim4life/test_sim4life.py index 23778f3f3f5..b993f262181 100644 --- a/tests/e2e-playwright/tests/sim4life/test_sim4life.py +++ b/tests/e2e-playwright/tests/sim4life/test_sim4life.py @@ -11,7 +11,10 @@ from typing import Any from playwright.sync_api import Page, WebSocket -from pytest_simcore.helpers.playwright import ServiceType +from pytest_simcore.helpers.playwright import ( + ServiceType, + web_socket_default_log_handler, +) from pytest_simcore.helpers.playwright_sim4life import ( check_video_streaming, interact_with_s4l, @@ -49,8 +52,9 @@ def test_sim4life( page, node_ids[0], log_in_and_out, autoscaled=autoscaled, copy_workspace=False ) s4l_websocket = resp["websocket"] - s4l_iframe = resp["iframe"] - interact_with_s4l(page, s4l_iframe) + with web_socket_default_log_handler(s4l_websocket): + s4l_iframe = resp["iframe"] + interact_with_s4l(page, s4l_iframe) - if check_videostreaming: - check_video_streaming(page, s4l_iframe, s4l_websocket) + if check_videostreaming: + check_video_streaming(page, s4l_iframe, s4l_websocket) diff --git a/tests/e2e-playwright/tests/sim4life/test_template.py b/tests/e2e-playwright/tests/sim4life/test_template.py index 9ac5d4ae065..a4f104a6291 100644 --- a/tests/e2e-playwright/tests/sim4life/test_template.py +++ b/tests/e2e-playwright/tests/sim4life/test_template.py @@ -11,6 +11,7 @@ from typing import Any from playwright.sync_api import Page, WebSocket +from pytest_simcore.helpers.playwright import web_socket_default_log_handler from pytest_simcore.helpers.playwright_sim4life import ( check_video_streaming, interact_with_s4l, @@ -39,8 +40,9 @@ def test_template( page, node_ids[0], log_in_and_out, autoscaled=autoscaled, copy_workspace=True ) s4l_websocket = resp["websocket"] - s4l_iframe = resp["iframe"] - interact_with_s4l(page, s4l_iframe) + with web_socket_default_log_handler(s4l_websocket): + s4l_iframe = resp["iframe"] + interact_with_s4l(page, s4l_iframe) - if check_videostreaming: - check_video_streaming(page, s4l_iframe, s4l_websocket) + if check_videostreaming: + check_video_streaming(page, s4l_iframe, s4l_websocket)