Skip to content

Commit

Permalink
šŸ› Fixed flaky tests in services/dynamic-sidecar/tests/unit/test_modulā€¦
Browse files Browse the repository at this point in the history
ā€¦es_outputs_event_filter.py (#6795)

Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Nov 21, 2024
1 parent a87de46 commit 5945ea7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

PortEvent: TypeAlias = str | None

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)


_1_MB: Final[PositiveInt] = TypeAdapter(ByteSize).validate_python("1mib")
Expand Down Expand Up @@ -164,14 +164,14 @@ async def _worker_upload_events(self) -> None:
if port_key is None:
break

logger.debug("Request upload for port_key %s", port_key)
_logger.debug("Request upload for port_key %s", port_key)
await self.outputs_manager.port_key_content_changed(port_key)

async def enqueue(self, port_key: str) -> None:
await self._incoming_events_queue.put(port_key)

async def start(self) -> None:
with log_context(logger, logging.INFO, f"{EventFilter.__name__} start"):
with log_context(_logger, logging.INFO, f"{EventFilter.__name__} start"):
self._task_incoming_event_ingestion = create_task(
self._worker_incoming_event_ingestion(),
name=self._worker_incoming_event_ingestion.__name__,
Expand All @@ -195,7 +195,7 @@ async def _cancel_task(task: Task | None) -> None:
with suppress(CancelledError):
await task

with log_context(logger, logging.INFO, f"{EventFilter.__name__} shutdown"):
with log_context(_logger, logging.INFO, f"{EventFilter.__name__} shutdown"):
await self._incoming_events_queue.put(None)
await _cancel_task(self._task_incoming_event_ingestion)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import asyncio
from pathlib import Path
from typing import AsyncIterator, Iterator
from typing import AsyncIterator
from unittest.mock import AsyncMock

import pytest
Expand All @@ -24,12 +24,12 @@
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

_TENACITY_RETRY_PARAMS = dict(
reraise=True,
retry=retry_if_exception_type(AssertionError),
stop=stop_after_delay(10),
wait=wait_fixed(0.01),
)
_TENACITY_RETRY_PARAMS = {
"reraise": True,
"retry": retry_if_exception_type(AssertionError),
"stop": stop_after_delay(10),
"wait": wait_fixed(0.01),
}

# FIXTURES

Expand Down Expand Up @@ -75,11 +75,11 @@ async def outputs_manager(
@pytest.fixture
def mocked_port_key_content_changed(
mocker: MockerFixture, outputs_manager: OutputsManager
) -> Iterator[AsyncMock]:
) -> AsyncMock:
async def _mock_upload_outputs(*args, **kwargs) -> None:
pass

yield mocker.patch.object(
return mocker.patch.object(
outputs_manager, "port_key_content_changed", side_effect=_mock_upload_outputs
)

Expand All @@ -101,8 +101,8 @@ def get_wait_interval(self, dir_size: NonNegativeInt) -> NonNegativeFloat:


@pytest.fixture
def mock_get_directory_total_size(mocker: MockerFixture) -> Iterator[AsyncMock]:
yield mocker.patch(
def mock_get_directory_total_size(mocker: MockerFixture) -> AsyncMock:
return mocker.patch(
"simcore_service_dynamic_sidecar.modules.outputs._event_filter.get_directory_total_size",
return_value=1,
)
Expand All @@ -127,10 +127,6 @@ async def _wait_for_event_to_trigger(event_filter: EventFilter) -> None:
await asyncio.sleep(event_filter.delay_policy.get_min_interval() * 5)


async def _wait_for_event_to_trigger_big_directory(event_filter: EventFilter) -> None:
await asyncio.sleep(event_filter.delay_policy.get_wait_interval(1) * 2)


# TESTS


Expand Down Expand Up @@ -170,8 +166,9 @@ async def test_always_trigger_after_delay(
# event trigger after correct interval delay correctly
for expected_call_count in range(1, 10):
await event_filter.enqueue(port_key_1)
await _wait_for_event_to_trigger_big_directory(event_filter)
assert mocked_port_key_content_changed.call_count == expected_call_count
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mocked_port_key_content_changed.call_count == expected_call_count


async def test_minimum_amount_of_get_directory_total_size_calls(
Expand All @@ -190,7 +187,6 @@ async def test_minimum_amount_of_get_directory_total_size_calls(
assert mocked_port_key_content_changed.call_count == 0

# event finished processing and was dispatched
await _wait_for_event_to_trigger_big_directory(event_filter)
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mock_get_directory_total_size.call_count == 2
Expand Down Expand Up @@ -221,9 +217,10 @@ async def test_minimum_amount_of_get_directory_total_size_calls_with_continuous_
assert mocked_port_key_content_changed.call_count == 0

# event finished processing and was dispatched
await _wait_for_event_to_trigger_big_directory(event_filter)
assert mock_get_directory_total_size.call_count == 2
assert mocked_port_key_content_changed.call_count == 1
async for attempt in AsyncRetrying(**_TENACITY_RETRY_PARAMS):
with attempt:
assert mock_get_directory_total_size.call_count == 2
assert mocked_port_key_content_changed.call_count == 1


def test_default_delay_policy():
Expand Down

0 comments on commit 5945ea7

Please sign in to comment.