Skip to content

Commit

Permalink
Add task_limit argument to action operator (issue #85, PR #86)
Browse files Browse the repository at this point in the history
  • Loading branch information
vxgmichel authored Sep 26, 2023
2 parents 1f88589 + 7784d1f commit 06c86e2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
2 changes: 1 addition & 1 deletion aiostream/stream/combine.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from . import advanced
from . import aggregate

__all__ = ["chain", "zip", "map", "merge", "ziplatest"]
__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]

T = TypeVar("T")
U = TypeVar("U")
Expand Down
20 changes: 16 additions & 4 deletions aiostream/stream/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any

from .transform import map
from .combine import amap, smap
from ..core import pipable_operator

__all__ = ["action", "print"]
Expand All @@ -17,12 +17,24 @@

@pipable_operator
def action(
source: AsyncIterable[T], func: Callable[[T], Awaitable[Any] | Any]
source: AsyncIterable[T],
func: Callable[[T], Awaitable[Any] | Any],
ordered: bool = True,
task_limit: int | None = None,
) -> AsyncIterator[T]:
"""Perform an action for each element of an asynchronous sequence
without modifying it.
The given function can be synchronous or asynchronous.
The results can either be returned in or out of order, depending on
the corresponding ``ordered`` argument. This argument is ignored if the
provided function is synchronous.
The coroutines run concurrently but their amount can be limited using
the ``task_limit`` argument. A value of ``1`` will cause the coroutines
to run sequentially. This argument is ignored if the provided function
is synchronous.
"""
if asyncio.iscoroutinefunction(func):

Expand All @@ -32,15 +44,15 @@ async def ainnerfunc(arg: T, *_: object) -> T:
await awaitable
return arg

return map.raw(source, ainnerfunc)
return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)

else:

def innerfunc(arg: T, *_: object) -> T:
func(arg)
return arg

return map.raw(source, innerfunc)
return smap.raw(source, innerfunc)


@pipable_operator
Expand Down

0 comments on commit 06c86e2

Please sign in to comment.