Skip to content

Commit

Permalink
Properly expose pipe methods in aiostream.pipe (PR #88, issue #87)
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel authored Oct 24, 2023
2 parents 9b53329 + 4db7c26 commit 0688e37
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 44 deletions.
5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ repos:
hooks:
- id: flake8
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
rev: v1.6.1
hooks:
- id: mypy
files: ^aiostream/
files: ^(?!tests)
types: [python]
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.0.272
Expand Down
12 changes: 7 additions & 5 deletions aiostream/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ def __await__(self) -> Generator[Any, None, T]:
"""
return wait_stream(self).__await__()

def __or__(self, func: Callable[[BaseStream[T]], BaseStream[X]]) -> BaseStream[X]:
def __or__(self, func: Callable[[BaseStream[T]], X]) -> X:
"""Pipe protocol.
Allow to pipe stream operators.
"""
return func(self)

def __add__(self, value: BaseStream[X]) -> BaseStream[Union[X, T]]:
def __add__(self, value: AsyncIterable[X]) -> Stream[Union[X, T]]:
"""Addition protocol.
Concatenate with a given asynchronous sequence.
Expand All @@ -125,7 +125,7 @@ def __add__(self, value: BaseStream[X]) -> BaseStream[Union[X, T]]:

return chain(self, value)

def __getitem__(self, value: Union[int, slice]) -> BaseStream[T]:
def __getitem__(self, value: Union[int, slice]) -> Stream[T]:
"""Get item protocol.
Accept index or slice to extract the corresponding item(s)
Expand Down Expand Up @@ -276,7 +276,9 @@ def raw(
) -> AsyncIterator[T]:
...

def pipe(self, source: AsyncIterable[A]) -> Stream[T]:
def pipe(
self, *args: P.args, **kwargs: P.kwargs
) -> Callable[[AsyncIterable[A]], Stream[T]]:
...


Expand Down Expand Up @@ -555,7 +557,7 @@ def pipe(
"__doc__": doc,
"raw": staticmethod(raw),
"original": staticmethod(original),
"pipe": classmethod(pipe),
"pipe": classmethod(pipe), # type: ignore[arg-type]
}

# Create operator class
Expand Down
50 changes: 34 additions & 16 deletions aiostream/pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,37 @@

from . import stream

__all__: list[str] = []


def update_pipe_module() -> None:
"""Populate the pipe module dynamically."""
module_dir = __all__
operators = stream.__dict__
for key, value in operators.items():
if getattr(value, "pipe", None):
globals()[key] = value.pipe
if key not in module_dir:
module_dir.append(key)


# Populate the module
update_pipe_module()
accumulate = stream.accumulate.pipe
action = stream.action.pipe
amap = stream.amap.pipe
chain = stream.chain.pipe
chunks = stream.chunks.pipe
concat = stream.concat.pipe
concatmap = stream.concatmap.pipe
cycle = stream.cycle.pipe
delay = stream.delay.pipe
dropwhile = stream.dropwhile.pipe
enumerate = stream.enumerate.pipe
filter = stream.filter.pipe
flatmap = stream.flatmap.pipe
flatten = stream.flatten.pipe
getitem = stream.getitem.pipe
list = stream.list.pipe
map = stream.map.pipe
merge = stream.merge.pipe
print = stream.print.pipe
reduce = stream.reduce.pipe
skip = stream.skip.pipe
skiplast = stream.skiplast.pipe
smap = stream.smap.pipe
spaceout = stream.spaceout.pipe
starmap = stream.starmap.pipe
switch = stream.switch.pipe
switchmap = stream.switchmap.pipe
take = stream.take.pipe
takelast = stream.takelast.pipe
takewhile = stream.takewhile.pipe
timeout = stream.timeout.pipe
until = stream.until.pipe
zip = stream.zip.pipe
ziplatest = stream.ziplatest.pipe
8 changes: 6 additions & 2 deletions examples/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from aiostream import pipe, stream


async def main():
def square(x: int, *_: int) -> int:
return x**2


async def main() -> None:
# Create a counting stream with a 0.2 seconds interval
xs = stream.count(interval=0.2)

# Operators can be piped using '|'
ys = xs | pipe.map(lambda x: x**2)
ys = xs | pipe.map(square)

# Streams can be sliced
zs = ys[1:10:2]
Expand Down
13 changes: 9 additions & 4 deletions examples/extra.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
import asyncio
import random as random_module
from typing import AsyncIterable, AsyncIterator

from aiostream import operator, pipable_operator, pipe, streamcontext


@operator
async def random(offset=0.0, width=1.0, interval=0.1):
async def random(
offset: float = 0.0, width: float = 1.0, interval: float = 0.1
) -> AsyncIterator[float]:
"""Generate a stream of random numbers."""
while True:
await asyncio.sleep(interval)
yield offset + width * random_module.random()


@pipable_operator
async def power(source, exponent):
async def power(
source: AsyncIterable[float], exponent: float | int
) -> AsyncIterator[float]:
"""Raise the elements of an asynchronous sequence to the given power."""
async with streamcontext(source) as streamer:
async for item in streamer:
yield item**exponent


@pipable_operator
def square(source):
def square(source: AsyncIterable[float]) -> AsyncIterator[float]:
"""Square the elements of an asynchronous sequence."""
return power.raw(source, 2)


async def main():
async def main() -> None:
xs = (
random() # Stream random numbers
| square.pipe() # Square the values
Expand Down
21 changes: 12 additions & 9 deletions examples/norm_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
[...]
"""

import math
import asyncio

from aiostream import pipe, stream
Expand All @@ -40,22 +41,24 @@
# Client handler


async def euclidean_norm_handler(reader, writer):
async def euclidean_norm_handler(
reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
# Define lambdas
def strip(x):
def strip(x: bytes, *_: object) -> str:
return x.decode().strip()

def nonempty(x):
def nonempty(x: str) -> bool:
return x != ""

def square(x):
def square(x: float, *_: object) -> float:
return x**2

def write_cursor(x):
def write_cursor(_: float) -> None:
return writer.write(b"> ")

def square_root(x):
return x**0.5
def square_root(x: float, *_: object) -> float:
return math.sqrt(x)

# Create awaitable
handle_request = (
Expand All @@ -67,7 +70,7 @@ def square_root(x):
| pipe.map(square)
| pipe.print("square: {:.2f}")
| pipe.action(write_cursor)
| pipe.accumulate(initializer=0)
| pipe.accumulate(initializer=0.0)
| pipe.map(square_root)
| pipe.print("norm -> {:.2f}")
)
Expand All @@ -86,7 +89,7 @@ def square_root(x):
# Main function


async def main(bind="127.0.0.1", port=8888):
async def main(bind: str = "127.0.0.1", port: int = 8888) -> None:
# Start the server
server = await asyncio.start_server(euclidean_norm_handler, bind, port)

Expand Down
5 changes: 3 additions & 2 deletions examples/preserve.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
from typing import AsyncIterator

from aiostream import operator, stream


async def main():
async def agen():
async def main() -> None:
async def agen() -> AsyncIterator[int]:
yield 1
yield 2
yield 3
Expand Down
14 changes: 11 additions & 3 deletions examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,22 @@
from aiostream import pipe, stream


async def main():
def is_odd(x: int) -> bool:
return x % 2 == 1


def square(x: int, *_: object) -> int:
return x**2


async def main() -> None:
# This stream computes 11² + 13² in 1.5 second
xs = (
stream.count(interval=0.1) # Count from zero every 0.1 s
| pipe.skip(10) # Skip the first 10 numbers
| pipe.take(5) # Take the following 5
| pipe.filter(lambda x: x % 2) # Keep odd numbers
| pipe.map(lambda x: x**2) # Square the results
| pipe.filter(is_odd) # Keep odd numbers
| pipe.map(square) # Square the results
| pipe.accumulate() # Add the numbers together
)
print("11² + 13² = ", await xs)
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ ignore = F401, F403, E731, W503, E501, E203

[mypy]
strict = True
packages = aiostream
packages = aiostream, examples

[mypy-aiostream.test_utils]
ignore_errors = True
10 changes: 10 additions & 0 deletions tests/test_pipe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from aiostream import stream, pipe


def test_pipe_module():
for name in dir(stream):
obj = getattr(stream, name)
pipe_method = getattr(obj, "pipe", None)
if pipe_method is None:
continue
assert getattr(pipe, name) == pipe_method

0 comments on commit 0688e37

Please sign in to comment.