Skip to content

Commit

Permalink
Merge pull request #6780 from sanderegg/update-master-XXXXX
Browse files Browse the repository at this point in the history
⬆️Pydantic V2: Update from upstream/master
  • Loading branch information
sanderegg authored Nov 20, 2024
2 parents c38b064 + 150b17a commit 17d1702
Show file tree
Hide file tree
Showing 45 changed files with 972 additions and 386 deletions.
1 change: 1 addition & 0 deletions ci/github/system-testing/e2e-playwright.bash
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ test() {
source .venv/bin/activate
pushd tests/e2e-playwright
make test-sleepers
make test-platform
popd
}

Expand Down
152 changes: 111 additions & 41 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@
import json
import logging
import re
import typing
from collections import defaultdict
from collections.abc import Generator, Iterator
from collections.abc import Generator
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum, unique
from typing import Any, Final

import httpx
from playwright._impl._sync_base import EventContextManager
from playwright.sync_api import FrameLocator, Page, Request
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import WebSocket
from pydantic import AnyUrl
from pytest_simcore.helpers.logging_tools import log_context

from .logging_tools import log_context

_logger = logging.getLogger(__name__)


SECOND: Final[int] = 1000
MINUTE: Final[int] = 60 * SECOND
Expand Down Expand Up @@ -106,6 +112,94 @@ class SocketIOEvent:
SOCKETIO_MESSAGE_PREFIX: Final[str] = "42"


@dataclass
class RestartableWebSocket:
    """Wrapper around a playwright ``WebSocket`` that reconnects on close.

    When the underlying websocket closes, the ``close`` handler grabs the next
    websocket opened by ``page`` and re-attaches logging/reconnect listeners on
    it, plus any custom listeners previously registered via ``expect_event``.
    """

    page: Page
    ws: WebSocket
    # custom (event, predicate) pairs registered through expect_event(),
    # re-applied on the new websocket after a reconnection
    _registered_events: list[tuple[str, typing.Callable | None]] = field(
        default_factory=list
    )
    # number of successful reconnections (diagnostics only)
    _number_of_restarts: int = 0

    def __post_init__(self):
        self._configure_websocket_events()

    def _configure_websocket_events(self):
        """Attach logging and auto-reconnect listeners to the current ``self.ws``."""
        with log_context(
            logging.DEBUG,
            msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
        ) as ctx:

            def on_framesent(payload: str | bytes) -> None:
                ctx.logger.debug("⬇️ Frame sent: %s", payload)

            def on_framereceived(payload: str | bytes) -> None:
                ctx.logger.debug("⬆️ Frame received: %s", payload)

            def on_close(_: WebSocket) -> None:
                ctx.logger.warning(
                    "⚠️ WebSocket closed. Attempting to reconnect..."
                )
                self._attempt_reconnect(ctx.logger)

            def on_socketerror(error_msg: str) -> None:
                ctx.logger.error("❌ WebSocket error: %s", error_msg)

            # Attach core event listeners.
            # NOTE: they are deliberately kept attached for the lifetime of
            # self.ws — the previous version removed them in a ``finally``
            # immediately after attaching (the ``with`` body runs to completion
            # first), which disabled both frame logging and, critically, the
            # ``close`` handler that drives the auto-reconnect.
            self.ws.on("framesent", on_framesent)
            self.ws.on("framereceived", on_framereceived)
            self.ws.on("close", on_close)
            self.ws.on("socketerror", on_socketerror)

    def _attempt_reconnect(self, logger: logging.Logger) -> None:
        """
        Attempt to reconnect the WebSocket and restore event listeners.
        """
        try:
            with self.page.expect_websocket() as ws_info:
                assert not ws_info.value.is_closed()

            self.ws = ws_info.value
            self._number_of_restarts += 1
            logger.info(
                "🔄 Reconnected to WebSocket successfully. Number of reconnections: %s",
                self._number_of_restarts,
            )
            self._configure_websocket_events()
            # Re-register all custom event listeners.
            # NOTE(review): expect_event() returns an EventContextManager that
            # only waits once entered; calling it here without ``with`` may not
            # actually re-arm the waiters — confirm against playwright docs.
            for event, predicate in self._registered_events:
                self.ws.expect_event(event, predicate)

        except Exception as e:  # pylint: disable=broad-except
            # best-effort: a failed reconnection is logged, never raised,
            # so the test can fail later on its own timeout instead
            logger.error("🚨 Failed to reconnect WebSocket: %s", e)

    def expect_event(
        self,
        event: str,
        predicate: typing.Callable | None = None,
        *,
        timeout: float | None = None,
    ) -> EventContextManager:
        """
        Register an event listener with support for reconnection.

        NOTE: every call appends to ``_registered_events`` and entries are
        never removed, so repeated calls accumulate listeners to re-arm.
        """
        output = self.ws.expect_event(event, predicate, timeout=timeout)
        self._registered_events.append((event, predicate))
        return output

    @classmethod
    def create(cls, page: Page, ws: WebSocket):
        """Alternate constructor mirroring the dataclass signature."""
        return cls(page, ws)


