Managing concurrency in real-world applications can be quite tricky. Developers must grapple with concurrency control, back pressure, error handling, and more. Thankfully, Rust provides us with the async/await mechanism, and on top of that, the Stream API. The Stream methods allow us to elegantly define a pipeline of asynchronous operations with a nifty abstraction addressing common use cases.
Unfortunately, elegance sometimes masks complexity. Can you look at a streaming pipeline and understand how many operations would run in parallel? What about the order? I found it trickier than it seems, so naturally, as complete overkill, I wrote a Bevy visualization to investigate. The investigation shed light on some truly unexpected results - so unexpected that, in some cases, you might want to reconsider using this API.
Let's start with a brief overview of the Stream API. The following code defines an async pipeline that iterates over the integers 0..10 and executes the async_work method with a concurrency limit of 3. The result is then filtered using the async_predicate method. This is awesome! With just a few lines of code, we've created a non-trivial async control flow.
async fn async_work(x: i32) -> i32 { ... }
async fn async_predicate(x: i32) -> Option<i32> { ... }

async fn buffered_filter_example() {
    let stream = stream::iter(0..10)
        .map(async_work) // async_work returns a future. The output of this stage is a stream of futures
        .buffered(3) // polls the stream of futures and runs at most 3 concurrently
        .filter_map(async_predicate); // filters the results of the previous stage using the async_predicate function
    pin!(stream);
    while let Some(next) = stream.next().await {
        println!("finished working on: {}", next);
    }
}
Hmm, we can already see some complex elements. For instance, why did I use filter_map instead of filter? What's this pesky pin!(stream) doing? I won't digress into these questions. Instead, here are some informative links:
The goal of this investigation is to get a better understanding of the execution order, concurrency, and back pressure characteristics of such pipelines. For example, in the code above, the map(..).buffered(3) combination executes 3 async_work futures concurrently, but what if async_predicate is a long operation? Will it continue to concurrently run more async_work futures? Supposedly, after it completes 3 invocations, it should be able to run more while async_predicate runs in the background, right? If so, will it take an unbounded amount of memory?
What about filter_map? It does not have a clear concurrency parameter. Does it run the provided method serially, or with unlimited concurrency?
The documentation leaves some of these questions unclear. We need to see it with our own eyes.
I used Bevy to visualize the flow of data in a streaming pipeline. The idea involves defining a streaming pipeline with methods that report their progress via a channel. I used Bevy's EventWriter to forward this information to a Bevy rendering system.
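The reporting side is roughly this shape (a sketch with hypothetical names like ProgressEvent and ProgressChannel; the real source handles more detail):

use std::sync::{mpsc::Receiver, Mutex};
use bevy::prelude::*;

// Hypothetical event type: one update per unit of work, per pipeline stage.
#[derive(Event)]
struct ProgressEvent {
    item: usize,
    stage: &'static str,
    progress: f32,
}

// The receiving half of the channel, stored as a Bevy resource.
// The Mutex only exists to satisfy the Sync bound on resources.
#[derive(Resource)]
struct ProgressChannel(Mutex<Receiver<ProgressEvent>>);

// Runs every frame: drains the channel and forwards the updates as Bevy events
// for the rendering systems to consume.
fn forward_progress(channel: Res<ProgressChannel>, mut writer: EventWriter<ProgressEvent>) {
    for event in channel.0.lock().unwrap().try_iter() {
        writer.send(event);
    }
}

The app just needs the ProgressChannel resource inserted, add_event::<ProgressEvent>() registered, and forward_progress added to the Update schedule.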
Here's how it looks:
In the visualization, we see a representation of each streaming item navigating through the different stages of the pipeline. Units of work start from the source and move to the map(..).buffered(..) stage. To simulate real-world async work, I used a small loop of sleep() calls. This represents real-world scenarios where async methods have multiple await calls, and it allows us to visualize the progress of each future.
for i in 0..5 {
    tokio::time::sleep(duration / 5).await;
    tx.send(/* update bevy rendering system */).unwrap();
}
We visualize future progress via a tiny progress bar on each item. After an item completes the buffered stage, it proceeds to the sink and finishes its journey.
It is important to note that the visualization is sourced from actual running Rust code. This isn't a simulation; it is a real-time visualization of the Rust Stream pipeline.
You can find the source code here.
Experiment 1: buffered
stream::iter(0..10)
    .map(async_work)
    .buffered(5);
buffer up to at most n futures and then return the outputs in the same order as the underlying stream. No more than n futures will be buffered at any point in time
- Will the buffered method fetch a new unit of work from the source stream as soon as any unit of work completes, or only when the earliest unit of work completes and is sent as output to the next stage?
All right! Look at it purr! As expected, each item goes through async_work. The .buffered(5) step runs at most 5 futures concurrently, retaining completed futures until their predecessors complete as well.
The buffered method does not acquire a new unit of work once an arbitrary item completes. Instead, it only does so once the earliest item is completed and advances to the next stage. This makes sense. A different behavior would require the buffered method to store the results of an unbounded number of futures, which could lead to excessive memory usage.
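A small probe (my own snippet, separate from the visualization) makes this behavior observable: item 0 is deliberately slow, so even though items 1-4 finish quickly, items 5-9 should not start until item 0 completes and is emitted downstream.

use futures::{stream, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let results: Vec<i32> = stream::iter(0..10)
        .map(|i| async move {
            println!("{:>4}ms: started  {}", start.elapsed().as_millis(), i);
            // item 0 takes 500ms, every other item takes 50ms
            let delay = if i == 0 { 500 } else { 50 };
            tokio::time::sleep(Duration::from_millis(delay)).await;
            println!("{:>4}ms: finished {}", start.elapsed().as_millis(), i);
            i
        })
        .buffered(5)
        .collect()
        .await;
    // output order matches source order despite the uneven delays
    println!("results: {:?}", results);
}

If buffered behaves as described, items 5-9 should only start around the 500ms mark, after item 0 finishes and is emitted.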
I wonder if there's a case to be made for a buffered_with_back_pressure(n: usize, b: usize) method that would allow up to b additional items to be taken from the source stream ahead of time.
Experiment 2: buffer_unordered
stream::iter(0..10)
    .map(async_work)
    .buffer_unordered(5);
buffer up to n futures and then return the outputs in the order in which they complete. No more than n futures will be buffered at any point in time, and less than n may also be buffered
- Will the buffer_unordered method take a new unit of work from the source stream as soon as any unit of work completes, or only when the earliest unit of work is completed and sent to the next stage?
Unlike buffered, buffer_unordered does not retain completed futures and immediately makes them available to the next stage upon completion.
The buffer_unordered method does fetch a new unit of work as soon as any unit of work completes. Contrary to buffered, the unordered version does not need to retain completed future results to maintain output order. This allows it to process the stream with higher throughput.
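Swapping buffer_unordered into the probe from Experiment 1 (again, my own snippet) should show the difference: items 5-9 start as soon as items 1-4 complete, and the slow item 0 is simply emitted last.

use futures::{stream, StreamExt};
use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let results: Vec<i32> = stream::iter(0..10)
        .map(|i| async move {
            // item 0 takes 500ms, every other item takes 50ms
            let delay = if i == 0 { 500 } else { 50 };
            tokio::time::sleep(Duration::from_millis(delay)).await;
            println!("{:>4}ms: finished {}", start.elapsed().as_millis(), i);
            i
        })
        .buffer_unordered(5)
        .collect()
        .await;
    // completion order, not source order - item 0 comes out last
    println!("results: {:?}", results);
}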
Experiment 3: filter_map
stream::iter(0..10)
    .filter_map(async_predicate);
Filters the values produced by this stream while simultaneously mapping them to a different type according to the provided asynchronous closure. As values of this stream are made available, the provided function will be run on them.
- Does the filter_map method execute futures in parallel or in series?
No surprises here. The filter_map operator processes each future in series.
If we want to accomplish async filtering with concurrency, we can use a blend of map, buffered, and filter_map(future::ready). The map().buffered() duo calculates the predicate concurrently, while filter_map removes failed items from the stream:
stream::iter(0..10)
    .map(async_predicate)
    .buffered(5)
    .filter_map(future::ready); // the ready function returns the predicate result wrapped in a ready future
Experiment 4: buffered + filter_map
stream::iter(0..10)
    .map(async_work)
    .buffered(5)
    .filter_map(async_predicate);
- How will a long-running filter_map step affect the concurrency of the buffered step?
Ok, this is unexpected! The stream does not function as I initially thought. While async_predicate is being executed, no async_work future is progressing. Even further, no new future starts to run until the first batch of five is complete. What's going on?
Let's see what happens when we replace buffered with buffer_unordered.
The situation is pretty much identical. Again, the async_work futures are suspended until async_predicate is completed.
Could it be something to do with filter_map? Let's attempt to stick two buffered steps sequentially:
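Roughly, the variant I mean looks like this (a sketch, reusing the future::ready trick from above so the predicate gets its own buffered stage):

stream::iter(0..10)
    .map(async_work)
    .buffered(5)
    .map(async_predicate)
    .buffered(5)
    .filter_map(future::ready);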
Nope, the behavior remains the same.
Turns out I'm not the first to encounter this difficulty. This is the same issue Barbara battled with.
To truly grasp what's happening, we need a solid understanding of Futures, async executors, and the stream API. Resources such as The async book and perhaps fasterthanlime's Understanding Rust futures by going way too deep can serve as good starting points.
I'll attempt to give you some intuition.
The first clue comes from the question - when does Rust run two futures concurrently? There are the join! and select! macros, and the ability to spawn new async tasks. However, the Stream API neither joins nor selects over futures created by different pipeline steps, nor does it spawn new tasks each time it executes a future.
Let's take a closer look at our example and try to analyze the control flow.
let stream = stream::iter(0..10)
    .map(async_work)
    .buffered(5)
    .filter_map(async_predicate);
pin!(stream);
while let Some(next) = stream.next().await {
    println!("finished working on: {}", next);
}
First we create the stream instance. Futures in Rust aren't executed until they are awaited. Therefore, the first line of the example has no standalone effect.
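A quick illustration of that laziness:

async fn laziness_demo() {
    let fut = async { println!("running") };
    // nothing has been printed yet - the async block hasn't been polled
    fut.await; // only now does "running" appear
}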
Let's look at the type definition of the stream variable:
FilterMap<
    Buffered<Map<Iter<Range<i32>>, fn async_work(i32) -> impl Future<Output = i32>>>,
    impl Future<Output = Option<i32>>,
    fn async_predicate(i32) -> impl Future<Output = Option<i32>>,
>
After the initial shock, we find five nested structs: Range within Iter within Map within Buffered within FilterMap.
These types of structs are referred to as "adapters". Each adapter holds state and data and implements some trait, in our case, Stream. They wrap their own logic around this trait.
For example, the Buffered adapter owns a source stream and an in_progress_queue: FuturesOrdered to manage the buffering.
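Stripped of pinning and other details, its shape is roughly the following (a simplification of the futures-util definition, not a verbatim copy):

use futures::stream::{FuturesOrdered, Stream};
use std::future::Future;

struct Buffered<St>
where
    St: Stream,
    St::Item: Future,
{
    stream: St,                                  // the source stream of futures (our Map over the range)
    in_progress_queue: FuturesOrdered<St::Item>, // up to `max` futures being driven concurrently
    max: usize,
}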
We'll elegantly skip over pin!.
So, what happens on the first stream.next().await command? The Next future calls stream.poll_next_unpin(cx), where stream is an instance of FilterMap.
In turn, the FilterMap::poll_next implementation polls its inner stream - the Buffered stream - and executes async_predicate on the result.
The Buffered::poll_next method polls its inner stream at most max times, until the inner buffer is filled.
For each such poll, the Map stream fetches an item from its source stream and runs the async_work method, which returns a future.
Note that the only place where futures are executed concurrently is the FuturesOrdered instance in the Buffered::poll_next implementation.
We can loosely transform the example to follow this pseudo code:
let range_stream = stream::iter(0..10);
let in_progress_queue = FuturesOrdered::new();
loop {
    // buffer at most 5 items in the queue
    while in_progress_queue.len() < 5 {
        // get a value from the source stream and run the map step on it
        let next = range_stream.next();
        // note we're not `await`ing the returned future yet
        let future = async_work(next);
        in_progress_queue.push(future);
    }
    // execute buffered futures. Get the next completed future (respecting order)
    // here 5 futures run concurrently
    let next = in_progress_queue.poll_next().await;
    // filter the result
    // futures in `in_progress_queue` are not getting polled while this runs!
    let predicate_result = async_predicate(next).await;
    // yield the result accordingly
}
When deconstructing the stream pipeline into this simple representation, the results of our experiment become clearer. While async_predicate is being executed, we don't poll the in_progress_queue - hence the futures are "stuck".
Furthermore, when async_predicate is completed, we return to push new futures into the in_progress_queue. However, even if we succeed, the subsequent in_progress_queue.poll_next().await will only run for a short while - just until the earliest of the ongoing futures completes. This leaves very little time for the newly added futures to execute. In fact, based on the visualization, they might not be polled at all. Once the initial batch of futures is completed, the newly added futures get a chance to execute.
At this point, some of you might be suspicious of the results. Surely, if you made a 100ms network request, it would still take 100ms regardless of the hosting async executor. This is of course correct. Once a future has been polled, the underlying implementation will run to completion and wait patiently to be polled again. The effect I described causes this final poll to be delayed.
To illustrate this effect, the two following versions of async_work will have surprisingly different run characteristics in a stream pipeline.
The first version has a single call to tokio::time::sleep(100ms). sleep() returns Sleep, which implements Future directly. This means that the first poll of async_work will in turn call Sleep::poll, which will do the needed work to sleep for 100ms. However late this future is polled again, it will report that it is Ready, and async_work will return.
async fn async_work(x: i32) -> i32 {
    sleep(Duration::from_millis(100)).await;
    x
}
The second version has 5 calls to sleep(20ms). In this case, each subsequent .await might suffer from the delayed polling again and again. This is the case for the futures visualized in this investigation, and probably a better simulacrum for real-world use cases.
async fn async_work(x: i32) -> i32 {
    sleep(Duration::from_millis(20)).await;
    sleep(Duration::from_millis(20)).await;
    sleep(Duration::from_millis(20)).await;
    sleep(Duration::from_millis(20)).await;
    sleep(Duration::from_millis(20)).await;
    x
}
Our experiments revealed that Stream API pipelines can be surprisingly suboptimal. Looking naively at a pipeline, we might imagine everything running concurrently. However, the reality doesn't meet these expectations.
Should you use the Stream API? As with many other things in our profession, this depends on the trade-offs. On one hand, this API allows us to quickly meet our needs with clear and elegant code. On the other hand, the pipeline throughput will not be optimal.
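One middle ground, which I did not cover in the experiments above, is to spawn each unit of work as its own Tokio task inside the pipeline (a sketch, assuming the work futures are Send + 'static). Spawned tasks are driven by the executor independently of how the stream is polled, so they keep making progress even while a downstream filter_map is running:

use futures::{stream, StreamExt};

async fn spawned_pipeline() {
    let stream = stream::iter(0..10)
        .map(|i| tokio::spawn(async_work(i)))   // each unit of work becomes an independent task
        .buffered(5)                            // still admits at most 5 tasks at a time
        .map(|res| res.expect("task panicked")) // unwrap the JoinHandle result
        .filter_map(async_predicate);
    futures::pin_mut!(stream);
    while let Some(next) = stream.next().await {
        println!("finished working on: {}", next);
    }
}

The trade-off: spawning has its own overhead, and a panicking task now surfaces through the JoinHandle instead of unwinding through the pipeline.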
In my opinion, in many cases, dropping this API might be considered a premature optimization. Nevertheless, these findings are definitely worth your consideration.