From 7784d1f02fa884f6b23fdf01aa8b757f12259a09 Mon Sep 17 00:00:00 2001 From: Vincent Michel Date: Mon, 25 Sep 2023 20:13:05 +0200 Subject: [PATCH] Add task_limit argument to action operator --- aiostream/stream/combine.py | 2 +- aiostream/stream/misc.py | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/aiostream/stream/combine.py b/aiostream/stream/combine.py index e7990a2..a782a73 100644 --- a/aiostream/stream/combine.py +++ b/aiostream/stream/combine.py @@ -23,7 +23,7 @@ from . import advanced from . import aggregate -__all__ = ["chain", "zip", "map", "merge", "ziplatest"] +__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"] T = TypeVar("T") U = TypeVar("U") diff --git a/aiostream/stream/misc.py b/aiostream/stream/misc.py index aedd59b..1be834e 100644 --- a/aiostream/stream/misc.py +++ b/aiostream/stream/misc.py @@ -6,7 +6,7 @@ from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any -from .transform import map +from .combine import amap, smap from ..core import pipable_operator __all__ = ["action", "print"] @@ -17,12 +17,24 @@ @pipable_operator def action( - source: AsyncIterable[T], func: Callable[[T], Awaitable[Any] | Any] + source: AsyncIterable[T], + func: Callable[[T], Awaitable[Any] | Any], + ordered: bool = True, + task_limit: int | None = None, ) -> AsyncIterator[T]: """Perform an action for each element of an asynchronous sequence without modifying it. The given function can be synchronous or asynchronous. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. This argument is ignored if the + provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. """ if asyncio.iscoroutinefunction(func): @@ -32,7 +44,7 @@ async def ainnerfunc(arg: T, *_: object) -> T: await awaitable return arg - return map.raw(source, ainnerfunc) + return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit) else: @@ -40,7 +52,7 @@ def innerfunc(arg: T, *_: object) -> T: func(arg) return arg - return map.raw(source, innerfunc) + return smap.raw(source, innerfunc) @pipable_operator