Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore/format/ci: use black stable style 2024; bump flake8 version; add black config to pyproject.toml #100

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/ambv/black
rev: 23.3.0
rev: 24.3.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pycqa/flake8
rev: 5.0.4
rev: 7.0.0
hooks:
- id: flake8
args:
- --extend-ignore=E704
# https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#flake8
- repo: https://github.com/pre-commit/mirrors-mypy
# We can't use the latest version due to a regression in mypy 1.7.0
# See https://github.com/python/mypy/issues/17191 for more information
Expand Down
17 changes: 13 additions & 4 deletions aiostream/aiter_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Utilities for asynchronous iteration."""

from __future__ import annotations

import sys
Expand Down Expand Up @@ -100,7 +101,9 @@ def assert_async_iterator(obj: object) -> None:
asynchronous iterator.
"""
if not is_async_iterator(obj):
raise TypeError(f"{type(obj).__name__!r} object is not an async iterator")
raise TypeError(
f"{type(obj).__name__!r} object is not an async iterator"
)


# Async iterator context
Expand Down Expand Up @@ -156,7 +159,9 @@ def __anext__(self) -> Awaitable[T]:

async def __aenter__(self: Self) -> Self:
if self._state == self._RUNNING:
raise RuntimeError(f"{type(self).__name__} has already been entered")
raise RuntimeError(
f"{type(self).__name__} has already been entered"
)
if self._state == self._FINISHED:
raise RuntimeError(
f"{type(self).__name__} is closed and cannot be iterated"
Expand Down Expand Up @@ -202,7 +207,9 @@ async def __aexit__(
await self._aiterator.athrow(value)
else:
await self._aiterator.athrow(typ, value, traceback)
raise RuntimeError("Async iterator didn't stop after athrow()")
raise RuntimeError(
"Async iterator didn't stop after athrow()"
)

# Exception has been (most probably) silenced
except StopAsyncIteration as exc:
Expand Down Expand Up @@ -237,7 +244,9 @@ async def aclose(self) -> None:

async def athrow(self, exc: Exception) -> T:
if self._state == self._FINISHED:
raise RuntimeError(f"{type(self).__name__} is closed and cannot be used")
raise RuntimeError(
f"{type(self).__name__} is closed and cannot be used"
)
assert isinstance(self._aiterator, AsyncGenerator)
item: T = await self._aiterator.athrow(exc)
return item
Expand Down
38 changes: 23 additions & 15 deletions aiostream/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Core objects for stream operators."""

from __future__ import annotations

import inspect
Expand Down Expand Up @@ -258,28 +259,23 @@ def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]:


class OperatorType(Protocol[P, T]):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
...
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: ...

def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
...
def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ...


class PipableOperatorType(Protocol[A, P, T]):
def __call__(
self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
) -> Stream[T]:
...
) -> Stream[T]: ...

def raw(
self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
) -> AsyncIterator[T]:
...
) -> AsyncIterator[T]: ...

def pipe(
self, *args: P.args, **kwargs: P.kwargs
) -> Callable[[AsyncIterable[A]], Stream[T]]:
...
) -> Callable[[AsyncIterable[A]], Stream[T]]: ...


# Operator decorator
Expand Down Expand Up @@ -350,7 +346,9 @@ async def random(offset=0., width=1.):
)

# Injected parameters
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
self_parameter = inspect.Parameter(
"self", inspect.Parameter.POSITIONAL_OR_KEYWORD
)
inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)

# Wrapped static method
Expand Down Expand Up @@ -462,15 +460,22 @@ def double(source):

# Look for "more_sources"
for i, p in enumerate(parameters):
if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
if (
p.name == "more_sources"
and p.kind == inspect.Parameter.VAR_POSITIONAL
):
more_sources_index = i
break
else:
more_sources_index = None

# Injected parameters
self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
self_parameter = inspect.Parameter(
"self", inspect.Parameter.POSITIONAL_OR_KEYWORD
)
cls_parameter = inspect.Parameter(
"cls", inspect.Parameter.POSITIONAL_OR_KEYWORD
)

# Wrapped static method
original = func
Expand All @@ -494,7 +499,10 @@ def raw(

# Init method
def init(
self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
self: BaseStream[T],
arg: AsyncIterable[X],
*args: P.args,
**kwargs: P.kwargs,
) -> None:
assert_async_iterable(arg)
if more_sources_index is not None:
Expand Down
5 changes: 4 additions & 1 deletion aiostream/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Provide a context to easily manage several streamers running
concurrently.
"""

from __future__ import annotations

import asyncio
Expand Down Expand Up @@ -119,7 +120,9 @@ async def __aexit__(
self.streamers.clear()
return await self.stack.__aexit__(typ, value, traceback)

async def enter_and_create_task(self, aiter: AsyncIterable[T]) -> Streamer[T]:
async def enter_and_create_task(
self, aiter: AsyncIterable[T]
) -> Streamer[T]:
streamer = streamcontext(aiter)
await streamer.__aenter__()
self.streamers.append(streamer)
Expand Down
1 change: 1 addition & 0 deletions aiostream/pipe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Gather the pipe operators."""

from __future__ import annotations

from . import stream
Expand Down
24 changes: 17 additions & 7 deletions aiostream/stream/advanced.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Advanced operators (to deal with streams of higher order) ."""

from __future__ import annotations

from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
Expand Down Expand Up @@ -46,15 +47,17 @@ async def base_combine(

# Safe context
async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
main_streamer: Streamer[
AsyncIterable[T] | T
] | None = await manager.enter_and_create_task(source)
main_streamer: Streamer[AsyncIterable[T] | T] | None = (
await manager.enter_and_create_task(source)
)

# Loop over events
while manager.tasks:
# Extract streamer groups
substreamers = manager.streamers[1:]
mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
mainstreamers = (
[main_streamer] if main_streamer in manager.tasks else []
)

# Switch - use the main streamer then the substreamer
if switch:
Expand Down Expand Up @@ -84,7 +87,10 @@ async def base_combine(
await manager.clean_streamer(streamer)

# Re-schedule the main streamer if necessary
if main_streamer is not None and main_streamer not in manager.tasks:
if (
main_streamer is not None
and main_streamer not in manager.tasks
):
manager.create_task(main_streamer)

# Process result
Expand Down Expand Up @@ -126,7 +132,9 @@ def concat(

Errors raised in the source or an element sequence are propagated.
"""
return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
return base_combine.raw(
source, task_limit=task_limit, switch=False, ordered=True
)


@pipable_operator
Expand All @@ -141,7 +149,9 @@ def flatten(

Errors raised in the source or an element sequence are propagated.
"""
return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
return base_combine.raw(
source, task_limit=task_limit, switch=False, ordered=False
)


@pipable_operator
Expand Down
10 changes: 9 additions & 1 deletion aiostream/stream/aggregate.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
"""Aggregation operators."""

from __future__ import annotations

import asyncio
import builtins
import operator as op
from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
from typing import (
AsyncIterator,
Awaitable,
Callable,
TypeVar,
AsyncIterable,
cast,
)


from . import select
Expand Down
21 changes: 12 additions & 9 deletions aiostream/stream/combine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Combination operators."""

from __future__ import annotations

import asyncio
Expand Down Expand Up @@ -75,7 +76,8 @@ async def zip(
async with AsyncExitStack() as stack:
# Handle resources
streamers = [
await stack.enter_async_context(streamcontext(source)) for source in sources
await stack.enter_async_context(streamcontext(source))
for source in sources
]
# Loop over items
while True:
Expand All @@ -93,18 +95,15 @@ async def zip(


class SmapCallable(Protocol[X, Y]):
def __call__(self, arg: X, /, *args: X) -> Y:
...
def __call__(self, arg: X, /, *args: X) -> Y: ...


class AmapCallable(Protocol[X, Y]):
async def __call__(self, arg: X, /, *args: X) -> Y:
...
async def __call__(self, arg: X, /, *args: X) -> Y: ...


class MapCallable(Protocol[X, Y]):
def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
...
def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y: ...


@pipable_operator
Expand Down Expand Up @@ -162,7 +161,9 @@ async def func(arg: T, *args: T) -> AsyncIterable[U]:
return advanced.concatmap.raw(
source, func, *more_sources, task_limit=task_limit
)
return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
return advanced.flatmap.raw(
source, func, *more_sources, task_limit=task_limit
)


@pipable_operator
Expand Down Expand Up @@ -260,7 +261,9 @@ def func(x: T, *_: object) -> dict[int, T]:

return func

new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
new_sources = [
smap.raw(source, make_func(i)) for i, source in enumerate(sources)
]

# Merge the sources
merged = merge.raw(*new_sources)
Expand Down
11 changes: 6 additions & 5 deletions aiostream/stream/create.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Non-pipable creation operators."""

from __future__ import annotations

import sys
Expand Down Expand Up @@ -96,18 +97,18 @@ async def just(value: T) -> AsyncIterator[T]:


class SyncCallable(Protocol[P, Y]):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
...
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y: ...


class AsyncCallable(Protocol[P, Y]):
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
...
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]: ...


@operator
async def call(
func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
func: SyncCallable[P, T] | AsyncCallable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> AsyncIterator[T]:
"""Call the given function and generate a single value.

Expand Down
14 changes: 12 additions & 2 deletions aiostream/stream/misc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
"""Extra operators."""

from __future__ import annotations

import asyncio
import builtins

from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
from typing import (
TypeVar,
Awaitable,
Callable,
AsyncIterable,
AsyncIterator,
Any,
)

from .combine import amap, smap
from ..core import pipable_operator
Expand Down Expand Up @@ -44,7 +52,9 @@ async def ainnerfunc(arg: T, *_: object) -> T:
await awaitable
return arg

return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
return amap.raw(
source, ainnerfunc, ordered=ordered, task_limit=task_limit
)

else:

Expand Down
Loading
Loading