diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py index f063b6efd61..02b8c457385 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py @@ -2,19 +2,25 @@ import json import logging import re +import typing from collections import defaultdict -from collections.abc import Generator, Iterator +from collections.abc import Generator from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from enum import Enum, unique from typing import Any, Final import httpx +from playwright._impl._sync_base import EventContextManager from playwright.sync_api import FrameLocator, Page, Request from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import WebSocket from pydantic import AnyUrl -from pytest_simcore.helpers.logging_tools import log_context + +from .logging_tools import log_context + +_logger = logging.getLogger(__name__) + SECOND: Final[int] = 1000 MINUTE: Final[int] = 60 * SECOND @@ -106,6 +112,94 @@ class SocketIOEvent: SOCKETIO_MESSAGE_PREFIX: Final[str] = "42" +@dataclass +class RestartableWebSocket: + page: Page + ws: WebSocket + _registered_events: list[tuple[str, typing.Callable | None]] = field( + default_factory=list + ) + _number_of_restarts: int = 0 + + def __post_init__(self): + self._configure_websocket_events() + + def _configure_websocket_events(self): + try: + with log_context( + logging.DEBUG, + msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)", + ) as ctx: + + def on_framesent(payload: str | bytes) -> None: + ctx.logger.debug("⬇️ Frame sent: %s", payload) + + def on_framereceived(payload: str | bytes) -> None: + ctx.logger.debug("⬆️ Frame received: %s", payload) + + def on_close(_: WebSocket) -> None: + ctx.logger.warning( + "⚠️ WebSocket closed. Attempting to reconnect..." + ) + self._attempt_reconnect(ctx.logger) + + def on_socketerror(error_msg: str) -> None: + ctx.logger.error("❌ WebSocket error: %s", error_msg) + + # Attach core event listeners + self.ws.on("framesent", on_framesent) + self.ws.on("framereceived", on_framereceived) + self.ws.on("close", on_close) + self.ws.on("socketerror", on_socketerror) + + finally: + # Detach core event listeners + self.ws.remove_listener("framesent", on_framesent) + self.ws.remove_listener("framereceived", on_framereceived) + self.ws.remove_listener("close", on_close) + self.ws.remove_listener("socketerror", on_socketerror) + + def _attempt_reconnect(self, logger: logging.Logger) -> None: + """ + Attempt to reconnect the WebSocket and restore event listeners. + """ + try: + with self.page.expect_websocket() as ws_info: + assert not ws_info.value.is_closed() + + self.ws = ws_info.value + self._number_of_restarts += 1 + logger.info( + "🔄 Reconnected to WebSocket successfully. Number of reconnections: %s", + self._number_of_restarts, + ) + self._configure_websocket_events() + # Re-register all custom event listeners + for event, predicate in self._registered_events: + self.ws.expect_event(event, predicate) + + except Exception as e: # pylint: disable=broad-except + logger.error("🚨 Failed to reconnect WebSocket: %s", e) + + def expect_event( + self, + event: str, + predicate: typing.Callable | None = None, + *, + timeout: float | None = None, + ) -> EventContextManager: + """ + Register an event listener with support for reconnection. + """ + output = self.ws.expect_event(event, predicate, timeout=timeout) + self._registered_events.append((event, predicate)) + return output + + @classmethod + def create(cls, page: Page, ws: WebSocket): + return cls(page, ws) + + def decode_socketio_42_message(message: str) -> SocketIOEvent: data = json.loads(message.removeprefix(SOCKETIO_MESSAGE_PREFIX)) return SocketIOEvent(name=data[0], obj=data[1]) @@ -278,7 +372,7 @@ def get_partial_product_url(self): def wait_for_pipeline_state( current_state: RunningState, *, - websocket: WebSocket, + websocket: RestartableWebSocket, if_in_states: tuple[RunningState, ...], expected_states: tuple[RunningState, ...], timeout_ms: int, @@ -301,39 +395,6 @@ def wait_for_pipeline_state( return current_state -@contextlib.contextmanager -def web_socket_default_log_handler(web_socket: WebSocket) -> Iterator[None]: - - try: - with log_context( - logging.DEBUG, - msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)", - ) as ctx: - - def on_framesent(payload: str | bytes) -> None: - ctx.logger.debug("⬇️ Frame sent: %s", payload) - - def on_framereceived(payload: str | bytes) -> None: - ctx.logger.debug("⬆️ Frame received: %s", payload) - - def on_close(payload: WebSocket) -> None: - ctx.logger.warning("⚠️ Websocket closed: %s", payload) - - def on_socketerror(error_msg: str) -> None: - ctx.logger.error("❌ Websocket error: %s", error_msg) - - web_socket.on("framesent", on_framesent) - web_socket.on("framereceived", on_framereceived) - web_socket.on("close", on_close) - web_socket.on("socketerror", on_socketerror) - yield - finally: - web_socket.remove_listener("framesent", on_framesent) - web_socket.remove_listener("framereceived", on_framereceived) - web_socket.remove_listener("close", on_close) - web_socket.remove_listener("socketerror", on_socketerror) - - def _node_started_predicate(request: Request) -> bool: return bool( re.search(NODE_START_REQUEST_PATTERN, request.url) @@ -358,12 +419,14 @@ def expected_service_running( *, page: Page, node_id: str, - websocket: WebSocket, + websocket: RestartableWebSocket, timeout: int, press_start_button: bool, product_url: AnyUrl, ) -> Generator[ServiceRunning, None, None]: - with log_context(logging.INFO, msg="Waiting for node to run") as ctx: + with log_context( + logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}" + ) as ctx: waiter = SocketIONodeProgressCompleteWaiter( node_id=node_id, logger=ctx.logger, product_url=product_url ) @@ -395,7 +458,7 @@ def wait_for_service_running( *, page: Page, node_id: str, - websocket: WebSocket, + websocket: RestartableWebSocket, timeout: int, press_start_button: bool, product_url: AnyUrl, @@ -403,7 +466,9 @@ def wait_for_service_running( """NOTE: if the service was already started this will not work as some of the required websocket events will not be emitted again In which case this will need further adjutment""" - with log_context(logging.INFO, msg="Waiting for node to run") as ctx: + with log_context( + logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}" + ) as ctx: waiter = SocketIONodeProgressCompleteWaiter( node_id=node_id, logger=ctx.logger, product_url=product_url ) diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py index c59718f4aff..8c5b74d032b 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/playwright_sim4life.py @@ -13,6 +13,7 @@ MINUTE, SECOND, SOCKETIO_MESSAGE_PREFIX, + RestartableWebSocket, SocketIOEvent, decode_socketio_42_message, wait_for_service_running, @@ -100,7 +101,7 @@ class WaitForS4LDict(TypedDict): def wait_for_launched_s4l( page: Page, node_id, - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, *, autoscaled: bool, copy_workspace: bool, diff --git a/tests/e2e-playwright/tests/conftest.py b/tests/e2e-playwright/tests/conftest.py index 6fd15e8218c..551dc1ad266 100644 --- a/tests/e2e-playwright/tests/conftest.py +++ b/tests/e2e-playwright/tests/conftest.py @@ -20,7 +20,7 @@ import arrow import pytest from faker import Faker -from playwright.sync_api import APIRequestContext, BrowserContext, Page, WebSocket +from playwright.sync_api import APIRequestContext, BrowserContext, Page from playwright.sync_api._generated import Playwright from pydantic import AnyUrl, TypeAdapter from pytest_simcore.helpers.faker_factories import DEFAULT_TEST_PASSWORD @@ -29,13 +29,13 @@ MINUTE, SECOND, AutoRegisteredUser, + RestartableWebSocket, RunningState, ServiceType, SocketIOEvent, SocketIOProjectClosedWaiter, SocketIOProjectStateUpdatedWaiter, decode_socketio_42_message, - web_socket_default_log_handler, ) from pytest_simcore.helpers.pydantic_extension import Secret4TestsStr @@ -331,7 +331,7 @@ def log_in_and_out( user_password: Secret4TestsStr, auto_register: bool, register: Callable[[], AutoRegisteredUser], -) -> Iterator[WebSocket]: +) -> Iterator[RestartableWebSocket]: with log_context( logging.INFO, f"Open {product_url=} using {user_name=}/{user_password=}/{auto_register=}", @@ -374,8 +374,8 @@ def log_in_and_out( page.get_by_test_id("loginSubmitBtn").click() assert response_info.value.ok, f"{response_info.value.json()}" - ws = ws_info.value - assert not ws.is_closed() + assert not ws_info.value.is_closed() + restartable_wb = RestartableWebSocket.create(page, ws_info.value) # Welcome to Sim4Life page.wait_for_timeout(5000) @@ -389,8 +389,8 @@ def log_in_and_out( if quickStartWindowCloseBtnLocator.is_visible(): quickStartWindowCloseBtnLocator.click() - with web_socket_default_log_handler(ws): - yield ws + # with web_socket_default_log_handler(ws): + yield restartable_wb with log_context( logging.INFO, @@ -408,7 +408,7 @@ def log_in_and_out( @pytest.fixture def create_new_project_and_delete( page: Page, - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, is_product_billable: bool, api_request_context: APIRequestContext, product_url: AnyUrl, @@ -660,7 +660,7 @@ def _( def start_and_stop_pipeline( product_url: AnyUrl, page: Page, - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, api_request_context: APIRequestContext, ) -> Iterator[Callable[[], SocketIOEvent]]: started_pipeline_ids = [] diff --git a/tests/e2e-playwright/tests/jupyterlabs/test_jupyterlab.py b/tests/e2e-playwright/tests/jupyterlabs/test_jupyterlab.py index 6afc2bb1f13..2451f1e3648 100644 --- a/tests/e2e-playwright/tests/jupyterlabs/test_jupyterlab.py +++ b/tests/e2e-playwright/tests/jupyterlabs/test_jupyterlab.py @@ -16,7 +16,12 @@ from playwright.sync_api import Page, WebSocket from pydantic import ByteSize from pytest_simcore.helpers.logging_tools import log_context -from pytest_simcore.helpers.playwright import MINUTE, SECOND, ServiceType +from pytest_simcore.helpers.playwright import ( + MINUTE, + SECOND, + RestartableWebSocket, + ServiceType, +) _WAITING_FOR_SERVICE_TO_START: Final[int] = ( 10 * MINUTE @@ -110,8 +115,11 @@ def test_jupyterlab( iframe.get_by_role("button", name="New Launcher").click() with page.expect_websocket(_JLabWaitForTerminalWebSocket()) as ws_info: iframe.get_by_label("Launcher").get_by_text("Terminal").click() - terminal_web_socket = ws_info.value - assert not terminal_web_socket.is_closed() + + assert not ws_info.value.is_closed() + restartable_terminal_web_socket = RestartableWebSocket.create( + page, ws_info.value + ) terminal = iframe.locator( "#jp-Terminal-0 > div > div.xterm-screen" @@ -122,7 +130,7 @@ def test_jupyterlab( terminal.press("Enter") # NOTE: this call creates a large file with random blocks inside blocks_count = int(large_file_size / large_file_block_size) - with terminal_web_socket.expect_event( + with restartable_terminal_web_socket.expect_event( "framereceived", _JLabTerminalWebSocketWaiter( expected_message_type="stdout", expected_message_contents="copied" diff --git a/tests/e2e-playwright/tests/sim4life/test_sim4life.py b/tests/e2e-playwright/tests/sim4life/test_sim4life.py index 924e6efa535..28b6ecf56a7 100644 --- a/tests/e2e-playwright/tests/sim4life/test_sim4life.py +++ b/tests/e2e-playwright/tests/sim4life/test_sim4life.py @@ -10,12 +10,9 @@ from collections.abc import Callable from typing import Any -from playwright.sync_api import Page, WebSocket +from playwright.sync_api import Page from pydantic import AnyUrl -from pytest_simcore.helpers.playwright import ( - ServiceType, - web_socket_default_log_handler, -) +from pytest_simcore.helpers.playwright import RestartableWebSocket, ServiceType from pytest_simcore.helpers.playwright_sim4life import ( check_video_streaming, interact_with_s4l, @@ -29,7 +26,7 @@ def test_sim4life( [ServiceType, str, str | None], dict[str, Any] ], create_project_from_new_button: Callable[[str], dict[str, Any]], - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, service_key: str, use_plus_button: bool, is_autoscaled: bool, @@ -59,9 +56,8 @@ def test_sim4life( product_url=product_url, ) s4l_websocket = resp["websocket"] - with web_socket_default_log_handler(s4l_websocket): - s4l_iframe = resp["iframe"] - interact_with_s4l(page, s4l_iframe) + s4l_iframe = resp["iframe"] + interact_with_s4l(page, s4l_iframe) - if check_videostreaming: - check_video_streaming(page, s4l_iframe, s4l_websocket) + if check_videostreaming: + check_video_streaming(page, s4l_iframe, s4l_websocket) diff --git a/tests/e2e-playwright/tests/sim4life/test_template.py b/tests/e2e-playwright/tests/sim4life/test_template.py index fb9b260c992..423437e04a4 100644 --- a/tests/e2e-playwright/tests/sim4life/test_template.py +++ b/tests/e2e-playwright/tests/sim4life/test_template.py @@ -10,8 +10,9 @@ from collections.abc import Callable from typing import Any -from playwright.sync_api import Page, WebSocket -from pytest_simcore.helpers.playwright import web_socket_default_log_handler +from playwright.sync_api import Page +from pydantic import AnyUrl +from pytest_simcore.helpers.playwright import RestartableWebSocket from pytest_simcore.helpers.playwright_sim4life import ( check_video_streaming, interact_with_s4l, @@ -22,10 +23,11 @@ def test_template( page: Page, create_project_from_template_dashboard: Callable[[str], dict[str, Any]], - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, template_id: str, is_autoscaled: bool, check_videostreaming: bool, + product_url: AnyUrl, ): project_data = create_project_from_template_dashboard(template_id) @@ -37,12 +39,16 @@ def test_template( assert len(node_ids) == 1, "Expected 1 node in the workbench!" resp = wait_for_launched_s4l( - page, node_ids[0], log_in_and_out, autoscaled=is_autoscaled, copy_workspace=True + page, + node_ids[0], + log_in_and_out, + autoscaled=is_autoscaled, + copy_workspace=True, + product_url=product_url, ) s4l_websocket = resp["websocket"] - with web_socket_default_log_handler(s4l_websocket): - s4l_iframe = resp["iframe"] - interact_with_s4l(page, s4l_iframe) + s4l_iframe = resp["iframe"] + interact_with_s4l(page, s4l_iframe) - if check_videostreaming: - check_video_streaming(page, s4l_iframe, s4l_websocket) + if check_videostreaming: + check_video_streaming(page, s4l_iframe, s4l_websocket) diff --git a/tests/e2e-playwright/tests/sleepers/test_sleepers.py b/tests/e2e-playwright/tests/sleepers/test_sleepers.py index d12953d1ed0..4415fcf3e49 100644 --- a/tests/e2e-playwright/tests/sleepers/test_sleepers.py +++ b/tests/e2e-playwright/tests/sleepers/test_sleepers.py @@ -16,7 +16,7 @@ from packaging.version import Version from packaging.version import parse as parse_version -from playwright.sync_api import Page, WebSocket +from playwright.sync_api import Page from pytest_simcore.helpers.logging_tools import ( ContextMessages, log_context, @@ -24,6 +24,7 @@ ) from pytest_simcore.helpers.playwright import ( MINUTE, + RestartableWebSocket, RunningState, ServiceType, SocketIOEvent, @@ -78,7 +79,7 @@ def _get_file_names(page: Page) -> list[str]: def test_sleepers( page: Page, - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, create_project_from_service_dashboard: Callable[ [ServiceType, str, str | None], dict[str, Any] ], diff --git a/tests/e2e-playwright/tests/tip/test_ti_plan.py b/tests/e2e-playwright/tests/tip/test_ti_plan.py index 56f028d197d..7709071e7ea 100644 --- a/tests/e2e-playwright/tests/tip/test_ti_plan.py +++ b/tests/e2e-playwright/tests/tip/test_ti_plan.py @@ -19,6 +19,7 @@ from pytest_simcore.helpers.playwright import ( MINUTE, SECOND, + RestartableWebSocket, app_mode_trigger_next_app, expected_service_running, wait_for_service_running, @@ -89,7 +90,7 @@ def __call__(self, message: str) -> bool: def test_classic_ti_plan( # noqa: PLR0915 page: Page, - log_in_and_out: WebSocket, + log_in_and_out: RestartableWebSocket, is_autoscaled: bool, is_product_lite: bool, create_tip_plan_from_dashboard: Callable[[str], dict[str, Any]], @@ -209,11 +210,12 @@ def test_classic_ti_plan( # noqa: PLR0915 ti_iframe = service_running.iframe_locator assert ti_iframe - jlab_websocket = ws_info.value + assert not ws_info.value.is_closed() + restartable_jlab_websocket = RestartableWebSocket.create(page, ws_info.value) with ( log_context(logging.INFO, "Run optimization"), - jlab_websocket.expect_event( + restartable_jlab_websocket.expect_event( "framereceived", _JLabWebSocketWaiter( expected_header_msg_type="stream", @@ -228,11 +230,18 @@ def test_classic_ti_plan( # noqa: PLR0915 ) with log_context(logging.INFO, "Create report"): - - ti_iframe.get_by_role("button", name="Load Analysis").click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) - ti_iframe.get_by_role("button", name="Load").nth(1).click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Load Analysis` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Load Analysis").click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Load` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Load").nth(1).click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) if is_product_lite: assert ( @@ -248,14 +257,34 @@ def test_classic_ti_plan( # noqa: PLR0915 ).is_enabled() else: - ti_iframe.get_by_role("button", name="Add to Report (0)").nth(0).click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) - ti_iframe.get_by_role("button", name="Export to S4L").click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) - ti_iframe.get_by_role("button", name="Add to Report (1)").nth(1).click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) - ti_iframe.get_by_role("button", name="Export Report").click() - page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Add to Report (0)` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Add to Report (0)").nth( + 0 + ).click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Export to S4L` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Export to S4L").click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Add to Report (1)` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Add to Report (1)").nth( + 1 + ).click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) + with log_context( + logging.INFO, + f"Click button - `Export Report` and wait for {_JLAB_REPORTING_MAX_TIME}", + ): + ti_iframe.get_by_role("button", name="Export Report").click() + page.wait_for_timeout(_JLAB_REPORTING_MAX_TIME) with log_context(logging.INFO, "Check outputs"): if is_product_lite: