
[Question] Turn a "cold" stream to a "hot" one? #35

Open
discosultan opened this issue Dec 29, 2018 · 9 comments

Comments

@discosultan

I went over the documentation but couldn't find any operators to turn a "cold" stream to a "hot" one. For example, given the following code:

import asyncio
from aiostream import stream

async def main():
    xs = stream.count(interval=1)

    async def worker(nr):
        async for val in xs:
            print(f'w{nr} {val}')

    t1 = asyncio.create_task(worker(1))
    await asyncio.sleep(1.5)
    t2 = asyncio.create_task(worker(2))

    await asyncio.gather(t1, t2)

asyncio.run(main())

Each worker creates a new iterator over the stream, resulting in the following output:

w1 0  # t=0
w1 1  # t=1
w2 0  # t=1.5
w1 2  # t=2
w2 1  # t=2.5
w1 3  # t=3

However, I'd like the second task to hook into the same stream as the first one, resulting in the following output:

w1 0  # t=0
w1 1  # t=1
w1 2  # t=2
w2 2  # t=2
w1 3  # t=3
w2 3  # t=3

Does aiostream provide such facilities (similar to rxjs share operator) and if not, what would be the best approach to implement one?


As a side note, thanks for this library! The documentation as well as the source code look very elegant.

@vxgmichel
Owner

Thanks @discosultan!

This is a tricky topic. I thought about it a while ago but couldn't come up with anything satisfying, so I'll try to explain my current opinion.

By design, asynchronous generators in Python feature a back-pressure mechanism: the generator code only runs when the __anext__ method is called, so the consumer has to finish processing the previous item before it can ask for a new one. This is quite naive but simple to maintain, because no matter how many generators one chains, only a single part of the chain is running at a time. They're asynchronous in the sense that one can run two chains concurrently, but each chain itself runs sequentially.
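The pull model can be seen in a few lines: nothing runs in the generator until the consumer asks for an item, so a slow consumer naturally back-pressures the source.

```python
import asyncio

# Minimal illustration of the pull model: the generator body only advances
# when the consumer awaits __anext__, so each item is produced on demand.

async def numbers():
    for i in range(3):
        yield i  # suspended here until the consumer asks for the next item

async def main():
    result = []
    async for item in numbers():
        # The producer is paused while the consumer processes the item.
        result.append(item)
    return result

print(asyncio.run(main()))  # -> [0, 1, 2]
```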

This is described as the "pull model" in the ReactiveX introduction; ReactiveX itself implements the so-called "push model". I'm not a ReactiveX expert, but as far as I understand, the concept of hot observables fits well with the push model since there is already a context for running the observables. That is not the case for Python asynchronous generators (and aiostream), so there would need to be an extra task specifically dedicated to producing items from the stream.

This could be wrapped and exposed via an async context manager, as shown in this implementation I came up with. I think it works fine but it's still quite different from regular aiostream operators. In the included example, the chain is:

count operator -> hotstream context -> print operator

This seems usable and acceptable, but I'm still a bit confused. For the moment, I can't really tell if it conceptually makes sense. What I like about it is that this hotstream context can be used as a hub, to connect different pipelines to a single source:

pipeline A -> hotstream context -> pipeline X
                                -> pipeline Y

@discosultan You seem to be more familiar with Rx than I am, so I'd be very curious to hear your opinion on the topic :)

@jamesstidard

I realise this might be a little off topic, but I took the example hotstream implementation and I'm using it to broadcast events to a web app. It's been working spot on, apart from one thing I've not been able to figure out: I want to guard the stream from an asyncio.CancelledError. I think I want something like asyncio.shield, but for this stream.

This happens because I'm using Sanic WebSockets which are fed asyncio.CancelledError on a disconnect. This causes my hot stream to go... cold... (sorry).

Here's what I have:

import asyncio

from sanic import Sanic
from aiostream import stream

... <hotstreamer here> ...

app = Sanic()

@app.websocket("/")
async def test(request, ws):
    async for i in request.app.hot:
        await ws.send(str(i))

@app.listener('before_server_start')
async def before_server_start(app, loop):
    xs = stream.count(interval=1)
    app.hot = await hotstream(xs).stream().__aenter__()

@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    await app.hot.__aexit__(None, None, None)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

Connecting to that WebSocket will produce a count of how long the server has been running, but on disconnect it will raise the CancelledError within the xs stream and kill it. I found this by reimplementing the count function as an async generator and catching the CancelledError exception.
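Such a diagnostic reimplementation might look like this (a sketch, not the actual code used): a count-like async generator that records whether a CancelledError reached its body, with the disconnect simulated by cancelling the consuming task.

```python
import asyncio

# Hypothetical diagnostic: does a CancelledError raised in the consumer
# bubble up into the async generator's body? (It does.)

cancelled_inside = []

async def count(interval):
    i = 0
    try:
        while True:
            yield i
            i += 1
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        cancelled_inside.append(True)  # the disconnect bubbles up to here
        raise

async def main():
    async def consume():
        async for _ in count(interval=1):
            pass

    task = asyncio.ensure_future(consume())
    await asyncio.sleep(0.05)  # let the consumer block inside the generator
    task.cancel()              # simulates the websocket disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return bool(cancelled_inside)

print(asyncio.run(main()))  # -> True
```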

I could suppress the CancelledError in the counter implementation, though that would also prevent the shutdown __aexit__ call from being able to kill the generator.

My thought is to use something like asyncio.shield, though it doesn't appear to work on async generators. Here's an example of how I thought it might look:

...

@app.listener('before_server_start')
async def before_server_start(app, loop):
    xs = stream.count(interval=1)
    # keep hold of unshielded asyncgen for cleanup on shutdown
    app._hot = await hotstream(xs).stream().__aenter__()
    # shield the asyncgen from cancelledError for lifetime of server
    app.hot = asyncio.shield(app._hot)

@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    # cleanup via unshielded access
    await app._hot.__aexit__(None, None, None)

...

Would appreciate any pointers on how to tackle this - been beating my head against it.

Thanks.

@vxgmichel
Owner

vxgmichel commented Apr 24, 2019

Hi @jamesstidard ,

I took the example hotstream implementation and I'm using it to broadcast events to a web app

That's an interesting use case!

This happens because I'm using Sanic WebSockets which are fed asyncio.CancelledError on a disconnect.

It turns out to be related to a flaw in my gist: the cancellation bubbles up to the background task instead of simply cancelling the hot async iterator. This can be fixed by updating its __anext__ method:

class HotAsyncIterator:
    [...]
    async def __anext__(self):
        return await asyncio.shield(self.queue.popleft())

See the gist revisions.
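The effect of the shield can be demonstrated in isolation: cancelling a consumer that awaits a shielded task leaves the inner task running.

```python
import asyncio

# Isolated demonstration of the fix: asyncio.shield absorbs the cancellation
# of the consumer, so the shared inner task keeps running.

async def main():
    inner = asyncio.ensure_future(asyncio.sleep(0.05, result="done"))

    async def waiter():
        return await asyncio.shield(inner)

    outer = asyncio.ensure_future(waiter())
    await asyncio.sleep(0.01)
    outer.cancel()                 # cancel the consumer...
    try:
        await outer
    except asyncio.CancelledError:
        pass
    return await inner             # ...the producer task survives

print(asyncio.run(main()))  # -> done
```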

@jamesstidard

jamesstidard commented Apr 24, 2019

@vxgmichel Ah, I was somewhat on the right track. Still not massively confident with asyncio mechanics.

Here's the actual use case, if you're interested. I'm running a Redis server for publish/subscribe, along with a similar pubsub model for WebSocket clients to the server. I had implemented this in a kind of cheap way, by making a new connection - and holding it open - for every subscription that every client makes to Redis. This quickly exhausts the maximum number of concurrent connections (at least on Heroku's free tier). So, this allows me to have a single connection per server process which subscribes to all notifications, with all those client subscriptions instead filtered via that hotstream. Basically just shifting the many open subscriptions to Python to handle instead of Redis.

Anyway, I very much appreciate the response. I'll give this a try this evening. Also thanks for the library; it has helped me produce much more readable code than I would have otherwise.

@vxgmichel
Owner

@jamesstidard

Ah, I was somewhat on the right tracks. Still not massively confident with asyncio mechanics.

Well, this idea that being cancelled while awaiting a background task also cancels the background task is quite confusing, in my opinion. I wonder if this has been discussed somewhere, or if it is just a consequence of asyncio.Task inheriting from asyncio.Future 🤔
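That surprising behaviour can be reproduced in a few lines: without a shield, cancelling a task while it awaits another task also cancels the awaited task, precisely because awaiting a Task means awaiting it as a Future.

```python
import asyncio

# Small reproduction of the behaviour discussed above: cancelling the
# waiter propagates into the inner task it is awaiting.

async def main():
    inner = asyncio.ensure_future(asyncio.sleep(10))

    async def waiter():
        await inner  # no shield here

    outer = asyncio.ensure_future(waiter())
    await asyncio.sleep(0.01)      # let both tasks start
    outer.cancel()                 # cancelling the waiter...
    try:
        await outer
    except asyncio.CancelledError:
        pass
    return inner.cancelled()       # ...also cancelled the inner task

print(asyncio.run(main()))  # -> True
```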

Here's the actual use case [...]

Interesting! So do you use aiostream operators between redis and the hot stream, between the hot stream and the websocket, or both?

@jamesstidard

jamesstidard commented Apr 25, 2019

So do you use aiostream operators between redis and the hot stream, between the hot stream and the websocket, or both?

Currently it's looking like redis (1) -> hotstream (1) -> operators (m) -> WebSocket (m): a single stream subscribed to updates published on Redis, with many operators filtering the broadcasts back out to connected WebSocket clients.

I did need to make the queues in the HotStream class asyncio-friendly, as I was encountering self.queue.popleft() raising an IndexError if it beat the broadcaster to the punch.

Here's the altered source if it passes a sanity check:

import asyncio

from aiostream import streamcontext
from aiostream.aiter_utils import anext
from aiostream.core import Stream, Streamer


async def cancel_and_join(task):
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


class HotAsyncIterator:

    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        # The queue holds shared futures rather than values; shielding the
        # await means cancelling this consumer cannot cancel the shared
        # future that other consumers are still waiting on.
        next_ = await self.queue.get()
        value = await asyncio.shield(next_)
        return value


class HotStreamer(Streamer):

    def __init__(self, source, maxlen=1):
        self.source = source
        self.maxlen = maxlen

        self.queues = []
        self.task = None
        self.future = None
        self.started = asyncio.Event()

    async def __aenter__(self):
        self.task = asyncio.create_task(self._target())
        await self.started.wait()
        return self

    async def __aexit__(self, *args):
        await self.aclose()

    async def _target(self):
        # Background task: pull items from the source and fan the pending
        # future out to every consumer queue.
        async with streamcontext(self.source) as streamer:
            while True:
                try:
                    coro = anext(streamer)
                    self.future = asyncio.create_task(coro)
                    for queue in self.queues:
                        if queue.full():
                            # Drop the oldest pending item, like deque(maxlen=...)
                            _ = queue.get_nowait()
                        queue.put_nowait(self.future)
                    self.started.set()
                    await self.future
                except Exception:
                    # StopAsyncIteration or a source failure ends the stream
                    break
                finally:
                    await cancel_and_join(self.future)

    def __aiter__(self):
        queue = asyncio.Queue(maxsize=self.maxlen)
        queue.put_nowait(self.future)
        self.queues.append(queue)
        return HotAsyncIterator(queue)

    async def aclose(self):
        await cancel_and_join(self.task)


class HotStream(Stream):

    def __init__(self, source, maxlen=1):
        self.source = source
        self.maxlen = maxlen

    def __aiter__(self):
        return HotStreamer(self.source, self.maxlen)


def hotstream(*args, **kwargs):
    return HotStream(*args, **kwargs)

@vxgmichel
Owner

Currently it's looking to be redis (1) -> hotstream (1) -> operators (m) -> WebSocket (m)

Great, thanks for the info!

I did need to make the queues in the HotStream class asyncio friendly as I was encountering self.queue.popleft() raising a IndexError if it beat the broadcaster to the punch.

Right, I updated the gist with your changes. I'm still wondering whether the hotstream idea is mature enough to go into the lib or not 🤔

@jamesstidard

jamesstidard commented Apr 29, 2019

I'm still wondering whether the hotstream idea is mature enough to go into the lib or not 🤔

Yeah, can't really say. It's certainly something that I've found useful for these server-broadcast publish/subscribe type cases. It's worked out pretty well, and I'd probably use it in future projects, so having it in aiostream vs me copying and pasting it around would be welcomed 😄.

The hotstream thing is a concept in Rx as well - if memory serves - so it should play nicely with the rest of the operators and stuff aiostream has. So I think the idea is pretty safely within the general scope.

@vxgmichel
Owner

The hotstream thing is a concept in Rx as well - if memory serves - so it should play nicely with the rest of the operators and stuff aiostream has. So I think the idea is pretty safely within the general scope.

Alright, I'm convinced. I don't have the time to integrate this feature at the moment, so I'll leave this issue open as a reminder. Then we'll need to:

  • settle on a naming and interface
  • polish the implementation
  • write some tests
  • write a bit of documentation

In any case, thanks @jamesstidard for the feedback, it's much appreciated 😃
