🐛 Avoid aioredis client from hanging on calls if redis is no longer available (ITISFoundation#5821)

Co-authored-by: Andrei Neagu <[email protected]>
GitHK and Andrei Neagu authored May 16, 2024
1 parent a14ec1b commit 734168d
Showing 7 changed files with 92 additions and 79 deletions.
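
The gist of the change, visible in the servicelib/redis.py hunks below: the client is now created with socket_timeout and socket_connect_timeout, so calls against an unresponsive Redis raise redis.exceptions.TimeoutError instead of awaiting forever. A minimal standalone sketch of that behaviour (not part of the diff; the URL and the timeout value are illustrative):

# Sketch only: shows why setting the socket timeouts prevents hanging calls.
import asyncio

import redis.asyncio as aioredis
import redis.exceptions


async def main() -> None:
    client = aioredis.from_url(
        "redis://localhost:6379",
        socket_timeout=0.25,  # pending commands fail fast instead of hanging
        socket_connect_timeout=0.25,  # same for establishing new connections
        decode_responses=True,
    )
    try:
        print(await client.ping())
    except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
        # with the timeouts set, an outage surfaces as an exception here
        # rather than as an await that never returns
        print(False)
    finally:
        await client.close(close_connection_pool=True)


asyncio.run(main())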
42 changes: 42 additions & 0 deletions packages/pytest-simcore/src/pytest_simcore/container_pause.py
@@ -0,0 +1,42 @@
import asyncio
import contextlib
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager

import aiodocker
import pytest


@contextlib.asynccontextmanager
async def _pause_container(
async_docker_client: aiodocker.Docker, container_name: str
) -> AsyncIterator[None]:
containers = await async_docker_client.containers.list(
filters={"name": [f"{container_name}."]}
)
await asyncio.gather(*(c.pause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "paused"

yield

await asyncio.gather(*(c.unpause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "running"
# NOTE: container takes some time to start


@pytest.fixture
async def paused_container() -> Callable[[str], AbstractAsyncContextManager[None]]:
@contextlib.asynccontextmanager
async def _(container_name: str) -> AsyncIterator[None]:
async with aiodocker.Docker() as docker_client, _pause_container(
docker_client, container_name
):
yield None

return _
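
Usage mirrors the tests further down in this commit: the fixture yields a callable that takes the swarm service name and returns an async context manager pausing that service's containers. An illustrative sketch only (hypothetical test; my_client stands in for whatever client fixture a test actually uses):

# Illustrative only: how a test consumes the paused_container fixture above.
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager


async def test_client_survives_service_pause(
    paused_container: Callable[[str], AbstractAsyncContextManager[None]],
    my_client,  # hypothetical fixture exposing an async ping()
):
    assert await my_client.ping() is True
    async with paused_container("redis"):
        # the service's containers are paused here; calls should fail fast
        assert await my_client.ping() is False
    # containers are unpaused again on exit
    assert await my_client.ping() is True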
23 changes: 13 additions & 10 deletions packages/service-library/src/servicelib/redis.py
@@ -23,9 +23,10 @@
from .utils import logged_gather

_DEFAULT_LOCK_TTL: Final[datetime.timedelta] = datetime.timedelta(seconds=10)
_DEFAULT_SOCKET_TIMEOUT: Final[datetime.timedelta] = datetime.timedelta(seconds=30)


logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


class BaseRedisError(PydanticErrorMixin, RuntimeError):
@@ -59,16 +60,18 @@ def __post_init__(self):
redis.exceptions.ConnectionError,
redis.exceptions.TimeoutError,
],
socket_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
socket_connect_timeout=_DEFAULT_SOCKET_TIMEOUT.total_seconds(),
encoding="utf-8",
decode_responses=True,
)

@retry(**RedisRetryPolicyUponInitialization(logger).kwargs)
@retry(**RedisRetryPolicyUponInitialization(_logger).kwargs)
async def setup(self) -> None:
if not await self._client.ping():
await self.shutdown()
raise CouldNotConnectToRedisError(dsn=self.redis_dsn)
logger.info(
_logger.info(
"Connection to %s succeeded with %s",
f"redis at {self.redis_dsn=}",
f"{self._client=}",
@@ -78,10 +81,10 @@ async def shutdown(self) -> None:
await self._client.close(close_connection_pool=True)

async def ping(self) -> bool:
try:
return await self._client.ping()
except redis.exceptions.ConnectionError:
return False
with log_catch(_logger, reraise=False):
await self._client.ping()
return True
return False

@contextlib.asynccontextmanager
async def lock_context(
@@ -119,8 +122,8 @@ async def lock_context(

async def _extend_lock(lock: Lock) -> None:
with log_context(
logger, logging.DEBUG, f"Extending lock {lock_unique_id}"
), log_catch(logger, reraise=False):
_logger, logging.DEBUG, f"Extending lock {lock_unique_id}"
), log_catch(_logger, reraise=False):
await lock.reacquire()

try:
@@ -156,7 +159,7 @@ async def _extend_lock(lock: Lock) -> None:
await ttl_lock.release()
except redis.exceptions.LockNotOwnedError:
# if this appears outside tests it can cause issues since something might be happening
logger.warning(
_logger.warning(
"Attention: lock is no longer owned. This is unexpected and requires investigation"
)

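For context on the new ping(): log_catch(_logger, reraise=False) logs and suppresses any exception raised inside the block, so connection errors and the newly possible socket timeouts all collapse into a clean return False. A rough stand-in for that behaviour (not the project's implementation, which may differ):

# Stand-in only: approximates how the rewritten ping() behaves.
import contextlib
import logging
from collections.abc import Iterator

_logger = logging.getLogger(__name__)


@contextlib.contextmanager
def log_catch(logger: logging.Logger, *, reraise: bool = True) -> Iterator[None]:
    try:
        yield
    except Exception:  # broad on purpose: log it, optionally re-raise
        logger.exception("unhandled error in block")
        if reraise:
            raise


async def ping(client) -> bool:
    # True only if the PING round-trip succeeds; False on connection errors
    # and on the new socket timeouts alike
    with log_catch(_logger, reraise=False):
        await client.ping()
        return True
    return False
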
3 changes: 2 additions & 1 deletion packages/service-library/tests/conftest.py
@@ -12,6 +12,7 @@
from faker import Faker

pytest_plugins = [
"pytest_simcore.container_pause",
"pytest_simcore.docker_compose",
"pytest_simcore.docker_registry",
"pytest_simcore.docker_swarm",
@@ -21,9 +22,9 @@
"pytest_simcore.rabbit_service",
"pytest_simcore.redis_service",
"pytest_simcore.repository_paths",
"pytest_simcore.schemas",
"pytest_simcore.simcore_service_library_fixtures",
"pytest_simcore.tmp_path_extra",
"pytest_simcore.schemas",
]


@@ -4,12 +4,11 @@
# pylint:disable=protected-access

import asyncio
import contextlib
from collections.abc import AsyncIterator, Callable
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass
from typing import Any

import aiodocker
import docker
import pytest
import requests
@@ -27,44 +26,13 @@
]


@contextlib.asynccontextmanager
async def paused_container(
async_docker_client: aiodocker.Docker, container_name: str
) -> AsyncIterator[None]:
containers = await async_docker_client.containers.list(
filters={"name": [container_name]}
)
await asyncio.gather(*(c.pause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "paused"

yield

await asyncio.gather(*(c.unpause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "running"
# NOTE: let the container some time to recover...
await asyncio.sleep(3)


@pytest.fixture
async def async_docker_client() -> AsyncIterator[aiodocker.Docker]:
async with aiodocker.Docker() as docker_client:
yield docker_client


async def test_rabbit_client_lose_connection(
async_docker_client: aiodocker.Docker,
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
create_rabbitmq_client: Callable[[str], RabbitMQClient],
docker_client: docker.client.DockerClient,
):
rabbit_client = create_rabbitmq_client("pinger")
assert await rabbit_client.ping() is True
async with paused_container(async_docker_client, "rabbit"):
async with paused_container("rabbit"):
# check that connection was lost
async for attempt in AsyncRetrying(
stop=stop_after_delay(15), wait=wait_fixed(0.5), reraise=True
@@ -101,17 +69,17 @@ def _creator(**kwargs: dict[str, Any]) -> PytestRabbitMessage:

@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
async def test_rabbit_client_with_paused_container(
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
random_exchange_name: Callable[[], str],
random_rabbit_message: Callable[..., PytestRabbitMessage],
create_rabbitmq_client: Callable[[str], RabbitMQClient],
async_docker_client: aiodocker.Docker,
):
rabbit_client = create_rabbitmq_client("pinger")
assert await rabbit_client.ping() is True
exchange_name = random_exchange_name()
message = random_rabbit_message()
await rabbit_client.publish(exchange_name, message)
async with paused_container(async_docker_client, "rabbit"):
async with paused_container("rabbit"):
# check that connection was lost
with pytest.raises(asyncio.TimeoutError):
await rabbit_client.publish(exchange_name, message)
24 changes: 23 additions & 1 deletion packages/service-library/tests/test_redis.py
@@ -6,7 +6,8 @@

import asyncio
import datetime
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable
from contextlib import AbstractAsyncContextManager
from typing import Final

import pytest
@@ -279,3 +280,24 @@ async def test_redis_client_sdk_health_checked(redis_service: RedisSettings):
# cleanup
await client.redis.flushall()
await client.shutdown()


@pytest.fixture
def mock_default_socket_timeout(mocker: MockerFixture) -> None:
mocker.patch.object(
servicelib_redis, "_DEFAULT_SOCKET_TIMEOUT", datetime.timedelta(seconds=0.25)
)


async def test_regression_fails_if_on_redis_service_outage(
mock_default_socket_timeout: None,
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
redis_client_sdk: RedisClientSDK,
):
assert await redis_client_sdk.ping() is True

async with paused_container("redis"):
# no connection available any longer should not hang but timeout
assert await redis_client_sdk.ping() is False

assert await redis_client_sdk.ping() is True
3 changes: 2 additions & 1 deletion services/clusters-keeper/tests/unit/conftest.py
@@ -37,8 +37,9 @@
from types_aiobotocore_ec2.literals import InstanceTypeType

pytest_plugins = [
"pytest_simcore.aws_server",
"pytest_simcore.aws_ec2_service",
"pytest_simcore.aws_server",
"pytest_simcore.container_pause",
"pytest_simcore.dask_scheduler",
"pytest_simcore.docker_compose",
"pytest_simcore.docker_swarm",
32 changes: 4 additions & 28 deletions services/clusters-keeper/tests/unit/test_modules_rabbitmq.py
@@ -2,11 +2,9 @@
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

import asyncio
import contextlib
from collections.abc import AsyncIterator, Callable
from collections.abc import Callable
from contextlib import AbstractAsyncContextManager

import aiodocker
import pytest
from faker import Faker
from fastapi import FastAPI
@@ -133,39 +131,17 @@ async def test_post_message_with_disabled_rabbit_does_not_raise(
await post_message(initialized_app, message=rabbit_message)


@contextlib.asynccontextmanager
async def paused_container(
async_docker_client: aiodocker.Docker, container_name: str
) -> AsyncIterator[None]:
containers = await async_docker_client.containers.list(
filters={"name": [container_name]}
)
await asyncio.gather(*(c.pause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "paused"

yield

await asyncio.gather(*(c.unpause() for c in containers))
# refresh
container_attrs = await asyncio.gather(*(c.show() for c in containers))
for container_status in container_attrs:
assert container_status["State"]["Status"] == "running"


async def test_post_message_when_rabbit_disconnected_does_not_raise(
paused_container: Callable[[str], AbstractAsyncContextManager[None]],
enabled_rabbitmq: RabbitSettings,
disabled_ec2: None,
mocked_redis_server: None,
initialized_app: FastAPI,
rabbit_log_message: LoggerRabbitMessage,
async_docker_client: aiodocker.Docker,
):
# NOTE: if the connection is not initialized before pausing the container, then
# this test hangs forever!!! This needs investigations!
await post_message(initialized_app, message=rabbit_log_message)
async with paused_container(async_docker_client, "rabbit"):
async with paused_container("rabbit"):
# now posting should not raise out
await post_message(initialized_app, message=rabbit_log_message)
