asyncio.Event for graceful/early termination #97

Open

mbbyn opened this issue Apr 30, 2024 · 7 comments

mbbyn commented Apr 30, 2024

Thanks for building this awesome library!

I was wondering if you'd be open to using asyncio.Event in combination with asyncio.wait_for to replace asyncio.sleep?

My use case is that I would like to be able to gracefully stop async operations at the iterator level only, never affecting the body. By using an event, I can set it and be sure that the loop will terminate at the exact spot I expect it to. The alternative is to use cancel(), which might affect another async operation running in the body of the loop.

e.g.

async def bg_task(stop_event: asyncio.Event):
  async for _ in aio.stream.count(interval=1, stop_event=stop_event):
    await perform_long_running_task()

By wrapping bg_task in an asyncio.Task, I would be able to set the stop_event and be sure the long-running task would not be affected.

Perhaps I could use asyncio.shield, but this is a simple example, and I feel stop_event would be useful in general.
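
For concreteness, here is a rough sketch of the proposal (count_until and its stop_event parameter are hypothetical, not aiostream API): the interval sleep is replaced by waiting on the event with a timeout, so setting the event ends the loop at the iterator level without cancelling whatever runs in the body.

import asyncio
from typing import AsyncIterator


async def count_until(interval: float, stop_event: asyncio.Event) -> AsyncIterator[int]:
    i = 0
    while not stop_event.is_set():
        yield i
        i += 1
        try:
            # Wakes up early if the event is set during the interval
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def bg_task(stop_event: asyncio.Event) -> None:
    async for _ in count_until(interval=1, stop_event=stop_event):
        # Same placeholder as in the example above; it is never cancelled by the stop event
        await perform_long_running_task()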

mbbyn changed the title from "asyncio.Event for graceful termination" to "asyncio.Event for graceful/early termination" on Apr 30, 2024

mbbyn commented Apr 30, 2024

In the spirit of composability, I came up with this wrapper to handle early termination using a stop event:

# Assuming the following aliases and type variable (not shown in the original snippet):
import asyncio
import typing as t

import aiostream as aio

TGenValue = t.TypeVar("TGenValue")

_STOP_SENTINEL: t.Any = object()


async def async_gen_loop(
    *streams: aio.core.Stream[TGenValue], stop_event: asyncio.Event | None = None
) -> t.AsyncIterator[TGenValue]:
    """Async generator loop wrapper with stop event."""

    stop_event = stop_event or asyncio.Event()

    @aio.operator
    async def stop_event_gen():
        await stop_event.wait()
        yield _STOP_SENTINEL

    stream = aio.stream.merge(*streams)
    chained_stream = aio.stream.chain(stream, aio.stream.just(_STOP_SENTINEL))
    merged_stream = aio.stream.merge(chained_stream, stop_event_gen())

    async with merged_stream.stream() as iterator:
        async for value in iterator:
            if value is _STOP_SENTINEL:
                break

            yield value

Unit tests:

class TestAsyncGens:
    @pytest.mark.parametrize("stop", [False, True])
    async def test_async_gen_loop(self, stop: bool):

        stop_event = asyncio.Event()

        interval_stream = aio.stream.count() | aio.pipe.take(2)
        async_loop = util.async_gen_loop(interval_stream, stop_event=stop_event)

        items = []
        async for item in async_loop:
            items.append(item)

            if stop:
                stop_event.set()

        assert items == ([0] if stop else [0, 1]), f"{items}"

    async def test_async_gen_loop_multiple(self):
        odd_numbers = aio.stream.range(1, 10, 2)
        even_numbers = aio.stream.range(2, 10, 2)

        async_loop = util.async_gen_loop(odd_numbers, even_numbers)
        items = await aio.stream.list(async_loop)

        assert set(items) == set(range(1, 10)), f"{items}"

vxgmichel (Owner) commented

Thanks @mbbyn for creating this issue :)

My use case is that I would like to be able to gracefully stop async operations at the iterator level only, never affecting the body. By using an event, I can set it and be sure that the loop will terminate at the exact spot I expect it to. The alternative is to use cancel(), which might affect another async operation running in the body of the loop.

Interesting. So basically you would need a way to insert a breakpoint in the processing pipeline where the producer can be stopped or cancelled, but the consumer would still be able to run to completion with the items produced before the stop signal.

For instance, with the following operation pipeline:

A -> B -> [breakpoint] -> C -> D

a stop signal would cancel the task running A -> B and let C -> D run to completion.

In this case, I would write the stop_when operator as follows:

import asyncio
from typing import AsyncIterable, AsyncIterator, TypeVar
from aiostream import stream, pipe, pipable_operator, streamcontext, aiter_utils

T = TypeVar("T")


@pipable_operator
async def stop_when(source: AsyncIterable[T], event: asyncio.Event) -> AsyncIterator[T]:
    async with streamcontext(source) as streamer:
        while True:
            if event.is_set():
                return
            try:
                task: asyncio.Task[T] = asyncio.create_task(aiter_utils.anext(streamer))
                event_task = asyncio.create_task(event.wait())
                (done, _) = await asyncio.wait(
                    [task, event_task], return_when=asyncio.FIRST_COMPLETED
                )
                if task in done:
                    # Cancel the event waiter so abandoned wait() tasks don't pile up across iterations
                    event_task.cancel()
                    yield task.result()
                else:
                    task.cancel()
                    return
            except StopAsyncIteration:
                return

Here's a test that demonstrates its behavior:

@pytest.mark.asyncio
async def test_stop_when():
    stop_event = asyncio.Event()

    async def process(item: int) -> int:
        await asyncio.sleep(0.4)
        return item

    xs = (
        stream.count(interval=0.1)
        | stop_when.pipe(stop_event)
        | pipe.map(process)
    )

    items = []
    async with xs.stream() as streamer:
        async for item in streamer:
            items.append(item)
            if item == 10:
                stop_event.set()

    assert items == list(range(14))

Note how the xs stream produces items faster than the map operation can process them. For this reason, the map operator has already started processing items 11, 12 and 13 when the stop signal is emitted. So the count operator gets cancelled and no longer emits items but the map operator is still able to finish producing its last items, resulting in 14 items produced in total.

Did I get this right?


mbbyn commented Apr 30, 2024

Very nice, I think you reframed the idea to better fit the library's conventions. I hadn't thought that far, TBH.

I was also surprised when running your sample code that it worked the way it did, producing 14 elements. I was using the library assuming there is back-pressure support where the producer would only generate an item after the whole pipe is executed (i.e. count is blocked from generating the next number until the async for body completes).

With that said, maybe there is space for two ideas:

  1. stop_when: modular breakpoint affecting a specific part of the pipeline (similar to how timeout works)
  2. shortest: simply stop the iterator as soon as any of the streams is exhausted (which is why I went with _SENTINEL on both streams, to detect if any of the streams finished, and stop early).

But as someone who knows the importance of keeping OSS libraries focused and consistent, I think the first option better fits the library, and it would work for my scenario as well. I just need to convince myself first why your unit test passes, and think about how the streams work accordingly.

vxgmichel (Owner) commented

I was using the library assuming there is back-pressure support where the producer would only generate an item after the whole pipe is executed

This is a good assumption in general. I would go as far as calling what you describe sequential execution, in the sense that by default there is no concurrency involved over a single pipeline (even though two pipelines can run concurrently, as they are async-friendly). On the other hand, I would call back-pressure a mechanism that limits the production of items when the consuming process is slower, even though both producer and consumer run concurrently (similar to a Linux pipe when running a | b: the pipe has an internal buffer that blocks a from writing once it is full). But I'm also OK with characterizing sequential execution as a strong form of back-pressure. Hopefully this clarification wasn't too confusing 😅

So now, what about aiostream?

By default, and contrary to Linux pipes, chaining aiostream operators does not imply that the operators run concurrently. As you correctly assumed, combining operators is similar to nesting async for loops, meaning that the producer and the consumer typically run sequentially in the same task.
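
As a rough sketch of that equivalence (not from the original thread, and using pipe.map with a plain sync function so that no concurrency is added):

import asyncio

from aiostream import pipe, stream


def double(x: int) -> int:
    return 2 * x


async def main() -> None:
    # Pipeline form: producer and consumer run sequentially in the same task
    xs = stream.iterate(range(3)) | pipe.map(double)
    async with xs.stream() as streamer:
        async for y in streamer:
            print(y)

    # Roughly equivalent manual nesting: each item is fully handled
    # before the source is asked for the next one
    async with stream.iterate(range(3)).stream() as streamer:
        async for x in streamer:
            print(double(x))


asyncio.run(main())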

However, some aiostream operators do add concurrency to the pipeline execution. This is the case for:

  • map / starmap (when the passed function is async)
  • zip / ziplatest
  • merge
  • all the advanced operators, as they are the building blocks for the above operators

For zip, ziplatest and merge, all the provided sources run concurrently. However, back-pressure is applied: if the consumer no longer asks for new items, each source will only produce one extra item and then block until it is picked up.

For map and starmap with an async function, a new task is created for each async function call. This is a very common use case when doing an async map. Here as well, a back-pressure mechanism is provided with the task_limit argument. So if the consumer no longer asks for new items, the task_limit tasks will all block waiting for their produced items to be picked up. Since no more tasks can be spawned, the source producer will also block.
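
A small runnable sketch of that knob (not from this thread, just assuming the task_limit argument of pipe.map described above): at most 3 double() calls run at once, so the iterate() source is only pulled when a task slot frees up.

import asyncio

from aiostream import pipe, stream


async def double(x: int) -> int:
    await asyncio.sleep(0.1)  # simulate slow async work
    return 2 * x


async def main() -> None:
    xs = (
        stream.iterate(range(10))
        | pipe.map(double, task_limit=3)  # concurrent, but capped at 3 tasks
        | pipe.list()
    )
    # Awaiting the stream returns its last item, here the full list
    print(await xs)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


asyncio.run(main())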

I was also surprised when running your sample code that it worked the way it did, producing 14 elements.

Since the sample code uses map without a task_limit, you should now see why 4 more elements are produced after the stop_event is set: those 4 items correspond to 4 tasks that had already been started, since the count() operator produces items faster than the process function can handle them. I had to use a concurrent operator in order to properly test that only the part on the left of the pipeline gets cancelled.

Now you might think that the fact that some operators add concurrency to the pipeline and some don't is confusing, and that's my opinion as well. I had a plan to write sequential versions of the map/zip/merge operators and rename the concurrent operators accordingly, in order to make clear which ones add concurrency to the pipeline. I also wanted to add a buffer(max_size) operator that would explicitly work like a Linux pipe. Sadly, making those changes while dealing with backward compatibility was too much work, and I gave up on it.
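
For illustration only, here is a rough sketch of what such a buffer(max_size) operator could look like (this is not part of aiostream, just an assumed implementation of the idea above): a bounded asyncio.Queue decouples producer and consumer, much like the internal buffer of a Linux pipe.

import asyncio
from typing import Any, AsyncIterable, AsyncIterator, TypeVar

from aiostream import pipable_operator, streamcontext

T = TypeVar("T")
_DONE: Any = object()  # sentinel marking the end of the source


@pipable_operator
async def buffer(source: AsyncIterable[T], max_size: int = 1) -> AsyncIterator[T]:
    queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=max_size)

    async def fill() -> None:
        # The producer blocks on queue.put() once the buffer is full,
        # which is the back-pressure behavior of a full pipe
        try:
            async with streamcontext(source) as streamer:
                async for item in streamer:
                    await queue.put(item)
        finally:
            await queue.put(_DONE)

    task = asyncio.create_task(fill())
    try:
        while True:
            item = await queue.get()
            if item is _DONE:
                break
            yield item
        await task  # propagate any exception raised by the producer
    finally:
        task.cancel()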

shortest: simply stop the iterator as soon as any of the streams is exhausted (which is why I went with _SENTINEL on both streams, to detect if any of the streams finished, and stop early).

That's an interesting use case too :)

Here's a possible implementation for it:

from typing import cast  # cast() is used below but was not imported earlier


@pipable_operator
def shortest(source: AsyncIterable[T], *more_sources: AsyncIterable[T]) -> AsyncIterator[T]:
    sentinel = object()
    new_sources = [
        stream.chain.raw(source, stream.just.raw(sentinel))
        for source in [source, *more_sources]
    ]
    merged = stream.merge.raw(*new_sources)
    result = stream.takewhile.raw(merged, lambda x: x is not sentinel)
    return cast(AsyncIterator[T], result)



@pytest.mark.asyncio
async def test_shortest():
    items = []
    xs = stream.range(0, 5, interval=0.1)
    ys = stream.range(10, 15, interval=0.2)
    zs = shortest(xs, ys)
    async with zs.stream() as streamer:
        async for item in streamer:
            items.append(item)

    assert items == [0, 10, 1, 2, 11, 3, 4]

Wow that was a long post, let me know what you think about all that :)

vxgmichel (Owner) commented

Oh, and as you noticed, stop_when can easily be implemented from shortest:

@pipable_operator
def stop_when(source: AsyncIterable[T], event: asyncio.Event) -> AsyncIterator[T]:
    stop = stream.call.raw(event.wait)
    filtered = stream.filter.raw(stop, lambda x: False)
    return shortest.raw(source, filtered)


mbbyn commented May 6, 2024

Makes perfect sense, I appreciate the elaborate rundown. The nuances of reactive streams are always tricky, leading to hidden assumptions that are not so obvious to the uninitiated (👋) (by hidden, I mean not present in the type system or the naming conventions, etc.). I'm glad you already have it in mind.

The fact that the proposed use cases could be implemented on the fly is a testament to the versatile design.

With this realization, I think I figured out how to implement a use case I had in mind but had slated for later because I thought it wouldn't fit the design well: scaling an async generator using a PoolExecutor. pool.submit is an async function that can run concurrently, thanks to map's concurrency, and I can still handle task_limit as needed. That would be perfect.

vxgmichel (Owner) commented

Scaling an async generator using a PoolExecutor. pool.submit is an async function that can run concurrently, thanks to map's concurrency, and I can still handle task_limit as needed.

Oh, you mean loop.run_in_executor? That's a great idea, I'd never thought of that.

It can easily be turned into an operator as well:

import asyncio
from functools import partial
from aiostream import pipable_operator, stream, pipe
from aiostream.stream.combine import SmapCallable
from typing import AsyncIterator, AsyncIterable, TypeVar

T = TypeVar("T")
U = TypeVar("U")


@pipable_operator
def executor_map(
    source: AsyncIterable[T],
    corofn: SmapCallable[T, U],
    *more_sources: AsyncIterable[T],
    ordered: bool = True,
    task_limit: int | None = None,
) -> AsyncIterator[U]:
    async def wrapped(*args: T) -> U:
        # Resolve the loop lazily so the operator can be built before the event loop runs
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(corofn, *args))

    return stream.amap.raw(
        source, wrapped, *more_sources, ordered=ordered, task_limit=task_limit
    )

Here's the corresponding test:

import time
import random
import pytest


@pytest.mark.asyncio
async def test_executor_map():

    def target(x: int, *_) -> int:
        time.sleep(random.random())
        return x

    xs = (
        stream.iterate(range(10))
        | executor_map.pipe(target, task_limit=5)
        | pipe.list()
    )
    assert await xs == list(range(10))

Now that I think about it, this nicely completes the following map operator matrix:

map operator | sync function       | async function
sequential   | stream.combine.smap | does not exist but easy to write
concurrent   | executor_map        | stream.combine.amap

Maybe all those could be exposed using a sequentialmap and a concurrentmap operator, both of them taking either a sync or an async callable.
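
For reference, a minimal sketch of the "does not exist but easy to write" cell above (a sequential map over an async function). The name sequential_amap is hypothetical, not an aiostream operator.

import asyncio
from typing import AsyncIterable, AsyncIterator, Awaitable, Callable, TypeVar

from aiostream import pipable_operator, streamcontext

T = TypeVar("T")
U = TypeVar("U")


@pipable_operator
async def sequential_amap(
    source: AsyncIterable[T], corofn: Callable[[T], Awaitable[U]]
) -> AsyncIterator[U]:
    async with streamcontext(source) as streamer:
        async for item in streamer:
            # Each call is awaited in the same task before the next item is
            # requested, so no concurrency is added to the pipeline
            yield await corofn(item)

Usage would mirror the executor_map example above, e.g. stream.iterate(range(10)) | sequential_amap.pipe(some_async_fn).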
