Skip to content

Commit

Permalink
Merge branch 'master' into 2024/fix/tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnicegyu11 authored Nov 19, 2024
2 parents c0d70b6 + 5cf0635 commit 0ae4170
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 92 deletions.
145 changes: 105 additions & 40 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@
import json
import logging
import re
import typing
from collections import defaultdict
from collections.abc import Generator, Iterator
from collections.abc import Generator
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum, unique
from typing import Any, Final

import httpx
from playwright._impl._sync_base import EventContextManager
from playwright.sync_api import FrameLocator, Page, Request
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import WebSocket
from pydantic import AnyUrl
from pytest_simcore.helpers.logging_tools import log_context

from .logging_tools import log_context

_logger = logging.getLogger(__name__)


SECOND: Final[int] = 1000
MINUTE: Final[int] = 60 * SECOND
Expand Down Expand Up @@ -106,6 +112,94 @@ class SocketIOEvent:
SOCKETIO_MESSAGE_PREFIX: Final[str] = "42"


@dataclass
class RestartableWebSocket:
page: Page
ws: WebSocket
_registered_events: list[tuple[str, typing.Callable | None]] = field(
default_factory=list
)
_number_of_restarts: int = 0

def __post_init__(self):
self._configure_websocket_events()

def _configure_websocket_events(self):
try:
with log_context(
logging.DEBUG,
msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
) as ctx:

def on_framesent(payload: str | bytes) -> None:
ctx.logger.debug("⬇️ Frame sent: %s", payload)

def on_framereceived(payload: str | bytes) -> None:
ctx.logger.debug("⬆️ Frame received: %s", payload)

def on_close(_: WebSocket) -> None:
ctx.logger.warning(
"⚠️ WebSocket closed. Attempting to reconnect..."
)
self._attempt_reconnect(ctx.logger)

def on_socketerror(error_msg: str) -> None:
ctx.logger.error("❌ WebSocket error: %s", error_msg)

# Attach core event listeners
self.ws.on("framesent", on_framesent)
self.ws.on("framereceived", on_framereceived)
self.ws.on("close", on_close)
self.ws.on("socketerror", on_socketerror)

finally:
# Detach core event listeners
self.ws.remove_listener("framesent", on_framesent)
self.ws.remove_listener("framereceived", on_framereceived)
self.ws.remove_listener("close", on_close)
self.ws.remove_listener("socketerror", on_socketerror)

def _attempt_reconnect(self, logger: logging.Logger) -> None:
"""
Attempt to reconnect the WebSocket and restore event listeners.
"""
try:
with self.page.expect_websocket() as ws_info:
assert not ws_info.value.is_closed()

self.ws = ws_info.value
self._number_of_restarts += 1
logger.info(
"🔄 Reconnected to WebSocket successfully. Number of reconnections: %s",
self._number_of_restarts,
)
self._configure_websocket_events()
# Re-register all custom event listeners
for event, predicate in self._registered_events:
self.ws.expect_event(event, predicate)

except Exception as e: # pylint: disable=broad-except
logger.error("🚨 Failed to reconnect WebSocket: %s", e)

def expect_event(
self,
event: str,
predicate: typing.Callable | None = None,
*,
timeout: float | None = None,
) -> EventContextManager:
"""
Register an event listener with support for reconnection.
"""
output = self.ws.expect_event(event, predicate, timeout=timeout)
self._registered_events.append((event, predicate))
return output

@classmethod
def create(cls, page: Page, ws: WebSocket):
return cls(page, ws)


