diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e61c8c..3e0a5c8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,9 +10,9 @@ jobs: Quality: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 - - uses: pre-commit/action@v2.0.0 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + - uses: pre-commit/action@v3.0.1 Tests: runs-on: ${{ matrix.os }} @@ -26,28 +26,29 @@ jobs: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install test requirements run: pip install -r test-requirements.txt - name: Run tests - run: python setup.py test --addopts "--cov-report xml" + run: python setup.py test - name: Upload coverage - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v4 with: env_vars: OS,PYTHON + token: ${{ secrets.CODECOV_TOKEN }} Release: runs-on: ubuntu-latest needs: [Quality, Tests] if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags') steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: actions/setup-python@v5 with: python-version: 3.8 - name: Build source distribution @@ -57,7 +58,7 @@ jobs: pip install wheel python setup.py bdist_wheel - name: Publish source package on PyPI - uses: pypa/gh-action-pypi-publish@master + uses: pypa/gh-action-pypi-publish@release/v1 with: user: __token__ password: ${{ secrets.pypi_password }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9162756..42f8e13 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,11 +18,19 @@ repos: - --extend-ignore=E704 # https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#flake8 - repo: https://github.com/pre-commit/mirrors-mypy + # We can't use the latest version due to a regression in mypy 1.7.0 + # See https://github.com/python/mypy/issues/17191 for more information rev: v1.6.1 hooks: - - id: mypy - files: ^(?!tests) - types: [python] + - id: mypy + files: ^(?!tests) + types: [python] +- repo: https://github.com/RobertCraigie/pyright-python + rev: v1.1.361 + hooks: + - id: pyright + additional_dependencies: [pytest, typing-extensions] + types: [python] - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. rev: v0.0.272 diff --git a/README.rst b/README.rst index 8dd4f90..24c0e57 100644 --- a/README.rst +++ b/README.rst @@ -2,25 +2,7 @@ aiostream ========= -.. image:: https://readthedocs.org/projects/aiostream/badge/?version=latest - :target: http://aiostream.readthedocs.io/en/latest/?badge=latest - :alt: - -.. image:: https://codecov.io/gh/vxgmichel/aiostream/branch/master/graph/badge.svg - :target: https://codecov.io/gh/vxgmichel/aiostream - :alt: - -.. image:: https://travis-ci.org/vxgmichel/aiostream.svg?branch=master - :target: https://travis-ci.org/vxgmichel/aiostream - :alt: - -.. image:: https://img.shields.io/pypi/v/aiostream.svg - :target: https://pypi.python.org/pypi/aiostream - :alt: - -.. image:: https://img.shields.io/pypi/pyversions/aiostream.svg - :target: https://pypi.python.org/pypi/aiostream/ - :alt: +|docs-badge| |cov-badge| |ci-badge| |version-badge| |pyversion-badge| Generator-based operators for asynchronous iteration @@ -43,14 +25,6 @@ A stream is an enhanced asynchronous iterable providing the following features: - **Concatenation** - using addition symbol ``+`` -Requirements ------------- - -The stream operators rely heavily on asynchronous generators (`PEP 525`_): - -- python >= 3.6 - - Stream operators ---------------- @@ -80,7 +54,7 @@ Demonstration The following example demonstrates most of the streams capabilities: -.. sourcecode:: python +.. code:: python import asyncio from aiostream import stream, pipe @@ -191,3 +165,19 @@ Vincent Michel: vxgmichel@gmail.com .. _action: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.action .. _print: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.print + +.. |docs-badge| image:: https://readthedocs.org/projects/aiostream/badge/?version=latest + :target: http://aiostream.readthedocs.io/en/latest/?badge=latest + :alt: +.. |cov-badge| image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg + :target: https://codecov.io/gh/vxgmichel/aiostream + :alt: +.. |ci-badge| image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg + :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain + :alt: +.. |version-badge| image:: https://img.shields.io/pypi/v/aiostream.svg + :target: https://pypi.python.org/pypi/aiostream + :alt: +.. |pyversion-badge| image:: https://img.shields.io/pypi/pyversions/aiostream.svg + :target: https://pypi.python.org/pypi/aiostream/ + :alt: diff --git a/aiostream/aiter_utils.py b/aiostream/aiter_utils.py index 9055ee1..94d1083 100644 --- a/aiostream/aiter_utils.py +++ b/aiostream/aiter_utils.py @@ -1,6 +1,8 @@ """Utilities for asynchronous iteration.""" from __future__ import annotations + +import sys from types import TracebackType import warnings @@ -106,7 +108,7 @@ def assert_async_iterator(obj: object) -> None: # Async iterator context -T = TypeVar("T") +T = TypeVar("T", covariant=True) Self = TypeVar("Self", bound="AsyncIteratorContext[Any]") @@ -200,10 +202,12 @@ async def __aexit__( # Throw try: assert isinstance(self._aiterator, AsyncGenerator) - await self._aiterator.athrow(typ, value, traceback) - raise RuntimeError( - "Async iterator didn't stop after athrow()" - ) + if sys.version_info >= (3, 12): + assert value is not None + await self._aiterator.athrow(value) + else: + await self._aiterator.athrow(typ, value, traceback) + raise RuntimeError("Async iterator didn't stop after athrow()") # Exception has been (most probably) silenced except StopAsyncIteration as exc: diff --git a/aiostream/core.py b/aiostream/core.py index c787300..845fdfd 100644 --- a/aiostream/core.py +++ b/aiostream/core.py @@ -39,7 +39,7 @@ class StreamEmpty(Exception): # Helpers -T = TypeVar("T") +T = TypeVar("T", covariant=True) X = TypeVar("X") A = TypeVar("A", contravariant=True) P = ParamSpec("P") @@ -345,17 +345,6 @@ async def random(offset=0., width=1.): "since the decorated function becomes an operator class" ) - # Look for "more_sources" - for i, p in enumerate(parameters): - if ( - p.name == "more_sources" - and p.kind == inspect.Parameter.VAR_POSITIONAL - ): - more_sources_index = i - break - else: - more_sources_index = None - # Injected parameters self_parameter = inspect.Parameter( "self", inspect.Parameter.POSITIONAL_OR_KEYWORD @@ -372,9 +361,6 @@ async def random(offset=0., width=1.): # Init method def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None: - if more_sources_index is not None: - for source in args[more_sources_index:]: - assert_async_iterable(source) factory = functools.partial(raw, *args, **kwargs) return BaseStream.__init__(self, factory) diff --git a/aiostream/stream/create.py b/aiostream/stream/create.py index f01ff0d..a3ecb86 100644 --- a/aiostream/stream/create.py +++ b/aiostream/stream/create.py @@ -16,8 +16,9 @@ TypeVar, AsyncIterator, cast, + Type, ) -from typing_extensions import ParamSpec +from typing_extensions import ParamSpec, Never from ..stream import time from ..core import operator, streamcontext @@ -122,7 +123,7 @@ async def call( @operator -async def throw(exc: Exception) -> AsyncIterator[None]: +async def throw(exc: Exception | Type[Exception]) -> AsyncIterator[Never]: """Throw an exception without generating any value.""" if False: yield @@ -130,14 +131,14 @@ async def throw(exc: Exception) -> AsyncIterator[None]: @operator -async def empty() -> AsyncIterator[None]: +async def empty() -> AsyncIterator[Never]: """Terminate without generating any value.""" if False: yield @operator -async def never() -> AsyncIterator[None]: +async def never() -> AsyncIterator[Never]: """Hang forever without generating any value.""" if False: yield diff --git a/aiostream/stream/select.py b/aiostream/stream/select.py index 87b256d..3489132 100644 --- a/aiostream/stream/select.py +++ b/aiostream/stream/select.py @@ -111,7 +111,7 @@ async def filterindex( @pipable_operator -def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]: +def slice(source: AsyncIterable[T], *args: int | None) -> AsyncIterator[T]: """Slice an asynchronous sequence. The arguments are the same as the builtin type slice. diff --git a/aiostream/test_utils.py b/aiostream/test_utils.py index fb33b4e..bfd0f08 100644 --- a/aiostream/test_utils.py +++ b/aiostream/test_utils.py @@ -3,26 +3,41 @@ from __future__ import annotations import asyncio -from unittest.mock import Mock from contextlib import contextmanager +from unittest.mock import Mock +from typing import ( + TYPE_CHECKING, + Any, + Callable, + List, + TypeVar, + AsyncIterable, + AsyncIterator, + ContextManager, + Iterator, +) import pytest from .core import StreamEmpty, streamcontext, pipable_operator -from typing import TYPE_CHECKING, Any, Callable, List if TYPE_CHECKING: from _pytest.fixtures import SubRequest from aiostream.core import Stream -__all__ = ["add_resource", "assert_run", "event_loop"] +__all__ = ["add_resource", "assert_run", "event_loop_policy", "assert_cleanup"] + + +T = TypeVar("T") @pipable_operator -async def add_resource(source, cleanup_time): +async def add_resource( + source: AsyncIterable[T], cleanup_time: float +) -> AsyncIterator[T]: """Simulate an open resource in a stream operator.""" try: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() loop.open_resources += 1 loop.resources += 1 async with streamcontext(source) as streamer: @@ -94,7 +109,7 @@ def assert_run(request: SubRequest) -> Callable: @pytest.fixture -def event_loop(): +def event_loop_policy() -> TimeTrackingTestLoopPolicy: """Fixture providing a test event loop. The event loop simulate and records the sleep operation, @@ -103,78 +118,84 @@ def event_loop(): It also tracks simulated resources and make sure they are all released before the loop is closed. """ + return TimeTrackingTestLoopPolicy() + - class TimeTrackingTestLoop(asyncio.BaseEventLoop): - stuck_threshold = 100 - - def __init__(self): - super().__init__() - self._time = 0 - self._timers = [] - self._selector = Mock() - self.clear() - - # Loop internals - - def _run_once(self): - super()._run_once() - # Update internals - self.busy_count += 1 - self._timers = sorted( - when for when in self._timers if when > loop.time() - ) - # Time advance - if self.time_to_go: - when = self._timers.pop(0) - step = when - loop.time() - self.steps.append(step) - self.advance_time(step) - self.busy_count = 0 - - def _process_events(self, event_list): - return - - def _write_to_self(self): - return - - # Time management - - def time(self): - return self._time - - def advance_time(self, advance): - if advance: - self._time += advance - - def call_at(self, when, callback, *args, **kwargs): - self._timers.append(when) - return super().call_at(when, callback, *args, **kwargs) - - @property - def stuck(self): - return self.busy_count > self.stuck_threshold - - @property - def time_to_go(self): - return self._timers and (self.stuck or not self._ready) - - # Resource management - - def clear(self): - self.steps = [] - self.open_resources = 0 - self.resources = 0 +@pytest.fixture +def assert_cleanup( + event_loop: TimeTrackingTestLoop, +) -> Callable[[], ContextManager[TimeTrackingTestLoop]]: + """Fixture to assert cleanup of resources.""" + return event_loop.assert_cleanup + + +class TimeTrackingTestLoop(asyncio.BaseEventLoop): + stuck_threshold: int = 100 + + def __init__(self): + super().__init__() + self._time: float = 0.0 + self._timers: list[float] = [] + self._selector = Mock() + self.clear() + + # Loop internals + + def _run_once(self) -> None: + super()._run_once() + # Update internals + self.busy_count += 1 + self._timers = sorted(when for when in self._timers if when > self.time()) + # Time advance + if self.time_to_go: + when = self._timers.pop(0) + step = when - self.time() + self.steps.append(step) + self.advance_time(step) self.busy_count = 0 - @contextmanager - def assert_cleanup(self): - self.clear() - yield self - assert self.open_resources == 0 - self.clear() - - loop = TimeTrackingTestLoop() - asyncio.set_event_loop(loop) - with loop.assert_cleanup(): - yield loop - loop.close() + def _process_events(self, event_list) -> None: + return + + def _write_to_self(self) -> None: + return + + # Time management + + def time(self) -> float: + return self._time + + def advance_time(self, advance: float) -> None: + if advance: + self._time += advance + + def call_at(self, when, callback, *args, **kwargs): + self._timers.append(when) + return super().call_at(when, callback, *args, **kwargs) + + @property + def stuck(self) -> bool: + return self.busy_count > self.stuck_threshold + + @property + def time_to_go(self) -> bool: + return self._timers and (self.stuck or not self._ready) + + # Resource management + + def clear(self) -> None: + self.steps = [] + self.open_resources = 0 + self.resources = 0 + self.busy_count = 0 + + @contextmanager + def assert_cleanup(self) -> Iterator[TimeTrackingTestLoop]: + self.clear() + yield self + assert self.open_resources == 0 + self.clear() + + +class TimeTrackingTestLoopPolicy(asyncio.DefaultEventLoopPolicy): + _loop_factory = TimeTrackingTestLoop diff --git a/docs/index.rst b/docs/index.rst index 21b169b..bed01ec 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -6,12 +6,12 @@ aiostream :target: http://aiostream.readthedocs.io/en/latest/?badge=latest :alt: -.. image:: https://coveralls.io/repos/github/vxgmichel/aiostream/badge.svg?branch=master - :target: https://coveralls.io/github/vxgmichel/aiostream?branch=master +.. image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg + :target: https://codecov.io/gh/vxgmichel/aiostream :alt: -.. image:: https://travis-ci.org/vxgmichel/aiostream.svg?branch=master - :target: https://travis-ci.org/vxgmichel/aiostream +.. image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg + :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain :alt: .. image:: https://img.shields.io/pypi/v/aiostream.svg diff --git a/docs/presentation.rst b/docs/presentation.rst index 461507d..6359895 100644 --- a/docs/presentation.rst +++ b/docs/presentation.rst @@ -16,14 +16,6 @@ A stream is an enhanced asynchronous iterable providing the following features: - **Concatenation** - using addition symbol ``+`` -Requirements ------------- - -The stream operators rely heavily on asynchronous generators (`PEP 525`_): - -- python >= 3.6 - - Stream operators ---------------- diff --git a/examples/norm_server.py b/examples/norm_server.py index a1283f2..8106523 100644 --- a/examples/norm_server.py +++ b/examples/norm_server.py @@ -51,6 +51,9 @@ def strip(x: bytes, *_: object) -> str: def nonempty(x: str) -> bool: return x != "" + def to_float(x: str, *_: object) -> float: + return float(x) + def square(x: float, *_: object) -> float: return x**2 @@ -66,7 +69,7 @@ def square_root(x: float, *_: object) -> float: | pipe.print("string: {}") | pipe.map(strip) | pipe.takewhile(nonempty) - | pipe.map(float) + | pipe.map(to_float) | pipe.map(square) | pipe.print("square: {:.2f}") | pipe.action(write_cursor) diff --git a/pyproject.toml b/pyproject.toml index b6d2487..138f514 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ +[tool.pyright] +ignore = ["aiostream/test_utils.py"] + [tool.black] line-length = 80 target_version = ["py38", "py39", "py310", "py311", "py312"] diff --git a/setup.cfg b/setup.cfg index e9876ef..d3728a1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,11 +1,11 @@ [tool:pytest] -addopts = tests --strict-markers --cov aiostream --cov-report html --cov-report term +addopts = tests --strict-markers --cov aiostream [coverage:report] exclude_lines = pragma: no cover if TYPE_CHECKING: - ... + \.\.\. [aliases] test = pytest diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..a787d8c --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,13 @@ +from aiostream.test_utils import ( + add_resource, + assert_run, + assert_cleanup, + event_loop_policy, +) + +__all__ = [ + "add_resource", + "assert_run", + "assert_cleanup", + "event_loop_policy", +] diff --git a/tests/test_advanced.py b/tests/test_advanced.py index 90fdead..e085d59 100644 --- a/tests/test_advanced.py +++ b/tests/test_advanced.py @@ -1,102 +1,103 @@ import pytest from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop - -# Pytest fixtures -assert_run, event_loop +from aiostream.core import Stream @pytest.mark.asyncio -async def test_concatmap(assert_run, event_loop): +async def test_concatmap(assert_run, assert_cleanup): + def target1(x: int, *_) -> Stream[int]: + return stream.range(x, x + 2, interval=5) + + def target2(x: int, *_) -> Stream[int]: + return stream.range(x, x + 4, interval=1) + + def target3(x: int, *_) -> Stream[int]: + return stream.range(0, 3, interval=1) if x else stream.throw(ZeroDivisionError) + # Concurrent run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.concatmap(lambda x: stream.range(x, x + 2, interval=5)) + ys = xs | pipe.concatmap(target1) await assert_run(ys, [0, 1, 2, 3, 4, 5]) - assert event_loop.steps == [1, 1, 3, 5, 5] + assert loop.steps == [1, 1, 3, 5, 5] # Sequential run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.concatmap( - lambda x: stream.range(x, x + 2, interval=5), task_limit=1 - ) + ys = xs | pipe.concatmap(target1, task_limit=1) await assert_run(ys, [0, 1, 2, 3, 4, 5]) - assert event_loop.steps == [5, 1, 5, 1, 5] + assert loop.steps == [5, 1, 5, 1, 5] # Limited run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.concatmap( - lambda x: stream.range(x, x + 2, interval=5), task_limit=2 - ) + ys = xs | pipe.concatmap(target1, task_limit=2) await assert_run(ys, [0, 1, 2, 3, 4, 5]) - assert event_loop.steps == [1, 4, 1, 4, 5] + assert loop.steps == [1, 4, 1, 4, 5] # Make sure item arrive as soon as possible - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.just(2) - ys = xs | pipe.concatmap(lambda x: stream.range(x, x + 4, interval=1)) + ys = xs | pipe.concatmap(target2) zs = ys | pipe.timeout(2) # Sould NOT raise await assert_run(zs, [2, 3, 4, 5]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] # An exception might get discarded if the result can be produced before the # processing of the exception is required - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.iterate([True, False]) - ys = xs | pipe.concatmap( - lambda x: ( - stream.range(0, 3, interval=1) - if x - else stream.throw(ZeroDivisionError) - ) - ) + ys = xs | pipe.concatmap(target3) zs = ys | pipe.take(3) await assert_run(zs, [0, 1, 2]) - assert event_loop.steps == [1, 1] + assert loop.steps == [1, 1] @pytest.mark.asyncio -async def test_flatmap(assert_run, event_loop): +async def test_flatmap(assert_run, assert_cleanup): + def target1(x: int, *_) -> Stream[int]: + return stream.range(x, x + 2, interval=5) + # Concurrent run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.flatmap(lambda x: stream.range(x, x + 2, interval=5)) + ys = xs | pipe.flatmap(target1) await assert_run(ys, [0, 2, 4, 1, 3, 5]) - assert event_loop.steps == [1, 1, 3, 1, 1] + assert loop.steps == [1, 1, 3, 1, 1] # Sequential run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.flatmap( - lambda x: stream.range(x, x + 2, interval=5), task_limit=1 - ) + ys = xs | pipe.flatmap(target1, task_limit=1) await assert_run(ys, [0, 1, 2, 3, 4, 5]) - assert event_loop.steps == [5, 1, 5, 1, 5] + assert loop.steps == [5, 1, 5, 1, 5] # Limited run - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 6, 2, interval=1) - ys = xs | pipe.flatmap( - lambda x: stream.range(x, x + 2, interval=5), task_limit=2 - ) + ys = xs | pipe.flatmap(target1, task_limit=2) await assert_run(ys, [0, 2, 1, 3, 4, 5]) - assert event_loop.steps == [1, 4, 1, 5] + assert loop.steps == [1, 4, 1, 5] @pytest.mark.asyncio -async def test_switchmap(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_switchmap(assert_run, assert_cleanup): + def target1(x: int, *_) -> Stream[int]: + return stream.range(x, x + 5, interval=1) + + def target2(x: int, *_) -> Stream[int]: + return stream.range(x, x + 2, interval=2) + + with assert_cleanup() as loop: xs = stream.range(0, 30, 10, interval=3) - ys = xs | pipe.switchmap(lambda x: stream.range(x, x + 5, interval=1)) + ys = xs | pipe.switchmap(target1) await assert_run(ys, [0, 1, 2, 10, 11, 12, 20, 21, 22, 23, 24]) - assert event_loop.steps == [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] + assert loop.steps == [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] # Test cleanup procedure - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 5, interval=1) - ys = xs | pipe.switchmap(lambda x: stream.range(x, x + 2, interval=2)) + ys = xs | pipe.switchmap(target2) await assert_run(ys[:3], [0, 1, 2]) - assert event_loop.steps == [1, 1] + assert loop.steps == [1, 1] diff --git a/tests/test_aggregate.py b/tests/test_aggregate.py index 1dafeb4..7068629 100644 --- a/tests/test_aggregate.py +++ b/tests/test_aggregate.py @@ -3,19 +3,16 @@ import operator from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop, add_resource - -# Pytest fixtures -assert_run, event_loop +from aiostream.test_utils import add_resource @pytest.mark.asyncio -async def test_aggregate(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_aggregate(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | pipe.accumulate() await assert_run(xs, [0, 1, 3, 6, 10]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = ( stream.range(2, 4) | add_resource.pipe(1) @@ -23,41 +20,41 @@ async def test_aggregate(assert_run, event_loop): ) await assert_run(xs, [2, 4, 12]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(0) | add_resource.pipe(1) | pipe.accumulate() await assert_run(xs, []) async def sleepmax(x, y): return await asyncio.sleep(1, result=max(x, y)) - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(3) | add_resource.pipe(1) | pipe.accumulate(sleepmax) await assert_run(xs, [0, 1, 2]) - assert event_loop.steps == [1] * 3 + assert loop.steps == [1] * 3 @pytest.mark.asyncio -async def test_reduce(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_reduce(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | pipe.reduce(min) await assert_run(xs, [0]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | pipe.reduce(max) await assert_run(xs, [4]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(0) | add_resource.pipe(1) | pipe.reduce(max) await assert_run(xs, [], IndexError("Index out of range")) @pytest.mark.asyncio -async def test_list(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_list(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(3) | add_resource.pipe(1) | pipe.list() # The same list object is yielded at each step await assert_run(xs, [[0, 1, 2], [0, 1, 2], [0, 1, 2], [0, 1, 2]]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(0) | add_resource.pipe(1) | pipe.list() await assert_run(xs, [[]]) diff --git a/tests/test_aiter.py b/tests/test_aiter.py index 319a2b2..938faab 100644 --- a/tests/test_aiter.py +++ b/tests/test_aiter.py @@ -1,12 +1,8 @@ import pytest import asyncio -from aiostream.test_utils import event_loop from aiostream.aiter_utils import AsyncIteratorContext, aitercontext, anext -# Pytest fixtures -event_loop - # Some async iterators for testing @@ -55,17 +51,18 @@ async def __anext__(self): @pytest.mark.asyncio -async def test_simple_aitercontext(event_loop): - async with aitercontext(agen()) as safe_gen: - # Cannot enter twice - with pytest.raises(RuntimeError): - async with safe_gen: - pass +async def test_simple_aitercontext(assert_cleanup): + with assert_cleanup() as loop: + async with aitercontext(agen()) as safe_gen: + # Cannot enter twice + with pytest.raises(RuntimeError): + async with safe_gen: + pass - it = iter(range(5)) - async for item in safe_gen: - assert item == next(it) - assert event_loop.steps == [1] * 5 + it = iter(range(5)) + async for item in safe_gen: + assert item == next(it) + assert loop.steps == [1] * 5 # Exiting is idempotent await safe_gen.__aexit__(None, None, None) @@ -83,7 +80,7 @@ async def test_simple_aitercontext(event_loop): @pytest.mark.asyncio -async def test_athrow_in_aitercontext(event_loop): +async def test_athrow_in_aitercontext(): async with aitercontext(agen()) as safe_gen: assert await safe_gen.__anext__() == 0 with pytest.raises(ZeroDivisionError): @@ -93,30 +90,30 @@ async def test_athrow_in_aitercontext(event_loop): @pytest.mark.asyncio -async def test_aitercontext_wrong_usage(event_loop): +async def test_aitercontext_wrong_usage(): safe_gen = aitercontext(agen()) with pytest.warns(UserWarning): await anext(safe_gen) with pytest.raises(TypeError): - AsyncIteratorContext(None) + AsyncIteratorContext(None) # type: ignore with pytest.raises(TypeError): AsyncIteratorContext(safe_gen) @pytest.mark.asyncio -async def test_raise_in_aitercontext(event_loop): +async def test_raise_in_aitercontext(): with pytest.raises(ZeroDivisionError): async with aitercontext(agen()) as safe_gen: async for _ in safe_gen: - 1 / 0 + raise ZeroDivisionError with pytest.raises(ZeroDivisionError): async with aitercontext(agen()) as safe_gen: async for _ in safe_gen: pass - 1 / 0 + raise ZeroDivisionError with pytest.raises(GeneratorExit): async with aitercontext(agen()) as safe_gen: @@ -131,11 +128,11 @@ async def test_raise_in_aitercontext(event_loop): @pytest.mark.asyncio -async def test_silence_exception_in_aitercontext(event_loop): +async def test_silence_exception_in_aitercontext(): async with aitercontext(silence_agen()) as safe_gen: async for item in safe_gen: assert item == 1 - 1 / 0 + raise ZeroDivisionError # Silencing a generator exit is forbidden with pytest.raises(GeneratorExit): @@ -145,12 +142,12 @@ async def test_silence_exception_in_aitercontext(event_loop): @pytest.mark.asyncio -async def test_reraise_exception_in_aitercontext(event_loop): +async def test_reraise_exception_in_aitercontext(): with pytest.raises(RuntimeError) as info: async with aitercontext(reraise_agen()) as safe_gen: async for item in safe_gen: assert item == 1 - 1 / 0 + raise ZeroDivisionError assert type(info.value.__cause__) is ZeroDivisionError with pytest.raises(RuntimeError) as info: @@ -162,12 +159,12 @@ async def test_reraise_exception_in_aitercontext(event_loop): @pytest.mark.asyncio -async def test_stuck_in_aitercontext(event_loop): +async def test_stuck_in_aitercontext(): with pytest.raises(RuntimeError) as info: async with aitercontext(stuck_agen()) as safe_gen: async for item in safe_gen: assert item == 1 - 1 / 0 + raise ZeroDivisionError assert "didn't stop after athrow" in str(info.value) with pytest.raises(RuntimeError) as info: @@ -181,7 +178,7 @@ async def test_stuck_in_aitercontext(event_loop): @pytest.mark.asyncio -async def test_not_an_agen_in_aitercontext(event_loop): +async def test_not_an_agen_in_aitercontext(): async with aitercontext(not_an_agen([1])) as safe_gen: async for item in safe_gen: assert item == 1 @@ -189,4 +186,4 @@ async def test_not_an_agen_in_aitercontext(event_loop): with pytest.raises(ZeroDivisionError): async with aitercontext(not_an_agen([1])) as safe_gen: async for item in safe_gen: - 1 / 0 + raise ZeroDivisionError diff --git a/tests/test_combine.py b/tests/test_combine.py index c21afda..161a66b 100644 --- a/tests/test_combine.py +++ b/tests/test_combine.py @@ -1,27 +1,25 @@ +from typing import Awaitable import pytest import asyncio from aiostream import stream, pipe, async_, await_ -from aiostream.test_utils import assert_run, event_loop, add_resource - -# Pytest fixtures -assert_run, event_loop +from aiostream.test_utils import add_resource @pytest.mark.asyncio -async def test_chain(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_chain(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(5) + stream.range(5, 10) await assert_run(xs, list(range(10))) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10, 15) | add_resource.pipe(1) xs += stream.range(15, 20) | add_resource.pipe(1) await assert_run(xs, list(range(10, 20))) @pytest.mark.asyncio -async def test_zip(assert_run, event_loop): +async def test_zip(assert_run): xs = stream.range(5) | add_resource.pipe(1.0) ys = xs | pipe.zip(xs, xs) expected = [(x,) * 3 for x in range(5)] @@ -29,117 +27,132 @@ async def test_zip(assert_run, event_loop): @pytest.mark.asyncio -async def test_map(assert_run, event_loop): +async def test_map(assert_run, assert_cleanup): + def square_target(arg: int, *_) -> int: + return arg**2 + + def sum_target(a: int, b: int, *_) -> int: + return a + b + + async def sleep_only(arg: float, *_) -> None: + return await asyncio.sleep(arg) + + async def sleep_and_result(arg: float, result: int, *_) -> int: + return await asyncio.sleep(arg, result) + + def not_a_coro_function(arg: int, *_) -> Awaitable[int]: + return asyncio.sleep(arg, arg) + # Synchronous/simple - with event_loop.assert_cleanup(): - xs = stream.range(5) | pipe.map(lambda x: x**2) + with assert_cleanup(): + xs = stream.range(5) | pipe.map(square_target) expected = [x**2 for x in range(5)] await assert_run(xs, expected) # Synchronous/multiple - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) - ys = xs | pipe.map(lambda x, y: x + y, xs) + ys = xs | pipe.map(sum_target, xs) expected = [x * 2 for x in range(5)] await assert_run(ys, expected) # Asynchronous/simple/concurrent - with event_loop.assert_cleanup(): - xs = stream.range(1, 4) | pipe.map(asyncio.sleep) + with assert_cleanup() as loop: + xs = stream.range(1, 4) | pipe.map(sleep_only) expected = [None] * 3 await assert_run(xs, expected) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] # Asynchronous/simple/sequential - with event_loop.assert_cleanup(): - xs = stream.range(1, 4) | pipe.map(asyncio.sleep, task_limit=1) + with assert_cleanup() as loop: + xs = stream.range(1, 4) | pipe.map(sleep_only, task_limit=1) expected = [None] * 3 await assert_run(xs, expected) - assert event_loop.steps == [1, 2, 3] + assert loop.steps == [1, 2, 3] # Asynchronous/multiple/concurrent - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 4) - ys = xs | pipe.map(asyncio.sleep, xs) + ys = xs | pipe.map(sleep_and_result, xs) await assert_run(ys, [1, 2, 3]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] # Asynchronous/multiple/sequential - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 4) - ys = xs | pipe.map(asyncio.sleep, xs, task_limit=1) + ys = xs | pipe.map(sleep_and_result, xs, task_limit=1) await assert_run(ys, [1, 2, 3]) - assert event_loop.steps == [1, 2, 3] + assert loop.steps == [1, 2, 3] # As completed - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.iterate([2, 4, 1, 3, 5]) - ys = xs | pipe.map(asyncio.sleep, xs, ordered=False) + ys = xs | pipe.map(sleep_and_result, xs, ordered=False) await assert_run(ys, [1, 2, 3, 4, 5]) - assert event_loop.steps == [1, 1, 1, 1, 1] + assert loop.steps == [1, 1, 1, 1, 1] # Invalid argument with pytest.raises(ValueError): - await (stream.range(1, 4) | pipe.map(asyncio.sleep, task_limit=0)) + await (stream.range(1, 4) | pipe.map(sleep_only, task_limit=0)) # Break - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.count(1) - ys = xs | pipe.map(asyncio.sleep, xs, task_limit=10) + ys = xs | pipe.map(sleep_and_result, xs, task_limit=10) await assert_run(ys[:3], [1, 2, 3]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] # Stuck - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.count(1) - ys = xs | pipe.map(asyncio.sleep, xs, task_limit=1) | pipe.timeout(5) + ys = xs | pipe.map(sleep_and_result, xs, task_limit=1) | pipe.timeout(5) await assert_run(ys, [1, 2, 3, 4], asyncio.TimeoutError()) # Force await - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.iterate([1, 2, 3]) - ys = xs | pipe.map(async_(lambda x: asyncio.sleep(x, x))) + ys = xs | pipe.map(async_(not_a_coro_function)) await assert_run(ys, [1, 2, 3]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] # Map await_ - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.iterate(map(lambda x: asyncio.sleep(x, x), [1, 2, 3])) - ys = xs | pipe.map(await_) + ys = xs | pipe.map(await_) # type: ignore await assert_run(ys, [1, 2, 3]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] @pytest.mark.asyncio -async def test_merge(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_merge(assert_run, assert_cleanup): + with assert_cleanup() as loop: xs = stream.range(1, 5, 2, interval=2) | pipe.delay(1) ys = stream.range(0, 5, 2, interval=2) | pipe.merge(xs) await assert_run(ys, [0, 1, 2, 3, 4]) - assert event_loop.steps == [1, 1, 1, 1] + assert loop.steps == [1, 1, 1, 1] - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(1, 5, 2, interval=2) | pipe.delay(1) ys = stream.range(0, 5, 2, interval=2) | pipe.merge(xs) await assert_run(ys[:3], [0, 1, 2]) - assert event_loop.steps == [1, 1] + assert loop.steps == [1, 1] - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.just(1) + stream.never() ys = xs | pipe.merge(xs) | pipe.timeout(1) await assert_run(ys, [1, 1], asyncio.TimeoutError()) - assert event_loop.steps == [1] + assert loop.steps == [1] # Reproduce issue #65 - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.iterate([1, 2]) ys = stream.iterate([3, 4]) zs = stream.merge(xs, ys) | pipe.take(3) await assert_run(zs, [1, 2, 3]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.iterate([1, 2, 3]) - ys = stream.throw(ZeroDivisionError) + ys = stream.throw(ZeroDivisionError()) zs = stream.merge(xs, ys) | pipe.delay(1) | pipe.take(3) await assert_run(zs, [1, 2, 3]) @@ -156,23 +169,23 @@ async def agen1(): async def agen2(): yield 1 - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.merge(agen1(), agen2()) | pipe.delay(1) | pipe.take(1) await assert_run(xs, [1]) @pytest.mark.asyncio -async def test_ziplatest(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_ziplatest(assert_run, assert_cleanup): + with assert_cleanup() as loop: xs = stream.range(0, 5, 2, interval=2) ys = stream.range(1, 5, 2, interval=2) | pipe.delay(1) zs = stream.ziplatest(xs, ys, default="▲") await assert_run(zs, [(0, "▲"), (0, 1), (2, 1), (2, 3), (4, 3)]) - assert event_loop.steps == [1, 1, 1, 1] + assert loop.steps == [1, 1, 1, 1] - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(0, 5, 2, interval=2) ys = stream.range(1, 5, 2, interval=2) | pipe.delay(1) zs = stream.ziplatest(xs, ys, partial=False) await assert_run(zs, [(0, 1), (2, 1), (2, 3), (4, 3)]) - assert event_loop.steps == [1, 1, 1, 1] + assert loop.steps == [1, 1, 1, 1] diff --git a/tests/test_core.py b/tests/test_core.py index a97a35e..275d5f8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,29 +1,26 @@ import pytest -from aiostream.test_utils import event_loop, add_resource +from aiostream.test_utils import add_resource from aiostream import stream, streamcontext, operator -# Pytest fixtures -event_loop - @pytest.mark.asyncio -async def test_streamcontext(event_loop): - with event_loop.assert_cleanup(): +async def test_streamcontext(assert_cleanup): + with assert_cleanup() as loop: xs = stream.range(3) | add_resource.pipe(1) async with streamcontext(xs) as streamer: it = iter(range(3)) async for item in streamer: assert item == next(it) - assert event_loop.steps == [1] + assert loop.steps == [1] - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(5) | add_resource.pipe(1) async with xs.stream() as streamer: it = iter(range(5)) async for item in streamer: assert item == next(it) - assert event_loop.steps == [1] + assert loop.steps == [1] def test_operator_from_method(): @@ -51,7 +48,7 @@ async def method(cls, arg): @pytest.mark.asyncio -async def test_error_on_sync_iteration(event_loop): +async def test_error_on_sync_iteration(): xs = stream.range(3) # Stream raises a TypeError @@ -67,12 +64,12 @@ async def test_error_on_sync_iteration(event_loop): @pytest.mark.asyncio -async def test_error_on_entering_a_stream(event_loop): +async def test_error_on_entering_a_stream(): xs = stream.range(3) # Stream raises a TypeError with pytest.raises(TypeError) as ctx: - async with xs: + async with xs: # type: ignore assert False assert "Use the `stream` method" in str(ctx.value) @@ -84,7 +81,7 @@ async def test1(): yield 1 with pytest.raises(AttributeError): - test1.pipe + test1.pipe # type: ignore match = "The `pipable` argument is deprecated." with pytest.warns(DeprecationWarning, match=match): @@ -94,7 +91,7 @@ async def test2(): yield 1 with pytest.raises(AttributeError): - test2.pipe + test2.pipe # type: ignore with pytest.warns(DeprecationWarning, match=match): @@ -103,7 +100,7 @@ async def test3(): yield 1 with pytest.raises(AttributeError): - test3.pipe + test3.pipe # type: ignore with pytest.warns(DeprecationWarning, match=match): @@ -111,4 +108,4 @@ async def test3(): async def test4(source): yield 1 - test4.pipe + test4.pipe # type: ignore diff --git a/tests/test_create.py b/tests/test_create.py index 3c6724d..abc0c49 100644 --- a/tests/test_create.py +++ b/tests/test_create.py @@ -1,10 +1,6 @@ import pytest import asyncio from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop - -# Pytest fixtures -assert_run, event_loop @pytest.mark.asyncio @@ -22,13 +18,13 @@ async def four(): @pytest.mark.asyncio async def test_call(assert_run): - def myfunc(a, b, c=0, d=4): + def myfunc(a: int, b: int, c: int = 0, d: int = 4): return a, b, c, d xs = stream.call(myfunc, 1, 2, c=3) await assert_run(xs, [(1, 2, 3, 4)]) - async def myasyncfunc(a, b, c=0, d=4): + async def myasyncfunc(a: int, b: int, c: int = 0, d: int = 4): return a, b, c, d xs = stream.call(myasyncfunc, 1, 2, c=3) @@ -49,10 +45,11 @@ async def test_empty(assert_run): @pytest.mark.asyncio -async def test_never(assert_run, event_loop): - xs = stream.never() | pipe.timeout(30.0) - await assert_run(xs, [], asyncio.TimeoutError()) - assert event_loop.steps == [30.0] +async def test_never(assert_run, assert_cleanup): + with assert_cleanup() as loop: + xs = stream.never() | pipe.timeout(30.0) + await assert_run(xs, [], asyncio.TimeoutError()) + assert loop.steps == [30.0] @pytest.mark.asyncio @@ -65,10 +62,11 @@ async def test_repeat(assert_run): @pytest.mark.asyncio -async def test_range(assert_run, event_loop): - xs = stream.range(3, 10, 2, interval=1.0) - await assert_run(xs, [3, 5, 7, 9]) - assert event_loop.steps == [1, 1, 1] +async def test_range(assert_run, assert_cleanup): + with assert_cleanup() as loop: + xs = stream.range(3, 10, 2, interval=1.0) + await assert_run(xs, [3, 5, 7, 9]) + assert loop.steps == [1, 1, 1] @pytest.mark.asyncio @@ -89,31 +87,33 @@ async def test_iterable(assert_run): @pytest.mark.asyncio -async def test_async_iterable(assert_run, event_loop): +async def test_async_iterable(assert_run, assert_cleanup): async def agen(): for x in range(2, 5): yield await asyncio.sleep(1.0, result=x**2) - xs = stream.create.from_async_iterable(agen()) - await assert_run(xs, [4, 9, 16]) - assert event_loop.steps == [1.0, 1.0, 1.0] + with assert_cleanup() as loop: + xs = stream.create.from_async_iterable(agen()) + await assert_run(xs, [4, 9, 16]) + assert loop.steps == [1.0, 1.0, 1.0] - xs = stream.iterate(agen()) - await assert_run(xs, [4, 9, 16]) - assert event_loop.steps == [1.0, 1.0, 1.0] * 2 + with assert_cleanup() as loop: + xs = stream.iterate(agen()) + await assert_run(xs, [4, 9, 16]) + assert loop.steps == [1.0, 1.0, 1.0] @pytest.mark.asyncio -async def test_non_iterable(assert_run): +async def test_non_iterable(): with pytest.raises(TypeError): - stream.iterate(None) + stream.iterate(None) # type: ignore with pytest.raises(TypeError): - stream.create.from_async_iterable(None) + stream.create.from_async_iterable(None) # type: ignore @pytest.mark.asyncio -async def test_preserve(assert_run, event_loop): +async def test_preserve(assert_run): async def agen(): yield 1 yield 2 diff --git a/tests/test_misc.py b/tests/test_misc.py index 2f2ff42..71275c0 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -3,21 +3,18 @@ import asyncio from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop, add_resource - -# Pytest fixtures -assert_run, event_loop +from aiostream.test_utils import add_resource @pytest.mark.asyncio -async def test_action(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_action(assert_run, assert_cleanup): + with assert_cleanup(): lst = [] xs = stream.range(3) | add_resource.pipe(1) | pipe.action(lst.append) await assert_run(xs, [0, 1, 2]) assert lst == [0, 1, 2] - with event_loop.assert_cleanup(): + with assert_cleanup(): queue = asyncio.Queue() xs = stream.range(3) | add_resource.pipe(1) | pipe.action(queue.put) await assert_run(xs, [0, 1, 2]) @@ -27,14 +24,14 @@ async def test_action(assert_run, event_loop): @pytest.mark.asyncio -async def test_print(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_print(assert_run, assert_cleanup): + with assert_cleanup(): f = io.StringIO() xs = stream.range(3) | add_resource.pipe(1) | pipe.print(file=f) await assert_run(xs, [0, 1, 2]) assert f.getvalue() == "0\n1\n2\n" - with event_loop.assert_cleanup(): + with assert_cleanup(): f = io.StringIO() xs = ( stream.range(3) diff --git a/tests/test_select.py b/tests/test_select.py index 665a62b..401b191 100644 --- a/tests/test_select.py +++ b/tests/test_select.py @@ -2,48 +2,45 @@ import pytest from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop, add_resource - -# Pytest fixtures -assert_run, event_loop +from aiostream.test_utils import add_resource @pytest.mark.asyncio -async def test_take(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_take(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.count() | add_resource.pipe(1) | pipe.take(3) await assert_run(xs, [0, 1, 2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.count() | add_resource.pipe(1) | pipe.take(0) await assert_run(xs, []) @pytest.mark.asyncio -async def test_takelast(assert_run, event_loop): +async def test_takelast(assert_run): xs = stream.range(10) | add_resource.pipe(1) | pipe.takelast(3) await assert_run(xs, [7, 8, 9]) @pytest.mark.asyncio -async def test_skip(assert_run, event_loop): +async def test_skip(assert_run, assert_cleanup): xs = stream.range(10) | add_resource.pipe(1) | pipe.skip(8) await assert_run(xs, [8, 9]) @pytest.mark.asyncio -async def test_skiplast(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_skiplast(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(10) | add_resource.pipe(1) | pipe.skiplast(8) await assert_run(xs, [0, 1]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10) | add_resource.pipe(1) | pipe.skiplast(0) await assert_run(xs, list(range(10))) @pytest.mark.asyncio -async def test_filterindex(assert_run, event_loop): +async def test_filterindex(assert_run, assert_cleanup): filterindex = stream.select.filterindex xs = ( stream.range(10) @@ -54,22 +51,22 @@ async def test_filterindex(assert_run, event_loop): @pytest.mark.asyncio -async def test_slice(assert_run, event_loop): +async def test_slice(assert_run, assert_cleanup): slice = stream.select.slice - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10, 20) | add_resource.pipe(1) | slice.pipe(2) await assert_run(xs, [10, 11]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10, 20) | add_resource.pipe(1) | slice.pipe(8, None) await assert_run(xs, [18, 19]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10, 20) | add_resource.pipe(1) | slice.pipe(-3, -1) await assert_run(xs, [17, 18]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(10, 20) | add_resource.pipe(1) | slice.pipe(-5, -1, 2) await assert_run(xs, [15, 17]) @@ -81,25 +78,25 @@ async def test_slice(assert_run, event_loop): @pytest.mark.asyncio -async def test_item(assert_run, event_loop): +async def test_item(assert_run, assert_cleanup): item = stream.select.item - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | item.pipe(2) await assert_run(xs, [2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | item.pipe(-2) await assert_run(xs, [3]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | item.pipe(10) exception = IndexError( "Index out of range", ) await assert_run(xs, [], exception) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | item.pipe(-10) exception = IndexError( "Index out of range", @@ -108,40 +105,40 @@ async def test_item(assert_run, event_loop): @pytest.mark.asyncio -async def test_getitem(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_getitem(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) | pipe.getitem(2) await assert_run(xs, [2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) await assert_run(xs[2], [2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): s = slice(1, 3) xs = stream.range(5) | add_resource.pipe(1) | pipe.getitem(s) await assert_run(xs, [1, 2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) await assert_run(xs[1:3], [1, 2]) - with event_loop.assert_cleanup(): + with assert_cleanup(): s = slice(1, 5, 2) xs = stream.range(5) | add_resource.pipe(1) | pipe.getitem(s) await assert_run(xs, [1, 3]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5) | add_resource.pipe(1) await assert_run(xs[1:5:2], [1, 3]) with pytest.raises(TypeError): - xs = stream.range(5)[None] + xs = stream.range(5)[None] # type: ignore @pytest.mark.asyncio -async def test_filter(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_filter(assert_run, assert_cleanup): + with assert_cleanup(): xs = ( stream.range(1, 10) | add_resource.pipe(1) @@ -153,67 +150,61 @@ async def afunc(x): await asyncio.sleep(1) return x in [3, 6, 9] - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.filter(afunc) await assert_run(xs, [3, 6, 9]) - assert event_loop.steps == [1] * 10 + assert loop.steps == [1] * 10 @pytest.mark.asyncio -async def test_until(assert_run, event_loop): - with event_loop.assert_cleanup(): - xs = ( - stream.range(1, 10) - | add_resource.pipe(1) - | pipe.until(lambda x: x == 3) - ) +async def test_until(assert_run, assert_cleanup): + with assert_cleanup(): + xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.until(lambda x: x == 3) await assert_run(xs, [1, 2, 3]) async def afunc(x): await asyncio.sleep(1) return x == 3 - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.until(afunc) await assert_run(xs, [1, 2, 3]) - assert event_loop.steps == [1] * 4 + assert loop.steps == [1] * 4 @pytest.mark.asyncio -async def test_takewhile(assert_run, event_loop): - with event_loop.assert_cleanup(): - xs = ( - stream.range(1, 10) - | add_resource.pipe(1) - | pipe.takewhile(lambda x: x < 4) - ) +async def test_takewhile(assert_run, assert_cleanup): + def less_than_4(x: int) -> bool: + return x < 4 + + with assert_cleanup(): + xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.takewhile(less_than_4) await assert_run(xs, [1, 2, 3]) async def afunc(x): await asyncio.sleep(1) return x < 4 - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.takewhile(afunc) await assert_run(xs, [1, 2, 3]) - assert event_loop.steps == [1] * 5 + assert loop.steps == [1] * 5 @pytest.mark.asyncio -async def test_dropwhile(assert_run, event_loop): - with event_loop.assert_cleanup(): - xs = ( - stream.range(1, 10) - | add_resource.pipe(1) - | pipe.dropwhile(lambda x: x < 7) - ) +async def test_dropwhile(assert_run, assert_cleanup): + def less_than_7(x: int) -> bool: + return x < 7 + + with assert_cleanup(): + xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.dropwhile(less_than_7) await assert_run(xs, [7, 8, 9]) async def afunc(x): await asyncio.sleep(1) return x < 7 - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 10) | add_resource.pipe(1) | pipe.dropwhile(afunc) await assert_run(xs, [7, 8, 9]) - assert event_loop.steps == [1] * 8 + assert loop.steps == [1] * 8 diff --git a/tests/test_task_group.py b/tests/test_task_group.py index 75f3ac7..f6fc86c 100644 --- a/tests/test_task_group.py +++ b/tests/test_task_group.py @@ -32,7 +32,7 @@ async def task5(): @pytest.mark.asyncio -async def test_task_group_cleanup(event_loop): +async def test_task_group_cleanup(): async with TaskGroup() as group: t1 = group.create_task(task1()) t2 = group.create_task(task2()) diff --git a/tests/test_time.py b/tests/test_time.py index 7c13e93..0f5690d 100644 --- a/tests/test_time.py +++ b/tests/test_time.py @@ -2,21 +2,17 @@ import asyncio from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop - -# Pytest fixtures -assert_run, event_loop @pytest.mark.asyncio -async def test_timeout(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_timeout(assert_run, assert_cleanup): + with assert_cleanup() as loop: xs = stream.range(3) | pipe.timeout(5) await assert_run(xs, [0, 1, 2]) - assert event_loop.steps == [] + assert loop.steps == [] - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(3) + stream.never() ys = xs | pipe.timeout(1) await assert_run(ys, [0, 1, 2], asyncio.TimeoutError()) - assert event_loop.steps == [1] + assert loop.steps == [1] diff --git a/tests/test_transform.py b/tests/test_transform.py index c450064..817fc79 100644 --- a/tests/test_transform.py +++ b/tests/test_transform.py @@ -2,71 +2,69 @@ import asyncio from aiostream import stream, pipe -from aiostream.test_utils import assert_run, event_loop, add_resource - -# Pytest fixtures -assert_run, event_loop +from aiostream.test_utils import add_resource @pytest.mark.asyncio -async def test_starmap(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_starmap(assert_run, assert_cleanup): + def target(a: int, b: int, *_) -> int: + return a + b + + with assert_cleanup(): xs = stream.range(5) ys = stream.range(5) - zs = xs | pipe.zip(ys) | pipe.starmap(lambda x, y: x + y) + zs = xs | pipe.zip(ys) | pipe.starmap(target) expected = [x * 2 for x in range(5)] await assert_run(zs, expected) - with event_loop.assert_cleanup(): + async def async_target(arg: float, result: int, *_) -> int: + return await asyncio.sleep(arg, result) + + with assert_cleanup() as loop: xs = stream.range(1, 4) ys = stream.range(1, 4) - zs = xs | pipe.zip(ys) | pipe.starmap(asyncio.sleep) + zs = xs | pipe.zip(ys) | pipe.starmap(async_target) await assert_run(zs, [1, 2, 3]) - assert event_loop.steps == [1, 1, 1] + assert loop.steps == [1, 1, 1] - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.range(1, 4) ys = stream.range(1, 4) - zs = xs | pipe.zip(ys) | pipe.starmap(asyncio.sleep, task_limit=1) + zs = xs | pipe.zip(ys) | pipe.starmap(async_target, task_limit=1) await assert_run(zs, [1, 2, 3]) - assert event_loop.steps == [1, 2, 3] + assert loop.steps == [1, 2, 3] @pytest.mark.asyncio -async def test_cycle(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_cycle(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.empty() | pipe.cycle() | pipe.timeout(1) await assert_run(xs, [], asyncio.TimeoutError()) - with event_loop.assert_cleanup(): - xs = ( - stream.empty() - | add_resource.pipe(1) - | pipe.cycle() - | pipe.timeout(1) - ) + with assert_cleanup(): + xs = stream.empty() | add_resource.pipe(1) | pipe.cycle() | pipe.timeout(1) await assert_run(xs, [], asyncio.TimeoutError()) - with event_loop.assert_cleanup(): + with assert_cleanup() as loop: xs = stream.just(1) | add_resource.pipe(1) | pipe.cycle() await assert_run(xs[:5], [1] * 5) - assert event_loop.steps == [1] * 5 + assert loop.steps == [1] * 5 @pytest.mark.asyncio -async def test_chunks(assert_run, event_loop): - with event_loop.assert_cleanup(): +async def test_chunks(assert_run, assert_cleanup): + with assert_cleanup(): xs = stream.range(3, interval=1) | pipe.chunks(3) await assert_run(xs, [[0, 1, 2]]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(4, interval=1) | pipe.chunks(3) await assert_run(xs, [[0, 1, 2], [3]]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.range(5, interval=1) | pipe.chunks(3) await assert_run(xs, [[0, 1, 2], [3, 4]]) - with event_loop.assert_cleanup(): + with assert_cleanup(): xs = stream.count(interval=1) | add_resource.pipe(1) | pipe.chunks(3) await assert_run(xs[:1], [[0, 1, 2]])