From 46a5426e8379fdb6056751631de1a3e4c36490a5 Mon Sep 17 00:00:00 2001 From: Vincent Michel Date: Mon, 6 May 2024 21:57:39 +0200 Subject: [PATCH] Operator as object --- aiostream/core.py | 217 +++++++++++++++++++++------------------------- pyproject.toml | 1 + 2 files changed, 101 insertions(+), 117 deletions(-) diff --git a/aiostream/core.py b/aiostream/core.py index 666f0e8..9beef95 100644 --- a/aiostream/core.py +++ b/aiostream/core.py @@ -17,7 +17,6 @@ Protocol, Union, TypeVar, - cast, AsyncIterable, Awaitable, ) @@ -258,23 +257,26 @@ def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]: # Operator type protocol -class OperatorType(Protocol[P, T]): +class Operator(Protocol[P, T]): def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: ... - def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ... + @staticmethod + def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: ... -class PipableOperatorType(Protocol[A, P, T]): +class PipableOperator(Protocol[A, P, T]): def __call__( self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs ) -> Stream[T]: ... + @staticmethod def raw( - self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs + source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs ) -> AsyncIterator[T]: ... + @staticmethod def pipe( - self, *args: P.args, **kwargs: P.kwargs + *args: P.args, **kwargs: P.kwargs ) -> Callable[[AsyncIterable[A]], Stream[T]]: ... @@ -284,7 +286,7 @@ def pipe( def operator( func: Callable[P, AsyncIterator[T]] | None = None, pipable: bool | None = None, -) -> OperatorType[P, T]: +) -> Operator[P, T]: """Create a stream operator from an asynchronous generator (or any function returning an asynchronous iterable). @@ -330,7 +332,6 @@ async def random(offset=0., width=1.): ) # Gather data - bases = (Stream,) name = func.__name__ module = func.__module__ extra_doc = func.__doc__ @@ -345,49 +346,46 @@ async def random(offset=0., width=1.): "since the decorated function becomes an operator class" ) - # Injected parameters - self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) - inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD) - # Wrapped static method - original = func - original.__qualname__ = name + ".original" + original_func = func + original_func.__qualname__ = name + ".original" # Raw static method - raw = func - raw.__qualname__ = name + ".raw" + raw_func = func + raw_func.__qualname__ = name + ".raw" - # Init method - def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None: - factory = functools.partial(raw, *args, **kwargs) - return BaseStream.__init__(self, factory) + # Gather attributes + class OperatorImpl: + __qualname__ = name + __module__ = module + __doc__ = doc - # Customize init signature - new_parameters = [self_parameter] + parameters - init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + original = staticmethod(original_func) - # Customize init method - init.__qualname__ = name + ".__init__" - init.__name__ = "__init__" - init.__module__ = module - init.__doc__ = f"Initialize the {name} stream." + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: + factory = functools.partial(raw_func, *args, **kwargs) + return Stream(factory) - # Gather attributes - attrs = { - "__init__": init, - "__module__": module, - "__doc__": doc, - "raw": staticmethod(raw), - "original": staticmethod(original), - } + @staticmethod + def raw(*args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: + return raw_func(*args, **kwargs) + + # Customize call method + self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) + new_parameters = [self_parameter] + parameters + OperatorImpl.__call__.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + OperatorImpl.__call__.__qualname__ = name + ".__call__" + OperatorImpl.__call__.__name__ = "__call__" + OperatorImpl.__call__.__module__ = module + OperatorImpl.__call__.__doc__ = f"Initialize the {name} stream." # Create operator class - return cast("OperatorType[P, T]", type(name, bases, attrs)) + return OperatorImpl() def pipable_operator( func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]], -) -> PipableOperatorType[X, P, T]: +) -> PipableOperator[X, P, T]: """Create a pipable stream operator from an asynchronous generator (or any function returning an asynchronous iterable). @@ -441,7 +439,6 @@ def double(source): ) # Gather data - bases = (Stream,) name = func.__name__ module = func.__module__ extra_doc = func.__doc__ @@ -456,6 +453,13 @@ def double(source): "since the decorated function becomes an operator class" ) + # Check for positional first parameter + if not parameters or parameters[0].kind not in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ): + raise ValueError("The first parameter of the operator must be positional") + # Look for "more_sources" for i, p in enumerate(parameters): if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL: @@ -464,89 +468,68 @@ def double(source): else: more_sources_index = None - # Injected parameters - self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) - cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD) - # Wrapped static method - original = func - original.__qualname__ = name + ".original" + original_func = func + original_func.__qualname__ = name + ".original" - # Raw static method - def raw( - arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs - ) -> AsyncIterator[T]: - assert_async_iterable(arg) - if more_sources_index is not None: - for source in args[more_sources_index - 1 :]: - assert_async_iterable(source) - return func(arg, *args, **kwargs) - - # Custonize raw method - raw.__signature__ = signature # type: ignore[attr-defined] - raw.__qualname__ = name + ".raw" - raw.__module__ = module - raw.__doc__ = doc - - # Init method - def init( - self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs - ) -> None: - assert_async_iterable(arg) - if more_sources_index is not None: - for source in args[more_sources_index - 1 :]: - assert_async_iterable(source) - factory = functools.partial(raw, arg, *args, **kwargs) - return BaseStream.__init__(self, factory) - - # Customize init signature + # Gather attributes + class PipableOperatorImpl: + __qualname__ = name + __module__ = module + __doc__ = doc + + original = staticmethod(original_func) + + @staticmethod + def raw( + arg: AsyncIterable[X], /, *args: P.args, **kwargs: P.kwargs + ) -> AsyncIterator[T]: + assert_async_iterable(arg) + if more_sources_index is not None: + for source in args[more_sources_index - 1 :]: + assert_async_iterable(source) + return func(arg, *args, **kwargs) + + def __call__( + self, arg: AsyncIterable[X], /, *args: P.args, **kwargs: P.kwargs + ) -> Stream[T]: + assert_async_iterable(arg) + if more_sources_index is not None: + for source in args[more_sources_index - 1 :]: + assert_async_iterable(source) + factory = functools.partial(self.raw, arg, *args, **kwargs) + return Stream(factory) + + @staticmethod + def pipe( + *args: P.args, + **kwargs: P.kwargs, + ) -> Callable[[AsyncIterable[X]], Stream[T]]: + return lambda source: operator_instance(source, *args, **kwargs) + + # Customize raw method + PipableOperatorImpl.raw.__signature__ = signature # type: ignore[attr-defined] + PipableOperatorImpl.raw.__qualname__ = name + ".raw" + PipableOperatorImpl.raw.__module__ = module + PipableOperatorImpl.raw.__doc__ = doc + + # Customize call method + self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) new_parameters = [self_parameter] + parameters - init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] - - # Customize init method - init.__qualname__ = name + ".__init__" - init.__name__ = "__init__" - init.__module__ = module - init.__doc__ = f"Initialize the {name} stream." - - # Pipe class method - def pipe( - cls: PipableOperatorType[X, P, T], - /, - *args: P.args, - **kwargs: P.kwargs, - ) -> Callable[[AsyncIterable[X]], Stream[T]]: - return lambda source: cls(source, *args, **kwargs) - - # Customize pipe signature - if parameters and parameters[0].kind in ( - inspect.Parameter.POSITIONAL_ONLY, - inspect.Parameter.POSITIONAL_OR_KEYWORD, - ): - new_parameters = [cls_parameter] + parameters[1:] - else: - raise ValueError("The first parameter of the operator must be positional") - pipe.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + PipableOperatorImpl.__call__.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + PipableOperatorImpl.__call__.__qualname__ = name + ".__call__" + PipableOperatorImpl.__call__.__name__ = "__call__" + PipableOperatorImpl.__call__.__module__ = module + PipableOperatorImpl.__call__.__doc__ = f"Initialize the {name} stream." # Customize pipe method - pipe.__qualname__ = name + ".pipe" - pipe.__module__ = module - pipe.__doc__ = f'Pipable "{name}" stream operator.' + PipableOperatorImpl.pipe.__signature__ = signature.replace(parameters=parameters[1:]) # type: ignore[attr-defined] + PipableOperatorImpl.pipe.__qualname__ = name + ".pipe" + PipableOperatorImpl.pipe.__module__ = module + PipableOperatorImpl.pipe.__doc__ = f'Pipable "{name}" stream operator.' if extra_doc: - pipe.__doc__ += "\n\n " + extra_doc - - # Gather attributes - attrs = { - "__init__": init, - "__module__": module, - "__doc__": doc, - "raw": staticmethod(raw), - "original": staticmethod(original), - "pipe": classmethod(pipe), # type: ignore[arg-type] - } + PipableOperatorImpl.pipe.__doc__ += "\n\n " + extra_doc # Create operator class - return cast( - "PipableOperatorType[X, P, T]", - type(name, bases, attrs), - ) + operator_instance = PipableOperatorImpl() + return operator_instance diff --git a/pyproject.toml b/pyproject.toml index dd1c3eb..90002a7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ strict = [ "aiostream/manager.py", "aiostream/pipe.py", "aiostream/test_utils.py", + "aiostream/core.py", ] [tool.mypy]