From bda5bc69d17f2e569ac57e78233cad5d7786fff4 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Fri, 15 Dec 2023 14:44:19 +0000 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20do=20not=20send=20heartbea?= =?UTF-8?q?t=20as=20soon=20as=20the=20service=20is=20started=20(#5168)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrei Neagu --- .../src/servicelib/background_task.py | 8 ++++- .../src/servicelib/decorators.py | 31 +++++++++++++---- .../service-library/tests/test_decorators.py | 34 +++++++++++++++---- ...modules_dynamic_sidecar_client_api_base.py | 16 ++++----- .../modules/resource_tracking/_core.py | 1 + .../unit/test_api_workflow_service_metrics.py | 3 ++ 6 files changed, 70 insertions(+), 23 deletions(-) diff --git a/packages/service-library/src/servicelib/background_task.py b/packages/service-library/src/servicelib/background_task.py index 4c9864e2698..843fb13bc6b 100644 --- a/packages/service-library/src/servicelib/background_task.py +++ b/packages/service-library/src/servicelib/background_task.py @@ -12,6 +12,8 @@ from tenacity.stop import stop_after_attempt from tenacity.wait import wait_fixed +from .decorators import async_delayed + logger = logging.getLogger(__name__) @@ -48,13 +50,17 @@ def start_periodic_task( *, interval: datetime.timedelta, task_name: str, + wait_before_running: datetime.timedelta = datetime.timedelta(0), **kwargs, ) -> asyncio.Task: with log_context( logger, logging.DEBUG, msg=f"create periodic background task '{task_name}'" ): + delayed_periodic_scheduled_task = async_delayed(wait_before_running)( + _periodic_scheduled_task + ) return asyncio.create_task( - _periodic_scheduled_task( + delayed_periodic_scheduled_task( task, interval=interval, task_name=task_name, diff --git a/packages/service-library/src/servicelib/decorators.py b/packages/service-library/src/servicelib/decorators.py index e51d1fb4a3b..825171145ff 100644 --- a/packages/service-library/src/servicelib/decorators.py +++ b/packages/service-library/src/servicelib/decorators.py @@ -4,32 +4,49 @@ I order to avoid cyclic dependences, please DO NOT IMPORT ANYTHING from . """ +import asyncio +import datetime import logging +from collections.abc import Callable, Coroutine from copy import deepcopy from functools import wraps +from typing import Any -log = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) -def safe_return(if_fails_return=False, catch=None, logger=None): +def safe_return(if_fails_return=False, catch=None, logger=None): # noqa: FBT002 # defaults if catch is None: catch = (RuntimeError,) if logger is None: - logger = log + logger = _logger def decorate(func): @wraps(func) - def safe_func(*args, **kargs): + def safe_func(*args, **kwargs): try: - res = func(*args, **kargs) - return res + return func(*args, **kwargs) except catch as err: logger.info("%s failed: %s", func.__name__, str(err)) except Exception: # pylint: disable=broad-except logger.info("%s failed unexpectedly", func.__name__, exc_info=True) - return deepcopy(if_fails_return) # avoid issues with default mutables + return deepcopy(if_fails_return) # avoid issues with default mutable return safe_func return decorate + + +def async_delayed( + interval: datetime.timedelta, +) -> Callable[..., Callable[..., Coroutine]]: + def decorator(func) -> Callable[..., Coroutine]: + @wraps(func) + async def wrapper(*args, **kwargs) -> Any: + await asyncio.sleep(interval.total_seconds()) + return await func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/packages/service-library/tests/test_decorators.py b/packages/service-library/tests/test_decorators.py index dcdf359af4a..af120e5dc4f 100644 --- a/packages/service-library/tests/test_decorators.py +++ b/packages/service-library/tests/test_decorators.py @@ -2,16 +2,18 @@ # pylint:disable=unused-argument # pylint:disable=redefined-outer-name -from servicelib.decorators import safe_return +from datetime import timedelta + +from servicelib.decorators import async_delayed, safe_return def test_safe_return_decorator(): - class MyException(Exception): + class AnError(Exception): pass - @safe_return(if_fails_return=False, catch=(MyException,), logger=None) + @safe_return(if_fails_return=False, catch=(AnError,), logger=None) def raise_my_exception(): - raise MyException() + raise AnError assert not raise_my_exception() @@ -19,9 +21,27 @@ def raise_my_exception(): def test_safe_return_mutables(): some_mutable_return = ["some", "defaults"] - @safe_return(if_fails_return=some_mutable_return) + @safe_return(if_fails_return=some_mutable_return) # type: ignore def return_mutable(): - raise RuntimeError("Runtime is default") + msg = "Runtime is default" + raise RuntimeError(msg) assert return_mutable() == some_mutable_return # contains the same - assert not (return_mutable() is some_mutable_return) # but is not the same + assert return_mutable() is not some_mutable_return # but is not the same + + +async def test_async_delayed(): + @async_delayed(timedelta(seconds=0.2)) + async def decorated_awaitable() -> int: + return 42 + + assert await decorated_awaitable() == 42 + + async def another_awaitable() -> int: + return 42 + + decorated_another_awaitable = async_delayed(timedelta(seconds=0.2))( + another_awaitable + ) + + assert await decorated_another_awaitable() == 42 diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py index 055560b1e38..daf696129b8 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py @@ -2,16 +2,15 @@ import pytest from httpx import ( - ConnectError, HTTPError, PoolTimeout, Request, RequestError, Response, + TransportError, codes, ) from pydantic import AnyHttpUrl, parse_obj_as -from pytest import LogCaptureFixture from respx import MockRouter from simcore_service_director_v2.modules.dynamic_sidecar.api_client._base import ( BaseThinClient, @@ -79,13 +78,13 @@ async def test_connection_error( await thick_client.get_provided_url(test_url) assert isinstance(exe_info.value, ClientHttpError) - assert isinstance(exe_info.value.error, ConnectError) + assert isinstance(exe_info.value.error, TransportError) async def test_retry_on_errors( request_timeout: int, test_url: AnyHttpUrl, - caplog_info_level: LogCaptureFixture, + caplog_info_level: pytest.LogCaptureFixture, ) -> None: client = FakeThickClient(request_timeout=request_timeout) @@ -95,10 +94,10 @@ async def test_retry_on_errors( _assert_messages(caplog_info_level.messages) -@pytest.mark.parametrize("error_class", [ConnectError, PoolTimeout]) +@pytest.mark.parametrize("error_class", [TransportError, PoolTimeout]) async def test_retry_on_errors_by_error_type( error_class: type[RequestError], - caplog_info_level: LogCaptureFixture, + caplog_info_level: pytest.LogCaptureFixture, request_timeout: int, test_url: AnyHttpUrl, ) -> None: @@ -107,7 +106,7 @@ class ATestClient(BaseThinClient): @retry_on_errors async def raises_request_error(self) -> Response: raise error_class( - "mock_connect_error", + "mock_connect_error", # noqa: EM101 request=Request(method="GET", url=test_url), ) @@ -134,7 +133,8 @@ class ATestClient(BaseThinClient): # pylint: disable=no-self-use @retry_on_errors async def raises_http_error(self) -> Response: - raise HTTPError("mock_http_error") + msg = "mock_http_error" + raise HTTPError(msg) client = ATestClient(request_timeout=request_timeout) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py index 90a226999bc..524f21c83e0 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/resource_tracking/_core.py @@ -49,6 +49,7 @@ async def _start_heart_beat_task(app: FastAPI) -> None: app=app, interval=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL, task_name="resource_tracking_heart_beat", + wait_before_running=resource_tracking_settings.RESOURCE_TRACKING_HEARTBEAT_INTERVAL, ) diff --git a/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py b/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py index 18b037e05d3..61dcedbf1d7 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py +++ b/services/dynamic-sidecar/tests/unit/test_api_workflow_service_metrics.py @@ -300,6 +300,9 @@ async def test_user_services_fail_to_stop_or_save_data( await _wait_for_containers_to_be_running(app) + # let a few heartbeats pass + await asyncio.sleep(_BASE_HEART_BEAT_INTERVAL * 2) + # in case of manual intervention multiple stops will be sent _EXPECTED_STOP_MESSAGES = 4 for _ in range(_EXPECTED_STOP_MESSAGES):