Skip to content

Commit

Permalink
Merge branch 'master' into fix/leave-study
Browse files Browse the repository at this point in the history
  • Loading branch information
odeimaiz authored Sep 20, 2024
2 parents b193b8d + 2990728 commit 13f1f9d
Show file tree
Hide file tree
Showing 34 changed files with 511 additions and 124 deletions.
1 change: 1 addition & 0 deletions .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG=master-github-latest
CLUSTERS_KEEPER_DASK_NTHREADS=0
CLUSTERS_KEEPER_DASK_WORKER_SATURATION=inf
CLUSTERS_KEEPER_EC2_ACCESS=null
CLUSTERS_KEEPER_SSM_ACCESS=null
CLUSTERS_KEEPER_EC2_INSTANCES_PREFIX=""
CLUSTERS_KEEPER_LOGLEVEL=WARNING
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION=5
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ jobs:
unit-test-autoscaling:
needs: changes
if: ${{ needs.changes.outputs.autoscaling == 'true' || github.event_name == 'push' }}
timeout-minutes: 19 # if this timeout gets too small, then split the tests
timeout-minutes: 22 # temporary: mypy takes a huge amount of time to run here, maybe we should cache it
name: "[unit] autoscaling"
runs-on: ${{ matrix.os }}
strategy:
Expand Down
3 changes: 3 additions & 0 deletions packages/models-library/src/models_library/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class Config(BaseAuthentication.Config):
class NoAuthentication(BaseAuthentication):
    """Authentication variant meaning "no authentication".

    The ``type`` field is pinned to the literal string ``"none"``; presumably it
    acts as the discriminator that distinguishes this variant from its siblings
    (e.g. ``TLSAuthentication`` with ``type="tls"``) — TODO confirm against the
    union where these models are used.
    """

    type: Literal["none"] = "none"

    class Config(BaseAuthentication.Config):
        # Example payload exposed through the generated JSON schema (pydantic v1
        # `schema_extra` convention, matching the sibling authentication models).
        schema_extra: ClassVar[dict[str, Any]] = {"examples": [{"type": "none"}]}


