Alternative to the pipe syntax #52

Open
vxgmichel opened this issue Dec 18, 2019 · 1 comment

Comments

@vxgmichel
Owner

The current pipe | syntax

For reference, here's how a long pipeline looks once blackified:

    xs = (
        stream.count(interval=0.1)
        | pipe.skip(10)
        | pipe.take(5)
        | pipe.filter(lambda x: x % 2)
        | pipe.map(lambda x: x ** 2)
        | pipe.accumulate()
    )

The aiostream pipe syntax is often (and legitimately) seen as magical. Conceptually though, it's quite simple: the pipe combinators (in aiostream.pipe.*) are just curried and flipped versions of the standard combinators (in aiostream.stream.*). For instance:

ys = stream.map(xs, lambda x: x ** 2)
# is equivalent to
ys = pipe.map(lambda x: x ** 2)(xs)

Moreover, the pipe syntax is simply defined as regular function composition:

ys = xs | f | g | h
# is equivalent to
ys = h(g(f(xs)))

Combining those two ideas, we get:

ys = stream.map(xs, lambda x: x ** 2)
# is equivalent to
ys = xs | pipe.map(lambda x: x ** 2)
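
To make the "curried and flipped" idea concrete, here is a minimal synchronous sketch. It does not use aiostream and is not how the library is implemented: ToyStream, stream_map and flip_and_curry are hypothetical names made up for the illustration.

```python
class ToyStream:
    """Toy synchronous stand-in for a stream (hypothetical, not aiostream's class)."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, func):
        # `xs | f` is plain function application: f(xs)
        return func(self)


def stream_map(source, func):
    """Standard combinator: the source comes first."""
    return ToyStream(func(x) for x in source.items)


def flip_and_curry(combinator):
    """Turn combinator(source, *args) into piped(*args)(source)."""

    def piped(*args, **kwargs):
        def apply(source):
            return combinator(source, *args, **kwargs)

        return apply

    return piped


# Assumed equivalent of a pipe.* combinator derived from a stream.* combinator
pipe_map = flip_and_curry(stream_map)

xs = ToyStream(range(4))
direct = stream_map(xs, lambda x: x ** 2)
curried = pipe_map(lambda x: x ** 2)(xs)
piped = xs | pipe_map(lambda x: x ** 2)
```

All three spellings produce a stream holding the same items, which is the equivalence described above.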

That's neat, but we can't really expect to convince everyone who uses aiostream that "it's not magical, it's simply curried and flipped combinators with syntactic sugar for function composition".

Another issue is that the precedence of the | operator does not play well with await, which binds more tightly:

await stream.range(3) | pipe.print()
# is equivalent to
(await stream.range(3)) | pipe.print()
# which raises a TypeError
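
The precedence problem can be reproduced without aiostream. The sketch below uses a hypothetical ToyStream that is both awaitable (returning its last item, which is an assumption made for the demo) and pipeable, and shows that wrapping the pipeline in parentheses before awaiting avoids the error.

```python
import asyncio


class ToyStream:
    """Hypothetical stand-in for a stream: awaitable and pipeable."""

    def __init__(self, items):
        self.items = list(items)

    def __or__(self, func):
        return func(self)

    def __await__(self):
        async def last():
            # Assumption for the demo: awaiting yields the last item
            return self.items[-1]

        return last().__await__()


def double(source):
    """Toy pipe combinator: doubles every item."""
    return ToyStream(x * 2 for x in source.items)


async def main():
    # Parenthesized: the pipeline is built first, then awaited
    ok = await (ToyStream(range(3)) | double)
    # Unparenthesized: parsed as (await ToyStream(...)) | double
    try:
        await ToyStream(range(3)) | double
    except TypeError as exc:
        error = type(exc).__name__
    else:
        error = None
    return ok, error


result = asyncio.run(main())
```

In the unparenthesized form, the awaited value is an int, and an int cannot be combined with a function using `|`, hence the TypeError.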

Alternative 1, the toolz way

For reference, here's how a long pipeline would look once blackified:

    xs = pipe(
        stream.count(interval=0.1),
        pipe.skip(10),
        pipe.take(5),
        pipe.filter(lambda x: x % 2),
        pipe.map(lambda x: x ** 2),
        pipe.accumulate(),
    )

Pros:

  • Simple and explicit
  • Backward compatible

Cons:

  • Still requires a pipe namespace for flipped combinators
  • An extra pipe function has to be added, and the name conflicts with the pipe namespace
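
As a rough sketch of what such a function could look like, the snippet below folds a source through a sequence of one-argument functions, in the spirit of toolz. The name pipe_through is hypothetical (chosen here to sidestep the naming conflict mentioned above), and the list-based helpers skip, take, keep and transform are toy stand-ins for the flipped combinators, not aiostream functions.

```python
from functools import reduce


def pipe_through(source, *funcs):
    """Apply each function in turn: pipe_through(x, f, g) == g(f(x))."""
    return reduce(lambda value, func: func(value), funcs, source)


# Toy list-based combinators, already curried and flipped
def skip(n):
    return lambda items: items[n:]


def take(n):
    return lambda items: items[:n]


def keep(predicate):
    return lambda items: [x for x in items if predicate(x)]


def transform(func):
    return lambda items: [func(x) for x in items]


xs = pipe_through(
    list(range(20)),
    skip(10),
    take(5),
    keep(lambda x: x % 2),
    transform(lambda x: x ** 2),
)
```

The helper itself is a one-liner, so the cost of this alternative is mostly the extra name rather than extra machinery.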

Alternative 2, the rust way

For reference, here's how a long pipeline would look once blackified (the parentheses have to be added explicitly though):

    xs = (
        stream.count(interval=0.1)
        .skip(10)
        .take(5)
        .filter(lambda x: x % 2)
        .map(lambda x: x ** 2)
        .accumulate()
    )

Pros:

  • Does not require a pipe namespace for flipped combinators, since methods already play that role
  • May be written in a backward compatible way

Cons:

  • How to expose user-defined combinators as methods? Using inheritance?
  • Extra parentheses need to be added for black to work properly on long pipelines
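
One possible answer to the inheritance question, sketched with a toy synchronous class rather than aiostream: if every built-in method constructs its result with type(self), methods added in a subclass remain available further down the chain. ToyStream, MyStream and pairwise_sum are hypothetical names used only for this illustration.

```python
class ToyStream:
    """Toy chainable stream (hypothetical, synchronous, list-backed)."""

    def __init__(self, items):
        self._items = list(items)

    def __iter__(self):
        return iter(self._items)

    def map(self, func):
        # type(self) keeps the subclass alive across the chain
        return type(self)(func(x) for x in self)

    def filter(self, predicate):
        return type(self)(x for x in self if predicate(x))


class MyStream(ToyStream):
    """User subclass exposing a custom combinator as a method."""

    def pairwise_sum(self):
        return type(self)(a + b for a, b in zip(self._items, self._items[1:]))


xs = (
    MyStream(range(10))
    .filter(lambda x: x % 2)
    .map(lambda x: x ** 2)
    .pairwise_sum()
)
result = list(xs)
```

The drawback is that the chain must start from the user's subclass, and combinators from two independent subclasses cannot be mixed without further inheritance.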
@vxgmichel vxgmichel changed the title Alternative to the pipe | syntax Alternative to the pipe syntax Dec 18, 2019
@reuben

reuben commented Oct 4, 2024

I'd vote for option 2: it's less verbose (no need to keep repeating the pipe. module name) and familiar to people coming from languages where the builder pattern is common (JS/TS, Rust, Java, Kotlin, ...). The parentheses issue is alleviated by code editors that insert them automatically. For user-defined combinators, maybe that can be solved with inheritance plus a generic entry point like Stream.combine below:

from collections import OrderedDict
from typing import Callable, Concatenate, ParamSpec, TypeVar

from typing_extensions import Self

T = TypeVar("T")
P = ParamSpec("P")

CustomCombinatorT = TypeVar("CustomCombinatorT", bound="Stream")


class Stream:
    def combine(
        self,
        combinator: Callable[Concatenate["Stream", P], CustomCombinatorT],
        *args: P.args,
        **kwargs: P.kwargs,
    ) -> CustomCombinatorT:
        return combinator(self, *args, **kwargs)


def some_combinator(stream: Stream, arg1: int) -> Stream:
    return stream


def invalid_combinator(stream: Stream) -> OrderedDict:
    return OrderedDict()


class CustomStream(Stream):
    def __init__(self, stream: Stream):
        self.stream = stream

    def some_custom_method(self, foo: str) -> Self:
        return self


def to_custom_stream(stream: Stream) -> CustomStream:
    return CustomStream(stream)


xs = (
    Stream()
    .combine(some_combinator, 10)  # OK, returns Stream
    .combine(to_custom_stream)  # OK, returns CustomStream
    .some_custom_method("a")  # OK, custom method can now be called directly
    .combine(
        some_combinator, "foo"
    )  # doesn't type check, combinator args are type checked
    .combine(
        invalid_combinator
    )  # doesn't type check, returned type must inherit from Stream
)
