
How to dispatch items from a single source to several concurrent streams? #54

Open
andersea opened this issue Dec 18, 2019 · 2 comments

Comments

@andersea

First of all, I would not expect what I am doing to work. I don't think generators are supposed to work like the examples I am going to give.

I am trying to understand the meaning of 'a stream can be streamed multiple times'. See for instance #40

So what I tried was to actually stream the same generator in parallel, just to test how far 'streaming multiple times' goes.

First asyncio version:

import asyncio
import aiostream.stream as aiostream

async def producer():
    i = 1
    while True:
        yield i
        i += 1
        await asyncio.sleep(1)

async def consumer1(stream):
    async with stream.stream() as streamer:
        async for item in streamer:
            print(f'Consumer 1 got: {item}')

async def consumer2(stream):
    async with stream.stream() as streamer:
        async for item in streamer:
            print(f'Consumer 2 got: {item}')

async def main():
    stream = aiostream.iterate(producer())
    await asyncio.gather(
        consumer1(stream),
        consumer2(stream)
    )

asyncio.run(main())

This produces this result:

Consumer 1 got: 1
Consumer 2 got: 2
Consumer 1 got: 3
Consumer 2 got: 4
... continues forever

The trio version, which uses the anyio branch, uses trio.sleep(), and its main function looks like this:

async def main():
    stream = aiostream.iterate(producer())
    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer1, stream)
        nursery.start_soon(consumer2, stream)

This produces one result and then crashes. I have also had it just hang after producing a single result, but I couldn't reproduce that after the first couple of tries.

As I said in the beginning, I wouldn't expect this to work at all, so I am kind of surprised that asyncio seems to cope.

It would be nice to pin down what is meant by 'a stream can be streamed multiple times', because the way I see most streams, they are infinite series of items. I attach a stream processor to this infinite series, and I don't feel like it would make sense to stream it multiple times, since you would never be able to get to the end of the stream.

Maybe what is meant is this:

  • A finite stream can be streamed multiple times.
  • If the stream is infinite, and you stop streaming from it, another stream can pick it up from where the first stream left off.

?

@vxgmichel
Owner

vxgmichel commented Dec 18, 2019

Hey! Thanks for the report :)

I am trying to understand the meaning of 'a stream can be streamed multiple times'.

Oh yeah, that can be confusing. It simply means that a given stream can be used multiple times, as in:

    xs = stream.range(3) | pipe.list()
    assert await xs == [0, 1, 2]
    assert await xs == [0, 1, 2]

And I can confirm it is fine to run them concurrently, as they correspond to two different streamer instantiations:

    assert await asyncio.gather(xs, xs) == [[0, 1, 2], [0, 1, 2]]

Your example is a bit trickier though, as the stream you built depends on an external source, which means the two streamers iterate the same async generator concurrently. Here's how I would re-write it:

import asyncio
from functools import partial
from aiostream import stream, pipe, async_


async def produce():
    i = 1
    while True:
        yield i
        i += 1
        await asyncio.sleep(1)


async def consume(cid, item):
    print(f'Consumer {cid} got: {item}')
    await asyncio.sleep(.1)
    return item


async def main():
    producer_stream = stream.iterate(produce())
    async with producer_stream.stream() as producer:
        consumer_stream_1 = (
            stream.preserve(producer)
            | pipe.take(2)
            | pipe.map(async_(partial(consume, 1)))
        )
        consumer_stream_2 = (
            stream.preserve(producer)
            | pipe.take(4)
            | pipe.map(async_(partial(consume, 2)))
        )
        await asyncio.gather(consumer_stream_1, consumer_stream_2)


if __name__ == "__main__":
    asyncio.run(main())

Notice how stream.preserve is used to prevent consumer_stream_1 from closing the producer once it's finished, so consumer_stream_2 can keep working on it.

I noticed two bugs while playing with this example, plus the one you already noticed on the anyio branch; I'll try to do some proper reporting tomorrow.

Let me know if you have any questions :)

@vxgmichel vxgmichel changed the title Difference between asyncio and trio when streaming a generator multiple times How to dispatch items from a single source to several concurrent streams? Dec 19, 2019
@andersea
Author

After consumer 1 finishes, this crashes for me with:

AttributeError: 'generator' object has no attribute 'cr_await'

But if I understand you correctly, there shouldn't be anything inherently wrong with trying to iterate the same external generator in parallel?