class TLSAuthentication(BaseAuthentication):
type: Literal["tls"] = "tls"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ def log_context(
else:
ctx_msg = msg

started_time = datetime.datetime.now(tz=datetime.timezone.utc)
started_time = datetime.datetime.now(tz=datetime.UTC)
try:
DynamicIndentFormatter.cls_increase_indent()

logger.log(level, ctx_msg.starting, *args, **kwargs)
with _increased_logger_indent(logger):
yield SimpleNamespace(logger=logger, messages=ctx_msg)
elapsed_time = datetime.datetime.now(tz=datetime.timezone.utc) - started_time
elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time
done_message = (
f"{ctx_msg.done} ({_timedelta_as_minute_second_ms(elapsed_time)})"
)
Expand All @@ -152,7 +152,7 @@ def log_context(
)

except:
elapsed_time = datetime.datetime.now(tz=datetime.timezone.utc) - started_time
elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time
error_message = (
f"{ctx_msg.raised} ({_timedelta_as_minute_second_ms(elapsed_time)})"
)
Expand Down
55 changes: 32 additions & 23 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
import re
from collections import defaultdict
from contextlib import ExitStack
from collections.abc import Generator, Iterator
from dataclasses import dataclass, field
from enum import Enum, unique
from typing import Any, Final, Generator
from typing import Any, Final

from playwright.sync_api import FrameLocator, Page, Request, WebSocket
from pytest_simcore.helpers.logging_tools import log_context
Expand Down Expand Up @@ -263,28 +263,37 @@ def wait_for_pipeline_state(
return current_state


def on_web_socket_default_handler(ws) -> None:
"""Usage
from pytest_simcore.playwright_utils import on_web_socket_default_handler
page.on("websocket", on_web_socket_default_handler)
"""
stack = ExitStack()
ctx = stack.enter_context(
log_context(
logging.INFO,
(
f"WebSocket opened: {ws.url}",
"WebSocket closed",
),
)
)
@contextlib.contextmanager
def web_socket_default_log_handler(web_socket: WebSocket) -> Iterator[None]:

ws.on("framesent", lambda payload: ctx.logger.info("⬇️ %s", payload))
ws.on("framereceived", lambda payload: ctx.logger.info("⬆️ %s", payload))
ws.on("close", lambda payload: stack.close()) # noqa: ARG005
try:
with log_context(
logging.DEBUG,
msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
) as ctx:

def on_framesent(payload: str | bytes) -> None:
ctx.logger.debug("⬇️ Frame sent: %s", payload)

def on_framereceived(payload: str | bytes) -> None:
ctx.logger.debug("⬆️ Frame received: %s", payload)

def on_close(payload: WebSocket) -> None:
ctx.logger.warning("⚠️ Websocket closed: %s", payload)

def on_socketerror(error_msg: str) -> None:
ctx.logger.error("❌ Websocket error: %s", error_msg)

web_socket.on("framesent", on_framesent)
web_socket.on("framereceived", on_framereceived)
web_socket.on("close", on_close)
web_socket.on("socketerror", on_socketerror)
yield
finally:
web_socket.remove_listener("framesent", on_framesent)
web_socket.remove_listener("framereceived", on_framereceived)
web_socket.remove_listener("close", on_close)
web_socket.remove_listener("socketerror", on_socketerror)


def _node_started_predicate(request: Request) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import arrow
from playwright.sync_api import FrameLocator, Page, WebSocket, expect
from pydantic import TypeAdapter # pylint: disable=no-name-in-module
from pydantic import ByteSize

from .logging_tools import log_context
from .playwright import (
Expand All @@ -17,7 +19,7 @@
wait_for_service_running,
)

_S4L_STREAMING_ESTABLISHMENT_MAX_TIME: Final[int] = 15 * SECOND
_S4L_STREAMING_ESTABLISHMENT_MAX_TIME: Final[int] = 30 * SECOND
_S4L_SOCKETIO_REGEX: Final[re.Pattern] = re.compile(
r"^(?P<protocol>[^:]+)://(?P<node_id>[^\.]+)\.services\.(?P<hostname>[^\/]+)\/socket\.io\/.+$"
)
Expand Down Expand Up @@ -63,7 +65,7 @@ def __call__(self, message: str) -> bool:
self._initial_bit_rate_time = arrow.utcnow().datetime
self.logger.info(
"%s",
f"{self._initial_bit_rate=} at {self._initial_bit_rate_time.isoformat()}",
f"{TypeAdapter(ByteSize).validate_python(self._initial_bit_rate).human_readable()}/s at {self._initial_bit_rate_time.isoformat()}",
)
return False

Expand All @@ -78,7 +80,7 @@ def __call__(self, message: str) -> bool:
bitrate_test = bool(self._initial_bit_rate != current_bitrate)
self.logger.info(
"%s",
f"{current_bitrate=} after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}",
f"{TypeAdapter(ByteSize).validate_python(current_bitrate).human_readable()}/s after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}",
)
return bitrate_test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@router.get("/", include_in_schema=True, response_class=PlainTextResponse)
async def health_check():
# NOTE: sync url in docker/healthcheck.py with this entrypoint!
return f"{__name__}.health_check@{datetime.datetime.now(datetime.timezone.utc).isoformat()}"
return f"{__name__}.health_check@{datetime.datetime.now(datetime.UTC).isoformat()}"


class _ComponentStatus(BaseModel):
Expand All @@ -33,25 +33,34 @@ class _StatusGet(BaseModel):
rabbitmq: _ComponentStatus
ec2: _ComponentStatus
redis_client_sdk: _ComponentStatus
ssm: _ComponentStatus


@router.get("/status", include_in_schema=True, response_model=_StatusGet)
async def get_status(app: Annotated[FastAPI, Depends(get_app)]) -> _StatusGet:
return _StatusGet(
rabbitmq=_ComponentStatus(
is_enabled=is_rabbitmq_enabled(app),
is_responsive=await get_rabbitmq_client(app).ping()
if is_rabbitmq_enabled(app)
else False,
is_responsive=(
await get_rabbitmq_client(app).ping()
if is_rabbitmq_enabled(app)
else False
),
),
ec2=_ComponentStatus(
is_enabled=bool(app.state.ec2_client),
is_responsive=await app.state.ec2_client.ping()
if app.state.ec2_client
else False,
is_responsive=(
await app.state.ec2_client.ping() if app.state.ec2_client else False
),
),
redis_client_sdk=_ComponentStatus(
is_enabled=bool(app.state.redis_client_sdk),
is_responsive=await get_redis_client(app).ping(),
),
ssm=_ComponentStatus(
is_enabled=(app.state.ssm_client is not None),
is_responsive=(
await app.state.ssm_client.ping() if app.state.ssm_client else False
),
),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from typing import Final

from aws_library.ec2._models import AWSTagKey, AWSTagValue
from pydantic import parse_obj_as

# Display name for the command that deploys the docker stack on a private
# cluster machine (used where the command is issued/reported).
DOCKER_STACK_DEPLOY_COMMAND_NAME: Final[str] = "private cluster docker deploy"
# EC2 tag key marking instances on which the docker-stack-deploy command
# was run; namespaced to avoid colliding with user-defined tags.
DOCKER_STACK_DEPLOY_COMMAND_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
    AWSTagKey, "io.simcore.clusters-keeper.private_cluster_docker_deploy"
)

# Tag keys attached to cluster EC2 instances to record ownership and role.
# NOTE: values are validated through pydantic's parse_obj_as so malformed
# tag strings fail at import time rather than at AWS call time.
USER_ID_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "user_id")
WALLET_ID_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "wallet_id")
ROLE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(AWSTagKey, "role")
# Values for ROLE_TAG_KEY distinguishing swarm worker vs. manager machines.
WORKER_ROLE_TAG_VALUE: Final[AWSTagValue] = parse_obj_as(AWSTagValue, "worker")
MANAGER_ROLE_TAG_VALUE: Final[AWSTagValue] = parse_obj_as(AWSTagValue, "manager")
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ..modules.ec2 import setup as setup_ec2
from ..modules.rabbitmq import setup as setup_rabbitmq
from ..modules.redis import setup as setup_redis
from ..modules.ssm import setup as setup_ssm
from ..rpc.rpc_routes import setup_rpc_routes
from .settings import ApplicationSettings