def decode_socketio_42_message(message: str) -> SocketIOEvent:
data = json.loads(message.removeprefix(SOCKETIO_MESSAGE_PREFIX))
return SocketIOEvent(name=data[0], obj=data[1])
Expand Down Expand Up @@ -278,7 +372,7 @@ def get_partial_product_url(self):
def wait_for_pipeline_state(
current_state: RunningState,
*,
websocket: WebSocket,
websocket: RestartableWebSocket,
if_in_states: tuple[RunningState, ...],
expected_states: tuple[RunningState, ...],
timeout_ms: int,
Expand All @@ -301,39 +395,6 @@ def wait_for_pipeline_state(
return current_state


@contextlib.contextmanager
def web_socket_default_log_handler(web_socket: WebSocket) -> Iterator[None]:

try:
with log_context(
logging.DEBUG,
msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
) as ctx:

def on_framesent(payload: str | bytes) -> None:
ctx.logger.debug("⬇️ Frame sent: %s", payload)

def on_framereceived(payload: str | bytes) -> None:
ctx.logger.debug("⬆️ Frame received: %s", payload)

def on_close(payload: WebSocket) -> None:
ctx.logger.warning("⚠️ Websocket closed: %s", payload)

def on_socketerror(error_msg: str) -> None:
ctx.logger.error("❌ Websocket error: %s", error_msg)

web_socket.on("framesent", on_framesent)
web_socket.on("framereceived", on_framereceived)
web_socket.on("close", on_close)
web_socket.on("socketerror", on_socketerror)
yield
finally:
web_socket.remove_listener("framesent", on_framesent)
web_socket.remove_listener("framereceived", on_framereceived)
web_socket.remove_listener("close", on_close)
web_socket.remove_listener("socketerror", on_socketerror)


def _node_started_predicate(request: Request) -> bool:
return bool(
re.search(NODE_START_REQUEST_PATTERN, request.url)
Expand All @@ -358,12 +419,14 @@ def expected_service_running(
*,
page: Page,
node_id: str,
websocket: WebSocket,
websocket: RestartableWebSocket,
timeout: int,
press_start_button: bool,
product_url: AnyUrl,
) -> Generator[ServiceRunning, None, None]:
with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id, logger=ctx.logger, product_url=product_url
)
Expand Down Expand Up @@ -395,15 +458,17 @@ def wait_for_service_running(
*,
page: Page,
node_id: str,
websocket: WebSocket,
websocket: RestartableWebSocket,
timeout: int,
press_start_button: bool,
product_url: AnyUrl,
) -> FrameLocator:
"""NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again
In which case this will need further adjutment"""

with log_context(logging.INFO, msg="Waiting for node to run") as ctx:
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id, logger=ctx.logger, product_url=product_url
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
MINUTE,
SECOND,
SOCKETIO_MESSAGE_PREFIX,
RestartableWebSocket,
SocketIOEvent,
decode_socketio_42_message,
wait_for_service_running,
Expand Down Expand Up @@ -100,7 +101,7 @@ class WaitForS4LDict(TypedDict):
def wait_for_launched_s4l(
page: Page,
node_id,
log_in_and_out: WebSocket,
log_in_and_out: RestartableWebSocket,
*,
autoscaled: bool,
copy_workspace: bool,
Expand Down
18 changes: 9 additions & 9 deletions tests/e2e-playwright/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import arrow
import pytest
from faker import Faker
from playwright.sync_api import APIRequestContext, BrowserContext, Page, WebSocket
from playwright.sync_api import APIRequestContext, BrowserContext, Page
from playwright.sync_api._generated import Playwright
from pydantic import AnyUrl, TypeAdapter
from pytest_simcore.helpers.faker_factories import DEFAULT_TEST_PASSWORD
Expand All @@ -29,13 +29,13 @@
MINUTE,
SECOND,
AutoRegisteredUser,
RestartableWebSocket,
RunningState,
ServiceType,
SocketIOEvent,
SocketIOProjectClosedWaiter,
SocketIOProjectStateUpdatedWaiter,
decode_socketio_42_message,
web_socket_default_log_handler,
)
from pytest_simcore.helpers.pydantic_extension import Secret4TestsStr

Expand Down Expand Up @@ -331,7 +331,7 @@ def log_in_and_out(
user_password: Secret4TestsStr,
auto_register: bool,
register: Callable[[], AutoRegisteredUser],
) -> Iterator[WebSocket]:
) -> Iterator[RestartableWebSocket]:
with log_context(
logging.INFO,
f"Open {product_url=} using {user_name=}/{user_password=}/{auto_register=}",
Expand Down Expand Up @@ -374,8 +374,8 @@ def log_in_and_out(
page.get_by_test_id("loginSubmitBtn").click()
assert response_info.value.ok, f"{response_info.value.json()}"

ws = ws_info.value
assert not ws.is_closed()
assert not ws_info.value.is_closed()
restartable_wb = RestartableWebSocket.create(page, ws_info.value)

# Welcome to Sim4Life
page.wait_for_timeout(5000)
Expand All @@ -389,8 +389,8 @@ def log_in_and_out(
if quickStartWindowCloseBtnLocator.is_visible():
quickStartWindowCloseBtnLocator.click()

with web_socket_default_log_handler(ws):
yield ws
# with web_socket_default_log_handler(ws):
yield restartable_wb

with log_context(
logging.INFO,
Expand All @@ -408,7 +408,7 @@ def log_in_and_out(
@pytest.fixture
def create_new_project_and_delete(
page: Page,
log_in_and_out: WebSocket,
log_in_and_out: RestartableWebSocket,
is_product_billable: bool,
api_request_context: APIRequestContext,
product_url: AnyUrl,
Expand Down Expand Up @@ -660,7 +660,7 @@ def _(
def start_and_stop_pipeline(
product_url: AnyUrl,
page: Page,
log_in_and_out: WebSocket,
log_in_and_out: RestartableWebSocket,
api_request_context: APIRequestContext,
) -> Iterator[Callable[[], SocketIOEvent]]:
started_pipeline_ids = []
Expand Down
16 changes: 12 additions & 4 deletions tests/e2e-playwright/tests/jupyterlabs/test_jupyterlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
from playwright.sync_api import Page, WebSocket
from pydantic import ByteSize
from pytest_simcore.helpers.logging_tools import log_context
from pytest_simcore.helpers.playwright import MINUTE, SECOND, ServiceType
from pytest_simcore.helpers.playwright import (
MINUTE,
SECOND,
RestartableWebSocket,
ServiceType,
)

_WAITING_FOR_SERVICE_TO_START: Final[int] = (
10 * MINUTE
Expand Down Expand Up @@ -110,8 +115,11 @@ def test_jupyterlab(
iframe.get_by_role("button", name="New Launcher").click()
with page.expect_websocket(_JLabWaitForTerminalWebSocket()) as ws_info:
iframe.get_by_label("Launcher").get_by_text("Terminal").click()
terminal_web_socket = ws_info.value
assert not terminal_web_socket.is_closed()

assert not ws_info.value.is_closed()
restartable_terminal_web_socket = RestartableWebSocket.create(
page, ws_info.value
)

terminal = iframe.locator(
"#jp-Terminal-0 > div > div.xterm-screen"
Expand All @@ -122,7 +130,7 @@ def test_jupyterlab(
terminal.press("Enter")
# NOTE: this call creates a large file with random blocks inside
blocks_count = int(large_file_size / large_file_block_size)
with terminal_web_socket.expect_event(
with restartable_terminal_web_socket.expect_event(
"framereceived",
_JLabTerminalWebSocketWaiter(
expected_message_type="stdout", expected_message_contents="copied"
Expand Down
18 changes: 7 additions & 11 deletions tests/e2e-playwright/tests/sim4life/test_sim4life.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
from collections.abc import Callable
from typing import Any

from playwright.sync_api import Page, WebSocket
from playwright.sync_api import Page
from pydantic import AnyUrl
from pytest_simcore.helpers.playwright import (
ServiceType,
web_socket_default_log_handler,
)
from pytest_simcore.helpers.playwright import RestartableWebSocket, ServiceType
from pytest_simcore.helpers.playwright_sim4life import (
check_video_streaming,
interact_with_s4l,
Expand All @@ -29,7 +26,7 @@ def test_sim4life(
[ServiceType, str, str | None], dict[str, Any]
],
create_project_from_new_button: Callable[[str], dict[str, Any]],
log_in_and_out: WebSocket,
log_in_and_out: RestartableWebSocket,
service_key: str,
use_plus_button: bool,
is_autoscaled: bool,
Expand Down Expand Up @@ -59,9 +56,8 @@ def test_sim4life(
product_url=product_url,
)
s4l_websocket = resp["websocket"]
with web_socket_default_log_handler(s4l_websocket):
s4l_iframe = resp["iframe"]
interact_with_s4l(page, s4l_iframe)
s4l_iframe = resp["iframe"]
interact_with_s4l(page, s4l_iframe)

if check_videostreaming:
check_video_streaming(page, s4l_iframe, s4l_websocket)
if check_videostreaming:
check_video_streaming(page, s4l_iframe, s4l_websocket)
Loading

0 comments on commit 0ae4170

Please sign in to comment.