def decode_socketio_42_message(message: str) -> SocketIOEvent:
    """Decode a raw socket.io text frame (``"42[...]"``) into a :class:`SocketIOEvent`.

    The ``"42"`` message prefix is stripped (if present) and the remaining JSON
    array is interpreted as ``[event_name, event_payload, ...]``.
    """
    raw_json = message.removeprefix(SOCKETIO_MESSAGE_PREFIX)
    decoded = json.loads(raw_json)
    return SocketIOEvent(name=decoded[0], obj=decoded[1])
Expand Down Expand Up @@ -245,9 +339,14 @@ def __call__(self, message: str) -> bool:
url = f"https://{self.node_id}.services.{self.get_partial_product_url()}"
response = httpx.get(url, timeout=10)
self.logger.info(
"Querying the service endpoint from the E2E test. Url: %s Response: %s",
"Querying the service endpoint from the E2E test. Url: %s Response: %s TIP: %s",
url,
response,
(
"Response 401 is OK. It means that service is ready."
if response.status_code == 401
else "We are emulating the frontend; a 500 response is acceptable if the service is not yet ready."
),
)
if response.status_code <= 401:
# NOTE: If the response status is less than 400, it means that the backend is ready (There are some services that respond with a 3XX)
Expand Down Expand Up @@ -278,7 +377,7 @@ def get_partial_product_url(self):
def wait_for_pipeline_state(
current_state: RunningState,
*,
websocket: WebSocket,
websocket: RestartableWebSocket,
if_in_states: tuple[RunningState, ...],
expected_states: tuple[RunningState, ...],
timeout_ms: int,
Expand All @@ -301,39 +400,6 @@ def wait_for_pipeline_state(
return current_state


@contextlib.contextmanager
def web_socket_default_log_handler(web_socket: WebSocket) -> Iterator[None]:
    """Attach debug-logging listeners to *web_socket* for the duration of the context.

    Listeners log sent/received frames at DEBUG level and close/error events at
    WARNING/ERROR; they are detached on exit.

    Fix over previous version: the old ``try``/``finally`` wrapped the whole
    setup, so if ``log_context`` failed on entry the ``finally`` raised
    ``NameError`` on the not-yet-defined handlers. Now detach only happens
    once the handlers are defined and attached.
    """
    with log_context(
        logging.DEBUG,
        msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
    ) as ctx:

        def on_framesent(payload: str | bytes) -> None:
            ctx.logger.debug("⬇️ Frame sent: %s", payload)

        def on_framereceived(payload: str | bytes) -> None:
            ctx.logger.debug("⬆️ Frame received: %s", payload)

        def on_close(payload: WebSocket) -> None:
            ctx.logger.warning("⚠️ Websocket closed: %s", payload)

        def on_socketerror(error_msg: str) -> None:
            ctx.logger.error("❌ Websocket error: %s", error_msg)

        web_socket.on("framesent", on_framesent)
        web_socket.on("framereceived", on_framereceived)
        web_socket.on("close", on_close)
        web_socket.on("socketerror", on_socketerror)
        try:
            yield
        finally:
            web_socket.remove_listener("framesent", on_framesent)
            web_socket.remove_listener("framereceived", on_framereceived)
            web_socket.remove_listener("close", on_close)
            web_socket.remove_listener("socketerror", on_socketerror)


def _node_started_predicate(request: Request) -> bool:
return bool(
re.search(NODE_START_REQUEST_PATTERN, request.url)
Expand All @@ -358,12 +424,14 @@ def expected_service_running(
*,
page: Page,
node_id: str,
websocket: WebSocket,
websocket: RestartableWebSocket,
timeout: int,
press_start_button: bool,
product_url: AnyUrl,
) -> Generator[ServiceRunning, None, None]:
with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id, logger=ctx.logger, product_url=product_url
)
Expand Down Expand Up @@ -395,15 +463,17 @@ def wait_for_service_running(
*,
page: Page,
node_id: str,
websocket: WebSocket,
websocket: RestartableWebSocket,
timeout: int,
press_start_button: bool,
product_url: AnyUrl,
) -> FrameLocator:
"""NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again
In which case this will need further adjutment"""

with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id, logger=ctx.logger, product_url=product_url
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MINUTE,
SECOND,
SOCKETIO_MESSAGE_PREFIX,
RestartableWebSocket,
SocketIOEvent,
decode_socketio_42_message,
wait_for_service_running,
Expand Down Expand Up @@ -100,7 +101,7 @@ class WaitForS4LDict(TypedDict):
def wait_for_launched_s4l(
page: Page,
node_id,
log_in_and_out: WebSocket,
log_in_and_out: RestartableWebSocket,
*,
autoscaled: bool,
copy_workspace: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from fastapi import FastAPI
from models_library.basic_types import LogLevel, PortInt, VersionTag
from pydantic import AliasChoices, Field, NonNegativeInt, field_validator
from pydantic import Field, NonNegativeInt, PositiveInt, field_validator
from servicelib.logging_utils_filtering import LoggerName, MessageSubstring
from settings_library.application import BaseApplicationSettings
from settings_library.docker_registry import RegistrySettings
Expand Down Expand Up @@ -112,6 +112,9 @@ class ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings):
),
)

DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS: PositiveInt = 20
DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS: PositiveInt = 30

@field_validator("DIRECTOR_GENERIC_RESOURCE_PLACEMENT_CONSTRAINTS_SUBSTITUTIONS")
@classmethod
def _validate_substitutions(cls, v):
Expand Down
17 changes: 9 additions & 8 deletions services/director/src/simcore_service_director/registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from collections.abc import Mapping
from http import HTTPStatus
from pprint import pformat
from typing import Any, Final, cast
from typing import Any, cast

from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
from aiohttp import BasicAuth, ClientSession, client_exceptions
Expand Down Expand Up @@ -33,9 +33,6 @@

DEPENDENCIES_LABEL_KEY: str = "simcore.service.dependencies"

NUMBER_OF_RETRIEVED_REPOS: int = 50
NUMBER_OF_RETRIEVED_TAGS: int = 50
_MAX_CONCURRENT_CALLS: Final[int] = 50
VERSION_REG = re.compile(
r"^(0|[1-9]\d*)(\.(0|[1-9]\d*)){2}(-(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*)(\.(0|[1-9]\d*|\d*[-a-zA-Z][-\da-zA-Z]*))*)?(\+[-\da-zA-Z]+(\.[-\da-zA-Z-]+)*)?$"
)
Expand Down Expand Up @@ -277,7 +274,7 @@ async def on_shutdown() -> None:
async def _list_repositories(app: FastAPI) -> list[str]:
logger.debug("listing repositories")
# if there are more repos, the Link will be available in the response headers until none available
path = f"/v2/_catalog?n={NUMBER_OF_RETRIEVED_REPOS}"
path = f"/v2/_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
repos_list: list = []
while True:
result, headers = await registry_request(app, path)
Expand All @@ -294,7 +291,7 @@ async def list_image_tags(app: FastAPI, image_key: str) -> list[str]:
logger.debug("listing image tags in %s", image_key)
image_tags: list = []
# get list of image tags
path = f"/v2/{image_key}/tags/list?n={NUMBER_OF_RETRIEVED_TAGS}"
path = f"/v2/{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
while True:
tags, headers = await registry_request(app, path)
if tags["tags"]:
Expand Down Expand Up @@ -385,7 +382,9 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]
*[get_image_details(app, image_key, tag) for tag in image_tags],
reraise=False,
log=logger,
limit=_MAX_CONCURRENT_CALLS,
limit=get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
)
return [result for result in results if not isinstance(result, BaseException)]

Expand All @@ -407,7 +406,9 @@ async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
*[get_repo_details(app, repo) for repo in repos],
reraise=False,
log=logger,
limit=_MAX_CONCURRENT_CALLS,
limit=get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
)

return [
Expand Down
8 changes: 7 additions & 1 deletion services/opentelemetry-collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ service:
traces:
receivers: [otlp]
exporters: [otlphttp]
processors: [batch,probabilistic_sampler]
processors: [batch,probabilistic_sampler,filter/drop_healthcheck]
processors:
batch:
timeout: 5s
send_batch_size: ${TRACING_OPENTELEMETRY_COLLECTOR_BATCH_SIZE}
probabilistic_sampler:
sampling_percentage: ${TRACING_OPENTELEMETRY_COLLECTOR_SAMPLING_PERCENTAGE}
filter/drop_healthcheck:
error_mode: ignore
traces:
span:
- attributes["http.route"] == "healthcheck_readiness_probe"
- attributes["db.statement"] == "PING" and attributes["db.system"] == "redis"
Loading

0 comments on commit 17d1702

Please sign in to comment.