Expand Down Expand Up @@ -55,6 +56,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
setup_rabbitmq(app)
setup_rpc_routes(app)
setup_ec2(app)
setup_ssm(app)
setup_redis(app)
setup_clusters_management(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from settings_library.ec2 import EC2Settings
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from settings_library.ssm import SSMSettings
from settings_library.tracing import TracingSettings
from settings_library.utils_logging import MixinLoggingSettings
from types_aiobotocore_ec2.literals import InstanceTypeType
Expand All @@ -50,6 +51,21 @@ class Config(EC2Settings.Config):
}


class ClustersKeeperSSMSettings(SSMSettings):
    """SSM access settings specialized for the clusters-keeper service.

    Identical to ``SSMSettings`` except that every environment variable is
    read with the ``CLUSTERS_KEEPER_`` prefix (same pattern as the
    ``ClustersKeeperEC2Settings`` sibling).
    """

    class Config(SSMSettings.Config):
        # pydantic v1: prepend the service prefix to every env var name.
        env_prefix = CLUSTERS_KEEPER_ENV_PREFIX

        # Rebuild the base-class schema examples with the prefix applied to
        # each key, so generated docs show the variable names this service
        # actually reads.
        schema_extra: ClassVar[dict[str, Any]] = {  # type: ignore[misc]
            "examples": [
                {
                    f"{CLUSTERS_KEEPER_ENV_PREFIX}{key}": var
                    for key, var in example.items()
                }
                for example in SSMSettings.Config.schema_extra["examples"]
            ],
        }


class WorkersEC2InstancesSettings(BaseCustomSettings):
WORKERS_EC2_INSTANCES_ALLOWED_TYPES: dict[str, EC2InstanceBootSpecific] = Field(
...,
Expand Down Expand Up @@ -183,6 +199,12 @@ class PrimaryEC2InstancesSettings(BaseCustomSettings):
"that take longer than this time will be terminated as sometimes it happens that EC2 machine fail on start.",
)

PRIMARY_EC2_INSTANCES_DOCKER_DEFAULT_ADDRESS_POOL: str = Field(
default="172.20.0.0/14",
description="defines the docker swarm default address pool in CIDR format "
"(see https://docs.docker.com/reference/cli/docker/swarm/init/)",
)

@validator("PRIMARY_EC2_INSTANCES_ALLOWED_TYPES")
@classmethod
def check_valid_instance_names(
Expand Down Expand Up @@ -250,6 +272,10 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
auto_default_from_env=True
)

CLUSTERS_KEEPER_SSM_ACCESS: ClustersKeeperSSMSettings | None = Field(
auto_default_from_env=True
)

CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES: PrimaryEC2InstancesSettings | None = Field(
auto_default_from_env=True
)
Expand Down Expand Up @@ -285,9 +311,11 @@ class ApplicationSettings(BaseCustomSettings, MixinLoggingSettings):
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
)

CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = Field(
default=5,
description="Max number of missed heartbeats before a cluster is terminated",
CLUSTERS_KEEPER_MAX_MISSED_HEARTBEATS_BEFORE_CLUSTER_TERMINATION: NonNegativeInt = (
Field(
default=5,
description="Max number of missed heartbeats before a cluster is terminated",
)
)

CLUSTERS_KEEPER_COMPUTATIONAL_BACKEND_DOCKER_IMAGE_TAG: str = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def _get_primary_ec2_params(
ec2_instance_types: list[
EC2InstanceType
] = await ec2_client.get_ec2_instance_capabilities(
instance_type_names=[ec2_type_name]
instance_type_names={ec2_type_name}
)
assert ec2_instance_types # nosec
assert len(ec2_instance_types) == 1 # nosec
Expand All @@ -72,15 +72,7 @@ async def create_cluster(
tags=creation_ec2_tags(app_settings, user_id=user_id, wallet_id=wallet_id),
startup_script=create_startup_script(
app_settings,
cluster_machines_name_prefix=get_cluster_name(
app_settings, user_id=user_id, wallet_id=wallet_id, is_manager=False
),
ec2_boot_specific=ec2_instance_boot_specs,
additional_custom_tags={
AWSTagKey("user_id"): AWSTagValue(f"{user_id}"),
AWSTagKey("wallet_id"): AWSTagValue(f"{wallet_id}"),
AWSTagKey("role"): AWSTagValue("worker"),
},
),
ami_id=ec2_instance_boot_specs.ami_id,
key_name=app_settings.CLUSTERS_KEEPER_PRIMARY_EC2_INSTANCES.PRIMARY_EC2_INSTANCES_KEY_NAME,
Expand Down
Loading

0 comments on commit 13f1f9d

Please sign in to comment.