Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent vs Stream Async Iterables #20

Closed
ben-laird opened this issue May 10, 2024 · 45 comments
Closed

Concurrent vs Stream Async Iterables #20

ben-laird opened this issue May 10, 2024 · 45 comments

Comments

@ben-laird
Copy link

In the async iterator space, Javascript has made a very subtle distinction between two types of iterables: concurrent asynchronous iterables and stream asynchronous iterables, referred to hereafter as concurrent iterables and stream iterables, respectively. Each of these types:

  1. represent their own concepts,
  2. have their own use cases, and
  3. come with their own API's for handling them.

This issue intends to point out this subtle difference and centralize conversation around this difference with the goal of:

  1. breaking the proposal down into smaller, much more manageable problems, and
  2. allowing for other issues which propose API changes to have a common vocabulary.

Concurrent Asynchronicity

Concurrent iterables are data structures representing the concept a "pool" of promises that are related but have no concept of order. They are best represented by the following TypeScript type:

type ConcurrentIterable<T> = Iterable<Promise<T>>;

Note that while queuing these promises is portrayed as ordered--the iterable can have an order to its queuing process, e.g. a Set<Promise<T>> will queue promises in insertion order--settling the queued promises is not portrayed as ordered.

To handle concurrent iterables, there exist the "promise concurrency methods" per MDN; one can handle race conditions with Promise.race() or filter out rejections with Promise.any(), and one can handle transactions with Promise.all() or deal with any rejections with Promise.allSettled(). Promise.race() and Promise.all() especially are good for concurrent iteration because they do not suppress errors, leaning on the concurrent iterable to perform error handling. The iterable then will handle failures from the caller (through calling .throw() on an iterator the iterable produced) and from any promises it queued (through the normal Promise handling means).

Stream Asynchronicity

Stream iterables are data structures representing an asynchronous "stream" of data whose order matters. These are the "true" async iterators, and as such have the following TypeScript type:

type StreamIterable<T> = AsyncIterable<T>;

Stream iterables are used when working with the Streams API, which is especially evident when Readable Streams implement the async iterable protocol and have the ReadableStream.from() static method that can take any iterable and convert it to a stream. Streams then are used to form pipelines, preserving the order of the data pumped in while transforming the form of the data to something more useful. This purpose seems most similar to what this proposal aims to do for async iterator helpers as a whole.

Similarities and Differences

Though the two types of iterables solve similar problems, they represent different concepts, use cases, and solutions.

Firstly, concurrent iterables can be considered a subset of stream iterables, since it is possible to convert a concurrent iterable to a stream iterable. Consider the following solution, which takes a concurrent iterable, splits it into chunks, and puts those chunks in a "pool" where they are raced against one another until the pool drains:

type ConcurrentIterable<T> = Iterable<Promise<T>>;

type Indexed<T> = [number, Promise<[number, T]>];

function* take<T>(iter: Iterable<T>, amount: number) {
  let idx = 0;

  for (const element of iter) {
    if (!(idx < amount)) return;

    yield element;

    idx++;
  }
}

function index<T>(idx: number, pr: Promise<T>): Indexed<T> {
  return [idx, pr.then((v) => [idx, v])];
}

function* enumerateIndices<T>(iter: ConcurrentIterable<T>) {
  let idx = 0;

  for (const element of iter) {
    yield index(idx, element);

    idx++;
  }
}

export type OrderParams<T> = [
  concurIter: ConcurrentIterable<T>,
  options: {
    lowWaterMark: number;
    highWaterMark: number;
  },
];

export async function* toStream<T>(...params: OrderParams<T>) {
  const [concurIter, { lowWaterMark, highWaterMark }] = params;

  // Use single reference to transformed iterator to prevent accidentally resetting it
  const iter = enumerateIndices(concurIter);

  const pool = new Map(take(iter, highWaterMark));

  while (pool.size !== 0) {
    const [idx, res] = await Promise.race(pool.values());

    pool.delete(idx);

    // Refill step
    if (pool.size <= lowWaterMark) {
      take(iter, highWaterMark - pool.size);
    }

    yield res;
  }
}

If the concurrent iterable is finite (as is the case of most transactions or arguments to Promise.race()), then no refill step is needed and the implementation becomes even simpler:

export async function* PromiseOrdered<T>(values: Promise<T>[]) {
  const pool = new Map(values.map((pr, idx) => index(idx, pr)));

  while (pool.size !== 0) {
    const [idx, res] = await Promise.race(pool.values());

    pool.delete(idx);

    yield res;
  }
}

The name comes from the idea that this feels so natural to do with an iterable of promises that it feels like it should be another one of the promise concurrency methods, Promise.ordered(). The above function works exactly like Array.fromAsync(), yielding elements in the order in which they've settled, but as a stream iterable.

With all that being said, there does not currently exist a method to convert a stream iterator to a concurrent one, and the secret is in the iteratorResult.done property. In concurrent iterables, this property is immediately available, telling the concurrency method of choice (or iterable adapter) when to stop iterating. In stream iterables, this property sits behind the same promise the iteratorResult.value property does, forcing the caller to await the whole expression before yielding the value and deciding whether it should continue iterating; this is how stream iterables enforce the ordering of their data, and this is precisely what makes stream iterables a strict superset of concurrent iterables. To convert a concurrent iterable to a stream iterable, all the above function did was impose an ordering, the ordering of which promises fulfilled fastest.

Conclusion

Implementing helper methods for concurrent iterables seems very easy, as they will most likely be combinations of the promise concurrency methods and the methods of the synchronous iterator helpers. It may be so easy that such an implementation might be better served as its own proposal.

As for stream iterables, separating them from concurrent iterables (whether conceptually or physically as a separate proposal) will make conversation surrounding them more precise, as there is a lingua franca community members can leverage in the form of the Streams API, and the assumption can be made for stream iterables that the order of the yielded elements matters.

Furthermore, this proposal can directly benefit the Streams API; there could be an offshoot proposal for stream helper methods, or developers could easily convert between streams and stream iterables and take advantage of the helper methods incorporated by this proposal. Other languages, like Rust, may look to this proposal as a prior art for their own implementation of async iterators.

A separation of the concurrent and stream iterator concepts seems to reduce the async iterator helpers proposal to two smaller problems: concurrent iterator helpers, which are mostly enabled by sync iterator helpers and concurrency methods; and stream iterator helpers, which are easier to implement with ordering in mind and are mostly enabled by the Streams API. I believe this separation will create better primitives to build better abstractions on top of; create less confusion around promise concurrency, iterators, and streams; and be a simpler way to highlight the differences between the two concepts that JavaScript already acknowledges.

@bakkot
Copy link
Collaborator

bakkot commented May 10, 2024

Thanks for the discussion. I agree that the distinction between "iterator of promises" and "async iterator" is quite important.

This proposal is focused only on async iterators (which you call "stream iterators", though I'm sticking with "async iterators" since that's what they're called by everyone else). I agree it would be nice to have more helpers for working with iterators of promises, but there is no clear place to put them, and they certainly won't go on AsyncIterator.prototype, which is the focus of this proposal.

Streams are definitely not in scope for this proposal. The designs influence each other, of course, but they work in very different ways. Though note that you can in fact convert directly from a stream to an async iterable (and from).

This proposal will introduce a toAsync method which will let you go from Iterator<Promise<T>> to AsyncIterator<T> (but it is order-preserving, not based on the order of promises settling). But there will be nothing else about synchronous Iterators in this proposal.

Implementing helper methods for concurrent iterables seems very easy, as they will most likely be combinations of the promise concurrency methods and the methods of the synchronous iterator helpers. It may be so easy that such an implementation might be better served as its own proposal.

I don't think it will be easy: the types in JavaScript are such that there is no runtime representation of Iterator<Promise> as distinct from merely Iterator. That is, we can't add prototype methods which are only available to iterators of promises. We could add methods on all iterators which are only intended for iterators of promises (and have them do what when it turns out the iterator contains non-promises?), but that's pretty annoying for users, who rightly expect prototype methods to be generic.


I'm gonna close this as I don't think there's anything actionable here, but feel free to continue the discussion.

@bakkot bakkot closed this as completed May 10, 2024
@conartist6
Copy link

conartist6 commented May 10, 2024

I've recently written the most advanced stream processing engine that exists in Javascript, and unfortunately in my view the streams API is fatally flawed.

The problem is this: streaming access is a fundamental tool for encapsulation and abstraction, and streams Don't Work Like that.

I often use the example of being an assembly line worker and imagining how your job would be described to you. Perhaps you work in a factory that bottles and distributes soda. Let's imagine your job in the assembly line is that you get bottles, check that they're full of soda, and put a cap on them.

The worker to your left is filling the bottles, and once they've filled twelve bottles they put those bottles in a box and your job is to take the box by the handles (gotta be careful here, the lids aren't on yet), move it from their work station to yours, put twelve caps on, and then move the box to the side where it can wait to be picked up by the next worker.

This assembly line works fine; the problem with it is a bit subtle: the person who knows how to put caps on soda bottles doesn't know how to do it unless they're in a box. In a plastic crate? Don't know how to put caps on that. Just one bottle by itself? No idea how to put a cap on that. The first step of the instructions says "pick the box up by its handles".

The powerful abstraction is being able to describe how to put the caps on the bottles in such a way that a worker would know how to "put a cap on each bottle" no matter how the bottles are actually stored.

The streams API takes this beautiful abstract system and puts boxes back into it. Because the person ahead of you may not have the next bottle ready for you, if that is ever the case then you must give up on your beautiful transformation pipeline and start passing around boxes again, and some of the boxes are half full and sometimes your finished boxes contain only one item but at least there are always handles and half of your SOPs make implicit assumptions about boxes with handles now.

So now in order to support infinite assembly lines you have describe putting caps on bottles in terms of getting bottles one by one, but you also have to know how to do it if the things you get one by one aren't bottles but boxes of bottles. So now the cap-adder needs to know not one but two completely different procedures for putting bottles on caps, as if the process with the bottle and the cap were any different.

What I am suggesting instead is this: if there isn't bottle ready to put a cap on I just wait until there is, and then I put the cap on: the same procedure defined the same way regardless of whether the next bottle turned out to be in the box already in front of me or whether I had to wait for another box in order for there to be a bottle ready for me to work on.

In other words, instead of stream iterators being async iterators of buffers, I am strongly suggesting that streams should be implemented as iterators which return either { done, value } or Promise<{done, value}>.

@conartist6
Copy link

conartist6 commented May 11, 2024

A simpler way of thinking of it is as muddying time and space. A sequence is a time-ordering of all items, but a chunk stream is a time ordering of a space ordering of items. There are many overall time orderings of a chunk stream, just as there are many algorithms by which you might touch every bottle in a box you know contains a 3 * 4 grid of 12 bottles.

@conartist6
Copy link

conartist6 commented May 11, 2024

Of course you could counter that: the chunks do define a time-ordering of their bytes -- they are iterable. Clearly we aren't actually confused about the overall ordering of characters in a stream!

But what is the source of truth? Is it the iterator? Is it indexed array access? We actually don't have any guarantee that those two views of the data will be consistent. Byte access could show ['a', 'b', 'c'] while iterator access shows [1, 2, 3] for all we know, and that's only the least subtle kind of problem that I can imagine.

A more subtle problem is that it isn't safe to pass the data stream to another function and then use it again afterwards. Because the buffers are mutable data structures that are part of the public API, after you've given those buffers to some code you don't trust they'll never be safe to use again. A method which yields primitive values in sequence has no such limitation. In other words: if I write a method which accepts an iterator, I am really accepting arrays, strings, generator functions, perhaps even custom data structures likes trees, ropes, or hashmaps -- and none of them is required to allow me destructive access to their internals in order to provide me the data they contain.

@conartist6
Copy link

I think it's relevant here because the async spec is written in a way that requires every single item to be async. It's my understanding that the implementers understand that await would be too high-level and heavy and slow a mechanism to apply to every single character in a string or every single byte in a data stream, which is the pragmatic reason we have chunk streams.

The async spec sorta says whatever you produce has to be delivered through the postal service. You could try to create a more conceptually pure system by sending items not boxes through the postal service, but all you'd really do is clog up the postal service and cause large amounts of the work done to be overhead. You'd be crazy not to ship boxes full of items at that price...

My question is: if your output is ready right now and the next worker wants it right now, why does the postal service (event loop) need to be involved? It's a double whammy: it's creating needless costs, and it's centralizing them!

There's a more subtle kind of slowness too: because producers are incentivized to ship full boxes, delays in the system can start to propagate in a nasty way when the boundaries start to mismatch. Say there's a QC station on the assembly line that throws away 8 of every 100 items. If it always wants to be cost-efficient by filling a full box before calling the postal service, then it's quite possible that the next worker will spend as much as a whole cycle doing nothing at all because there wasn't enough work completed by the upstream worker to trigger a delivery. Of course if you deliver eagerly, the wrong combination of circumstances could find you shipping mostly boxes with only one item in them again, or even shipping empty boxes.

@conartist6
Copy link

conartist6 commented May 11, 2024

For the record, all Isaacs ever had to say about this in his now-famous Zalgo post was that you must never create a promise that sometimes resolves synchronously. His suggestion for changing such APIs to not "release Zalgo" is that they should just return the data when it is available, returning a promise when it is not.

In other words, you can't put a postal service address on the box and then just hand it to the next worker, because then that worker will not really know who they are working for. If there's a problem (such as an exception) do they tell the post office about it? Or the worker who gave them the item? If you have unleashed Zalgo by impersonating the post office (without really being the post office), then it will be deeply unclear.

If you don't impersonate the post office though, there's no problem. If the item came through the post, tell them about it so they notify the sender. If not, the issue should be raised directly (synchronously) to the source of the faulty item.

@laverdet
Copy link

I have to be honest here, I'm struggling to follow any of your metaphors about factories and mail couriers. I've read them several times and I just have no idea what you're saying.

If you desire something more pure than a stream of byte chunks then wouldn't it make sense to push for a stream of booleans? You wouldn't want to hold up a consumer in the case the producer has 3 bits ready but not enough to make a whole byte, right?

@conartist6
Copy link

conartist6 commented May 11, 2024

// I'm saying this:
const chunkStreamIterator = {
  chunkIter,
  // yields Buffer instances wrapped in promises
  next() {
     return chunkIter.next();
  }
}

// should become this:
const byteStreamIterator = {
  chunkIter,
  chunkStep: null,
  index: 0,
  // yields a byte from the stream, or a promise of the next byte
  next() {
     if (!this.chunkStep || this.chunkStep.done) {
       this.chunkStep = this.chunkIter.next():
       return this.chunkStep.then((step) => {
         this.chunkStep = step;
         return step.done ? step : { done: false, value: this.chunkStep.value[this.index++] };
       })
     } else {
       // Don't get the event loop (postal service) involved here to avoid overhead!
       return step.done ? step : { done: false, value: this.chunkStep.value[this.index++] };
     }
  }
}

The courier or postal service is my way of talking about the event loop, because when you await you must give control back to the central event loop and wait for it to "deliver" control back to you along with the item you awaited.

If you desire something more pure than a stream of byte chunks then wouldn't it make sense to push for a stream of booleans?

If you're really capable of computing results one bit at a time and incurring processing delays on bit 3 but not bit 2, then yes. In that case though I don't see why a stream of booleans wouldn't do just fine. With this design a stream of booleans exists since a stream is a unique type of iterator rather than being defined by what is stored in the iterator (chunk buffers).

@laverdet
Copy link

I think this discussion is way out of scope for this repository.

Computers have been reading chunks of bytes for a very long time. This is what the underlying network or filesystem controllers do, they dump out a chunk of bytes into the memory controller and then fire an interrupt. Operating on a char[16] has always been faster than operating on 16 individual char*.

@conartist6
Copy link

In other words, memory offset access as a primary API for text processing is the unquestioned outgrowth of extremely archaic performance limitations...

@laverdet
Copy link

Cache locality isn't archaic, it is mature and well-understood. It is also a product of the physical world we live in. I respect the desire for abstract mathematical purity (really, I do) but unless you have a solution for "violating the laws of thermodynamics" then you're going to have to live with chunks of bytes for the foreseeable future.

@conartist6
Copy link

conartist6 commented May 11, 2024

Please don't forget that I built the system I'm describing -- a system that can parse over any character stream (yes, one-by-one). I know that it doesn't violate the laws of thermodynamics.

My parser implemented character-by-character is easily fast enough, particularly considering that it can do most of its parsing work while it's waiting for data. A batch parser can be blazing fast in how quickly it carries out its computations and yet still be slower to produce a result than my stream parser, because my stream parser is actually doing the parsing work as the data arrives over the pipe. It may be using more processing power to do the work, but it's fitting that work into wall clock time that was otherwise wasted.

@ben-laird
Copy link
Author

ben-laird commented May 11, 2024

Hello, thanks so much for your input. If I'm understanding you correctly, you believe the Streams API needs some work because it treats Promises of values differently from the values themselves; it makes the distinction between boxes of bottles and just bottles, per your metaphor. If that interpretation is correct, then I'd have to disagree, as you can transform any iterable into a ReadableStream using ReadableStream.from(), and therefore you can use the same TransformStreams for synchronous code and asynchronous code. You can even input iterable data structures and collect your pipeline outputs to iterable data structures. Consider the following code snippet where I attempt to represent your bottle assembly line metaphor in code:

Bottle assembly line example
import { assertEquals } from "jsr:@std/assert";
import { delay } from "jsr:@std/async";
// ^ Testing utilities, pls disregard

function mapStream<T, U>(handler: (value: T) => U) {
  return new TransformStream<T, U>({
    transform(ch, ctrl) {
      ctrl.enqueue(handler(ch));
    },
  });
}

interface Bottle {
  id: number;
  capOn: boolean;
}

function* generateBottles(count: number) {
  let idx = 0;

  while (idx < count) {
    const next: Bottle = { id: idx, capOn: false };
    yield next;

    idx++;
  }
}

async function* generateBottlesAsync(count: number) {
  let idx = 0;

  while (idx < count) {
    // Using delay from Deno standard library @std/async
    await delay(Math.random() * 500);

    const next: Bottle = { id: idx, capOn: false };
    yield next;

    idx++;
  }
}

Deno.test("Bottle assembly", async () => {
  // Defining assembly line
  const putCapOn = mapStream((bottle: Bottle) => {
    bottle.capOn = true;

    return bottle;
  });

  const tagBottle = mapStream((bottle: Bottle): [number, Bottle] => {
    return [bottle.id, bottle];
  });

  // 1. This pipeline can take any iterable data structure in...
  type Inputs = AsyncIterable<Bottle> | Iterable<Bottle | PromiseLike<Bottle>>;
  function pipeline(iter: Inputs) {
    return ReadableStream.from(iter)
      .pipeThrough(putCapOn)
      .pipeThrough(tagBottle);
  }

  // Assembly line

  const inputSync = generateBottles(10);

  // 2. and you can pipe it to any iterable data structure out...
  const outputSync = new Map(await Array.fromAsync(pipeline(inputSync)));

  const inputAsync = generateBottlesAsync(10);

  // 3. even if the input iterable is async. The code is the exact same
  const outputAsync = new Map(await Array.fromAsync(pipeline(inputAsync)));

  // Tests for deep structural equality
  assertEquals(outputSync, outputAsync);
});

Each call to mapStream() represents a worker or assembly line step in your metaphor; they can do their job completely synchronously without ever having to worry about promises, Maps, Sets, or the inner workings of any data structure, as you pointed out in a later comment. To use your words, the pipeline works the same regardless of how we "muddy time and space". In fact, it works better than most naive implementations of the same concepts because the Streams API factors in the concept of backpressure and lock ownership. Backpressure asserts that if one step in the assembly line is slower than the others and becomes the bottleneck of the whole assembly line, no pun intended, then any earlier parts of the assembly line stop and wait for the slowest link to complete each task; lock ownership asserts that the stream itself is now the source of truth for the order in which each chunk is pumped through the pipeline, as no part of the code that does not own the lock to the stream may write to or read from the stream. This allows you to actually feed multiple data structures through the same pipeline one after another and collect their outputs into one singular data structure if you'd like. If my interpretation of your comments is correct, I believe the Streams API actually solves all of the problems you mentioned, from time/space independence to source of ordering truth.


I think in my opening comment on this issue I did not define my terms very succinctly, so allow me to do so here so we're all on the same page.

  • Streams API: the official Web API dedicated to the concept of streaming, most notably consisting of the following objects:
    • WritableStream: an object representing a stream sink, or the end of a pipeline
    • ReadableStream: an object representing a stream source, or the beginning of a pipeline
    • TransformStream: an object representing a pipe/conversion, or an intermediary of a pipeline
  • Iterable: any object conforming to the iterable protocol as officially defined
    • Promise concurrency methods: the four official static methods Promise.race(), Promise.any(), Promise.all(), and Promise.allSettled(), intended to be synchronization checkpoints for async code dealing with concurrency
    • Concurrent iterable:
      • an Iterable whose [@@iterator]().next().value property may be (and most likely will be) a promise when [@@iterator]().next().done is false or undefined
      • Represents a purposefully unordered collection of values or results of concurrent computations
      • Accepted by any of the promise concurrency methods
      • Can be converted to a Stream iterable
      • New concept introduced in this issue
  • Async iterable: any object conforming to the async iterable protocol as officially defined
    • Stream iterable:
      • An Async iterable representing an ordered sequence of values or computation results
      • Separate from the Streams API, though intended to interop closely with the Streams API
      • Analogous to "async generators" mentioned in README
      • New concept introduced in this issue

Hopefully this set of definitions should reign in the scope creep of this issue I accidentally introduced, my apologies for doing so. I'd like to keep this issue as generalized, as focused, and as relevant to all async iterators as possible.

@conartist6
Copy link

conartist6 commented May 11, 2024

The problem I was pointing to is generateBottlesAsync. Because it's an async generator you can't get a bottle synchronously. Instead you have to allow your call stack to unwind back to the event loop, process the contents of the event loop, rebuild the async stack trace, and then finally you can continue with a bottle.

If instead of bottle you have characters of text, a single chunk might be 64KB of text, or ~64,000 characters available synchronously. You might have 10 layers of stream processing using for .. await of loops (which wait twice per iteration) so that ends up being 65,535 (64KiB, a standard chunk size for read streams) times 10 times 2 which comes out to 1,310,700 trips to and from the event loop to rebuild the call stack, out of which 1,310,699 of those would be pointless as the data would be available synchronously.

@conartist6
Copy link

conartist6 commented May 11, 2024

It seems to me that when people say per-character iterators are too slow to process streams of text, really they're referring to the cost of literally millions of useless awaits and their bookkeeping. When you eliminate those costs, per-character stream processing actually has quite low overhead, I'm guessing something like 5% as opposed to more like 50% - 80% with iterators that await every character.

See: nodejs/node#31979

@ben-laird
Copy link
Author

ben-laird commented May 12, 2024

I see where you're coming from; having to rebuild the entire call stack just to resume execution can certainly add up over time, and employing a batching strategy to minimize giving back control to the event loop can save a lot in terms of performance. I do also agree that batching isn't the greatest when the data is available immediately for processing; for instance, it wouldn't be a great idea to batch a stream of characters into a string if you have the methods to operate on every character.

With that being said, I think you're missing the point of why I made this issue in the first place and why I named the concept of stream iterables that way. My ultimate goal in raising this issue is to split the concepts of concurrency and async iteration into two separate proposals, with helper methods for each, with the purposes of simplifying this proposal and creating better primitives for all JavaScript developers, not just the ones concerned with byte streams.

The problem I was pointing to is generateBottlesAsync. Because it's an async generator you can't get a bottle synchronously. Instead you have to allow your call stack to unwind back to the event loop, process the contents of the event loop, rebuild the async stack trace, and then finally you can continue with a bottle.

If I was writing an application, I'd fully agree with your pointing out that generateBottlesAsync() is inefficient; the bottle objects could be made immediately, and therefore they should be for all the reasons you've mentioned previously: releasing Zalgo, rebuilding call stack, everything. Since in this case generateBottlesAsync() was purely a mockup of some async generator doing fake work, I don't see the point of your critique.

My question is: if your output is ready right now and the next worker wants it right now, why does the postal service (event loop) need to be involved? It's a double whammy: it's creating needless costs, and it's centralizing them!

To answer your rhetorical question, it doesn't, and I agree with you that it doesn't. But what happens when your output is inherently asynchronous? How would you solve making a paginated database call of, say 1,000,000 records in 1,000-record pages, and transforming the results of the query? You wouldn't want to await the whole query and flatten the pages, because you end up losing performance waiting to operate on data you already have. However, you wouldn't want to lock obtaining every record behind an async iterator either for the exact same reason; don't return a promise to a part of the page when you already have the whole page, since doing so releases Zalgo.

As it stands currently, the solution would be to represent the results as an async iterator with each page represented as a sync iterator; that way you could rip through each record without handing back control to the event loop, but allowing other code to do its job while the database is chugging along. I don't believe that's a very good solution, however.

If your output is ready right now and the next worker wants it right now, why does the postal service (event loop) need to be involved?

This statement makes perfect sense in a synchronous environment, but what happens when the assumptions of that synchronous environment are violated? What if the output isn't ready right now, or the next worker doesn't want it right now? What happens in procedures like sending network requests, or making database calls, or even reading files? I'm assuming you don't use readFileSync() or any API's like that, since synchronously reading the whole thing feels like a waste of time and memory when you can operate on small chunks of data (or individual pieces of data) as soon as they come in.

Even worse, what do you do when you have to send multiple network requests in tandem, or make several database calls at once, or make one database call with a collection of pages? These operations are inherently asynchronous, so you have to wait for them in the first place and would swallow massive performance costs if you did nothing as you waited. You don't care about the order in which they settle, though, you just need the results. A sync iterator of non-promises doesn't work (it can't wait for things to settle), and neither does an async iterator (it artificially imposes an order that is hard to reason about). What we need is some helper methods to deal with these rather common cases of unordered concurrency.


I made this issue in the first place to introduce generic, powerful, understandable solutions to the theme behind the questions you've posed: don't let unnaturally-imposed encodings of data structures get in the way of operating on the data inside. The motivation behind your question of "why wait for chunks when pieces work" is why both the sync iterators proposal and this one are gaining such traction, because with them you no longer have to put the whole iterator into an array or Set or whatever else (incurring major space/time costs) just to access some very powerful methods for manipulating every piece of data. I raised this issue to take that same line of reasoning to the next level and introduce concurrency as a first-class primitive, with helpers of its own, so that regardless of how many async tasks are kicked off, developers can operate on all of them "at once" (albeit conceptually, as there's no real concept of threading in JS currently) without waiting for the waterfall effect that is async iterators. Because after all, why wait to operate on data you already have but can't access because of a stupid thing called ordering?

@laverdet
Copy link

A batch parser can be blazing fast in how quickly it carries out its computations and yet still be slower to produce a result than my stream parser, because my stream parser is actually doing the parsing work as the data arrives over the pipe.

In any given chunk the first byte, for all practical purposes, is available at the same time as the last byte. You can check out posix read(2) or the OSI layer 2 frame delivery mechanism for more information. It should not be the case where a stream sits on some data for a period of time before delivering it to the consumer.

It may be using more processing power to do the work

Yes you are creating more work.

@conartist6
Copy link

conartist6 commented May 13, 2024

I agree that concurrency primitives are good. In the example of transforming database records you need to load 100,000 pages (of 1000 records each), so you definitely want to apply some concurrency to ensure that you can request the next page while the local page is doing synchronous transformation on the last page of data it received. Presumably the database engine itself needs to generate page one of results before it knows what will be on page two.

I think you've laid out well the existing choices of iterator: an sync iterator is fast but forces you to stop the world when data is not available and effectively prevents parallelizing remote work. An async iterator doesn't need to stop the world, but its overhead means you probably need to batch items. The only reasonable choice is a mixture of the behaviors: in this example 1% async and 99% synchronous behavior (edit: technically 0.1%). You will almost certainly implement this by having an async iterator of sync iterators somewhere, and the async part will want concurrency to be efficient.

What I'm saying is that once you've done all that you should take the final step of flattening the iterator of pages into a single iterator over all the records, because otherwise you there is no single abstraction over the complete dataset. 1% of the time (including on the first step) this flattened iterator's next method can return Promise<{done, value}> and the rest of the time it will just return synchronously: {done, value}. When consumed correctly pauses, represented by promises, only happen when needed and the necessary waiting propagates through the system.

Here's a pretty good example of what this looks like in real usage: a regex engine which matches against a stream of input and produce a stream of results. Here's the code responsible for feeding the characters from the source iterator into the engine and emitting results:

https://github.com/iter-tools/regex/blob/a35a0259bf288ccece2a1f6ffebcfdb7ac953517/lib/index.js#L43-L75

Then there are two more copies of the same logic here (async of chrs) and here (async of chunks) -- just slightly different glue code for pinning the part of the engine that is implemented over individual characters to input and output.

Compare that to the BABLR fork of the same regex engine which uses my sometimes-async "stream iterators" for input and output and so only needs one copy of that logic. Now in the case where the input stream iterator never returns any promises -- say because the input was a string -- you'll also get your matches synchronously. All that you get for free thanks to these definitions.

@laverdet
Copy link

sync iterator is fast but forces you to stop the world when data is not available and effectively prevents parallelizing remote work

JavaScript does not allow you to stop the world.

async iterator doesn't need to stop the world, but its overhead means you probably need to batch items

AsyncIterable<Uint8Array> best represents what is coming in from the filesystem or network controller. You are allowed to decay the type into AsyncIterable<number> if you are inclined but you are throwing away some very efficient operations by doing so.

Compare that to the BABLR fork of the same regex engine which uses my sometimes-async "stream iterators" for input and output and so only needs one copy of that logic.

I think this is a rehash of the red / blue functions debacle, but for iterables? Using generator functions in a way to share logic between synchronous and asynchronous code has a rich history in JS. gensync did this for basic logic, and Effection can do it for iterables. Redux Saga called them "channels".

@conartist6
Copy link

@laverdet Performance is important but so is abstraction. I'm seeing the difference between having exactly one performant solution and being able to have a whole family of solutions that are all equally performant.

Why would it not be worthy of allowing the abstract case to also be fast? It feels strange to be told that I deserve this overhead in some way.

@conartist6
Copy link

conartist6 commented May 13, 2024

This is exactly what the Zalgo article was about. Here's the relevant quote (emphasis mine):

Avoid Synthetic Deferrals

I know what you’re thinking: “But you just told me to use nextTick!”

And yes, it’s true, you should use synthetic deferrals when the only alternative is releasing Zalgo. However, these synthetic deferrals should be treated as a code smell. They are a sign that your API might not be optimally designed.

Ideally, you should know whether something is going to be immediately available, or not. Realistically, you should be able to take a pretty decent guess about whether the result is going to be immediately available most of the time, or not, and then follow this handy guide:

If the result is usually available right now, and performance matters a lot:

Check if the result is available.
If it is, return it.
If it is not, return an error code, set a flag, etc.
Provide some Zalgo-safe mechanism for handling the error case and awaiting future availability.

@ben-laird
Copy link
Author

In all honesty, I'd appreciate it if we stop discussing things like byte streams and Uint8Arrays without linking them back to the wider concept of asynchronous iteration. This issue is about providing a collection of clear and generic methods to assist all JavaScript developers navigate the subtle distinction between ordered async iteration and unordered concurrency.

Both of you have clearly thought about this a lot, especially as a maintainer of iter-tools, which is cited as a prior art for this proposal; and as the creator of node-fibers, which is literally all about giving NodeJS coroutine support. I'd like to bring your minds, and the minds of everyone else in this proposal concerned with concurrency, together to create an elegant and rock-solid concurrency primitive that JavaScript devs will know when to use and will enjoy using.

When consumed correctly pauses, represented by promises, only happen when needed and the necessary waiting propagates through the system.

This right here is the purest essence of what I'm calling the concurrency primitive. This is exactly the concept that I'm proposing separate helper methods for! Async iteration can then focus on iterating asynchronously properly, and concurrent iteration can then focus on iterating concurrently properly, without being in AsyncIterable<T>'s shadow; async iteration then doesn't have to worry about concurrency, because that concept is handled by Iterable<T | Promise<T>>, its own primitive!

This lack of distinction between ordered async iteration and unordered concurrency is a genuine problem that developers have noticed too; it's not just out-of-touch academics splitting hairs. In this proposal alone, there are several issues asking what to do about concurrency and how exactly to implement it for async iterable helper methods. In this issue alone, we've gone back and forth about the benefits and drawbacks of iterating in different ways for different use cases. The Zalgo article touches on this lack of distinction too, all the way back in 2013! Wouldn't it be nice to have two whole suites of tools to do either for you, and the only choice you have to make to use the right toolchain is whether your data is ordered or not?

Notice, though, how I never once in this comment needed to appeal to concrete structures or procedures, like byte streams or database calls, to define the concept, only to provide examples of the concept and why such a concept is so important. This issue is not about how ordered async iteration and unordered concurrency works, it's about why we need a distinction between the two and what we need to make such a distinction and the concurrency primitive possible.

Examples of this concept, like Effection and Redux Saga and iter-tools, are great; articles about async iteration and concurrency, like Zalgo or red/blue functions, are also great. Getting into the weeds about streams of booleans or chunks of characters can often serve to derail the conversation, however. This issue is about providing a collection of clear and generic methods to assist all JavaScript developers navigate the subtle distinction between ordered async iteration and unordered concurrency, and I don't want things like byte streams and Uint8Arrays clouding the discussion about the need for a first-class concurrency primitive.

@conartist6
Copy link

conartist6 commented May 14, 2024

I suppose that works -- after all I do it. Iterable<T | Promise<T>> is the intermediate representation I turn into an iterator which may yield a promise of a step object.

I think the transformation I do is important though because if the asynchrony is only at the step.value level and not at the step level, there will never be a chance for an asynchronous data source to indicate to you that it is done. Having the outermost wrapper be a purely synchronous iterable means that it must be able to yield all its values to completion before anything deferred has a chance to happen. If the goal was to encapsulate a conversation with a database in which we were asynchronously receiving pages of data, the only way that the data stream itself can set step.done is if the generation of the step object can be deferred.

@conartist6
Copy link

conartist6 commented May 14, 2024

I would call a synchronous iterable of promises a "batch iterable" because it's a great protocol when you have some known number of batches and a pool of workers to process them.

To illustrate my point about why they don't work well for streams of data:

[...batchIterableFrom(stream)] // oops, lock-up

@ben-laird
Copy link
Author

ben-laird commented May 14, 2024

...if the asynchrony is only at the step.value level and not at the step level, there will never be a chance for an asynchronous data source to indicate to you that it is done. Having the outermost wrapper be a pure synchronous iterable means that it must be able to yield all its values to completion before anything deferred has a chance to happen.

Exactly! This fact right here is why I believe concurrency warrants its own primitive all to itself. There is a fundamental and subtle difference between ordered async iteration and unordered concurrency, and that difference is modeled as [@@iterator]().next().done vs. (await [@@asyncIterator]().next()).done. I point out the same problem you did in my opening comment:

With all that being said, there does not currently exist a method to convert a stream iterator to a concurrent one, and the secret is in the iteratorResult.done property. In concurrent iterables, this property is immediately available, telling the concurrency method of choice (or iterable adapter) when to stop iterating. In stream iterables, this property sits behind the same promise the iteratorResult.value property does, forcing the caller to await the whole expression before yielding the value and deciding whether it should continue iterating; this is how stream iterables enforce the ordering of their data, and this is precisely what makes stream iterables a strict superset of concurrent iterables. To convert a concurrent iterable to a stream iterable, all the above function did was impose an ordering, the ordering of which promises fulfilled fastest.

To be more precise than I was in that opening comment, there does not currently exist a method to convert a stream iterable to a concurrent one using the stream iterable alone (precisely for the reason you and I point out). There is a way if you also specify a chunk size and just take a chunk of the async iterable at a time: you call the stream iterable n times in rapid succession, somehow detect when the underlying stream iterable is done in order to close the concurrent iterable on top, and yield n promises of the values. I played around with a couple implementations of this concept back when I opened this issue, and all of them were pretty janky and I probably accidentally released Zalgo.

In the case of encapsulating conversation with a database, yes this concurrency primitive alone is probably not enough. However, that's why I call it a primitive: you can combine it with other things to make it fit your use case. An ORM library, for instance, could implement a getCursorConcurrent() method for a query that returns a Promise<{ pageCount: number, pages: Iterable<Promise<DbRecord[]>> }>; under the hood it would probably ask the database how many pages it plans to give and use that to tell when the concurrent iterable Iterable<Promise<DbRecord[]>> is done.

This is why primitives like this are so powerful: they apply to so many use cases and can combine with other building blocks of the language to create extremely efficient abstractions and representations. This is why the Signals proposal was even started, to make a robust reactive primitive that everyone can use right out of the box.

I would call a synchronous iterable of promises a "batch iterable" because it's a great protocol when you have some known number of batches and a pool of workers to process them.

It really is. I'm calling them concurrent iterables, but we can call them batch iterables too. They represent the same underlying concept: unordered concurrent asynchronous tasks.

To illustrate my point about why they don't work well for streams of data:

[...batchIterableFrom(stream)] // oops, lock-up

They most certainly don't work for streams of data, you're right! I'd argue they're the wrong primitive for streams of data or anything handled by the Streams API, because such things are ordered. Concurrent/batch iterables are intentionally unordered so that workers/function callers can process the data in any order they'd like.

@conartist6
Copy link

conartist6 commented May 14, 2024

Thanks for including the full quote, reading it now I think it's an excellent summary of the situation!

I do believe that even if you do allow the step object to be deferred you can still design helpers that propagate backpressure through the system though. Without backpressure you tend to get a ping-pong effect that's harmful for moving serious amounts of data: the producer produces, then halts while the consumer consumes, then the consumer halts and asks the producer for more -- essentially one side of the stream is always doing nothing, where really we want both the producer and consumer to be working all the time as fast as they can.

If backpressure isn't a concern, what is the benefit of being able to process the data in any order? If I make a request to a SQL-ish database it's usually the request itself that determines how the response will order items. If I specify no ordering, I should assume the database will produce results and return them in the order that is most efficient for it, despite the fact it is producing an implicitly ordered stream of results.

@conartist6
Copy link

conartist6 commented May 14, 2024

Specifically I'm referring here to the ability to call next() again even if the first call to next() has just given you a promise. On the outside you should get two promises and both should resolve at the correct times.

The real question is whether that backpressure propagates, and whether the producer is implemented in a way that can make use of it. The async helpers defined in this proposal already do propagate backpressure! A well-implemented producer like a database would see "hey, I have requests in my inbox for the next 5 pages of data, so when I finish generating page 1 I'll just go on to page 2 because I already know who is expecting that when it's done."

Now imagine your request was made to a sharded database: the db receiving the request splits it up into 5 smaller requests and sends those off to shards. If it has not made any guarantee about the ordering of items, it can simply claim that whatever response it gets first is "page 1", even if technically the first request it fired off was to shard 1 but the first response came from shard 3. The consumer will be none the wiser.

@ben-laird
Copy link
Author

I do believe that even if you do allow the step object to be deferred you can still design helpers that propagate backpressure through the system though.

...

The async helpers defined in this proposal already do propagate backpressure!

You're correct, backpressure is very important, and it's good that this proposal implements backpressure out of the box. However, backpressure is more of an async iteration/streaming thing, so if you need to employ backpressure for a concurrent iterable pipeline, I'd put the concurrent iterable into a ReadableStream or use the async iterator adapter I wrote in my opening comment:

Code

For finite concurrent iterators:

export async function* PromiseOrdered<T>(values: Promise<T>[]) {
  const pool = new Map(values.map((pr, idx) => index(idx, pr)));

  while (pool.size !== 0) {
    const [idx, res] = await Promise.race(pool.values());

    pool.delete(idx);

    yield res;
  }
}

For general concurrent iterators:

export type OrderParams<T> = [
  concurIter: ConcurrentIterable<T>,
  options: {
    lowWaterMark: number;
    highWaterMark: number;
  }
];

export async function* toStream<T>(...params: OrderParams<T>) {
  const [concurIter, { lowWaterMark, highWaterMark }] = params;

  // Use single reference to transformed iterator to prevent accidentally resetting it
  const iter = enumerateIndices(concurIter);

  const pool = new Map(take(iter, highWaterMark));

  while (pool.size !== 0) {
    const [idx, res] = await Promise.race(pool.values());

    pool.delete(idx);

    // Refill step
    if (pool.size <= lowWaterMark) {
      take(iter, highWaterMark - pool.size);
    }

    yield res;
  }
}

// Utilities

type ConcurrentIterable<T> = Iterable<Promise<T>>;

type Indexed<T> = [number, Promise<[number, T]>];

function* take<T>(iter: Iterable<T>, amount: number) {
  let idx = 0;

  for (const element of iter) {
    if (!(idx < amount)) return;

    yield element;

    idx++;
  }
}

function index<T>(idx: number, pr: Promise<T>): Indexed<T> {
  return [idx, pr.then((v) => [idx, v])];
}

function* enumerateIndices<T>(iter: ConcurrentIterable<T>) {
  let idx = 0;

  for (const element of iter) {
    yield index(idx, element);

    idx++;
  }
}

If backpressure isn't a concern, what is the benefit of being able to process the data in any order?

The benefit of a lack of order is that you can handle the data in any order without affecting the meaning of the structure. Your example actually illustrates this perfectly. If you specify no ordering, then the database is free to give you results as fast as it can produce them and is not held up by any one page taking longer than the others. A lack of order means there's no waterfall effect that takes place as with async iteration. To use your own words (which answer your question just as well):

Now imagine your request was made to a sharded database: the db receiving the request splits it up into 5 smaller requests and sends those off to shards. If it has not made any guarantee about the ordering of items, it can simply claim that whatever response it gets first is "page 1", even if technically the first request it fired off was to shard 1 but the first response came from shard 3. The consumer will be none the wiser.

To use the converse of your example, if the user did specify the order they'd like their pages in, and say shard 3 is running slow but it has page 1, you're now bottlenecked by shard 3, and the query can't really continue until shard 3 finishes up. Concurrent iteration, by abolishing an ordering conceptually, allows the db and the user to use whatever actual ordering they'd like, and neither will be none the wiser of the other's actual ordering, because they just don't care!

Appendix illustration of conceptual vs. actual ordering

To illustrate my point about conceptual ordering and actual ordering, a Map and a Set are both conceptually unordered; you just set and get elements, you don't insert them at a given location in between other elements. You can't say one set element is "greater than" another, nor can you sort a Map or Set, therefore there is no conceptual ordering. This is different from the actual order of the elements; any Map or Set methods not concerned with iteration produce the same results regardless of the actual ordering since there's no conceptual ordering of the contained data. A binary search tree, however, does have a conceptual ordering; Deno's standard library has a BinarySearchTree<T> that takes a comparator function to sort its underlying data.

@conartist6
Copy link

conartist6 commented May 15, 2024

Concurrent iteration, by abolishing an ordering conceptually, allows the db and the user to use whatever actual ordering they'd like, and neither will be none the wiser of the other's actual ordering, because they just don't care!

If request was for ordered data, someone somewhere needs to do ordering! You seem to explaining to me how an ordered request could receive an unordered response really fast.

If at the end of all this you are, for example, inserting the data into a binary search tree which imposes its own ordering, then you would not make a request for ordered data because you'd just be forcing some lower layer to cache data that really you could have used already

Also if the request was for ordered data, the sharded database won't be able to return raw shards out of order at all: first it will have to collate the records from separate shards in the requested order. Pages of the collated stream are what JS would see, and out-of-order transmission won't be useful on the collation result because it truly is a stream with conceptual ordering

@conartist6
Copy link

It also seems to me that the DB would be in the worst of all worlds, because it would have to do the complete query just to know how many pages it should tell the frontend to make requests for. Far from having the efficiency of streaming, from the DB perspective it would have to keep all complete result sets in memory from the time when it first responded with the page count to the time when it finished sending the last of those pages.

@conartist6
Copy link

conartist6 commented May 15, 2024

I'm reading your example code very carefully now. I've written out the code the way I see it (yours had errors like take being a generator function but not called like one), and I'm not seeing any reason that the functionality you're laying out in this code does anything other than create backpressure. It should work just as well on backpressure-supporting async iterables as it does on sync iterables of promises:

export async function* buffer(iterable, n = 4) {
  const asyncIter = getIterator(iterable);

  const pool = new Array(n);

  for (let idx = 0; idx < n; idx++) {
    pool[idx] = wrapWithIdx(asyncIter.next(), idx);
  }

  let outstandingRequests = n;

  while (outstandingRequests) {
    const step = await Promise.race(pool);

    if (!step.done) {
      const { 0: idx, 1: value } step.value;

      pool[idx] = wrapWithIdx(asyncIter.next(), idx);

      yield value;
    } else {
      outstandingRequests--; // no I have not tested this code, sorry
    }
  }
}

const getIterator = (iterable) => iterable[Symbol.asyncIterator]?.() || iterable[Symbol.iterator]?.();

const wrapWithIdx = (stepPromise, idx) => {
  return Promise.resolve(stepPromise).then((step) => {
    return step.done
      ? step
      : Promise.resolve(step.value).then(value => ({ done: false, value: [idx, value] }));
  });
}

Is this not entirely more general while still doing exactly what your propose? It should still work just the same way on a sync iterator of promises...

@ben-laird
Copy link
Author

ben-laird commented May 16, 2024

I think I accidentally threw you off track. Sorry, I should've been much clearer in my examples.

To use the converse of your example, if the user did specify the order they'd like their pages in, and say shard 3 is running slow but it has page 1, you're now bottlenecked by shard 3, and the query can't really continue until shard 3 finishes up. Concurrent iteration, by abolishing an ordering conceptually, allows the db and the user to use whatever actual ordering they'd like, and neither will be none the wiser of the other's actual ordering, because they just don't care!

If request was for ordered data, someone somewhere needs to do ordering! You seem to explaining to me how an ordered request could receive an unordered response really fast.

If the user did in fact request ordered data, then yes the database must order it as specified. I should've made it clear in my converse example that requesting ordered data using a concurrent iterable should be considered a logic error, and I should've separated my sentence about concurrent iteration abolishing conceptual ordering from the rest of the example.


If at the end of all this you are, for example, inserting the data into a binary search tree which imposes its own ordering, then you would not make a request for ordered data because you'd just be forcing some lower layer to cache data that really you could have used already

I agree, and I'd go further and say using an async iterable instead of a concurrent iterable (if a concurrent iterable representation is possible) when you intend to sort the data into a conceptually ordered data structure is bad design and probably also a full-blown logic error. The choice of iteration depends on where you'd like to do your sorting, if any. This goes back to what I said about having two distinct toolchains for two distinct sets of procedures:

Wouldn't it be nice to have two whole suites of tools to do either for you, and the only choice you have to make to use the right toolchain is whether your data is ordered or not?

If the data you're requesting from the db should be ordered by the db, then you should request it using an async/stream iterable, that way the ordering is enforced client-side and no other ordering may be imposed without introducing another data structure. If, on the other hand, you intend to sort the data yourself into some conceptually ordered data structure, then yes I agree that forcing the db to sort it only for you to re-sort it is a waste of time, and therefore you should request the data with a concurrent iterable.

To reiterate, working with ordered async data using a concurrent iterable, and working with unordered concurrent data using an async iterable, are both logic errors to me. If your async data is ordered, use an async iterable; if your async data is unordered, use a concurrent iterable. Converting a concurrent iterable to an async iterable, however, I believe is perfectly fine (and maybe even desirable depending on the use case); there does not currently exist a way to do the opposite.


I'm not seeing any reason that the functionality you're laying out in this code does anything other than create backpressure.

The main functionality I laid out in my snippet starts a collection of promises immediately and yields them as they are fulfilled, thus converting a concurrent iterator into an async one. I describe the process in more detail in the code snippet below. The pooling and backpressure features are just byproducts; I can't load an infinite concurrent iterator into memory all at once, so instead I load it in chunks and set the highWaterMark option to be very high to kickstart as many promises as possible. I would recommend using the finite array implementation whenever you know the concurrent iterator is finite, or modifying the finite array implementation to take an iterable and just putting it in the pool outright.

By the way, thank you for pointing out the bugs in my code. I found the one you pointed out (the results of the take() call in the refill step were never used) and I found one where the first take() call prematurely closes the enumerateIndices() iterator. I've fixed both of those and took the opportunity to add more documentation and the ability to not specify a lowWaterMark in the function options. My fixes are implemented below:

Fixed code
type OrderParams<T> = [
  concurIter: ConcurrentIterable<T>,
  options: {
    lowWaterMark?: number;
    highWaterMark: number;
  }
];

async function* orderConcurrent<T>(...params: OrderParams<T>) {
  // Normalize params, setting `lowWaterMark` equal to `highWaterMark` when no value
  // is provided to start refilling immediately after a promise is fulfilled
  const [concurIter, { highWaterMark, lowWaterMark = highWaterMark }] = params;

  // Use single reference to transformed iterator to prevent accidentally resetting it
  const iter = enumerateIndices(concurIter);

  // Start the first n = highWaterMark promises immediately.
  // Maps tend to perform better than regular arrays, and they
  // illustrate the lack of ordering of the pool better than arrays to me.
  // These are purely design decisions and probably don't severely affect perf.
  const pool = new Map(take(iter, highWaterMark));

  while (pool.size !== 0) {
    // Iterate through all the activated promises in
    // the pool and await the first one to fulfill
    const [idx, res] = await Promise.race(pool.values());

    // Take the fulfilled promise out of the running (Promise.race() will
    // always return non-promises and fulfilled promises first)
    pool.delete(idx);

    // Refill step.
    // Specifying `lowWaterMark` allows for starting new promises off in batches of
    // highWaterMark - pool.size as opposed to immediately when a promise fulfills
    if (pool.size <= lowWaterMark) {
      const remainingCapacity = highWaterMark - pool.size;

      // Actually use the results of the `take()` call
      for (const entry of take(iter, remainingCapacity)) {
        pool.set(...entry);
      }
    }

    yield res;
  }
}

// Utilities

type ConcurrentIterable<T> = Iterable<Promise<T>>;

/**
 * Advances an iterable a number of times and yields the
 * results of each advancement. When done, it does *not* close
 * the underlying iterator.
 * @param iterable The iterable to take from
 * @param amount The number of elements to take from the iterable
 * @returns An iterable of values taken from the underlying iterable
 */
function* take<T>(iterable: Iterable<T>, amount: number) {
  const iter = iterable[Symbol.iterator]();

  let idx = 0;

  while (idx < amount) {
    const res = iter.next();

    if (res.done) return;

    yield res.value;

    idx++;
  }
}

function* enumerateIndices<T>(iter: ConcurrentIterable<T>) {
  let idx = 0;

  for (const element of iter) {
    yield index(idx, element);

    idx++;
  }
}

function index<T>(idx: number, pr: Promise<T>): Indexed<T> {
  return [idx, pr.then((v) => [idx, v])];
}

type Indexed<T> = [number, Promise<[number, T]>];

To reiterate, the main feature is the fact that it starts those promises all at once, and it does not only create backpressure.

The idea of starting a collection of tasks all at once is also unique to concurrent iterators, so neither my current implementation nor the functionality it implements would carry over well to async iterators I think; the main issue with doing so is there is no feasible way in a concurrent pool/context to detect when the async iterable is done. Your current implementation, for instance, actually leaks promises when given a concurrent iterable, as you probably already figured out given your edits. The test snippet below demonstrates this:

Test snippet
import { delay } from "jsr:@std/async";

export async function* buffer(iterable, n = 4) {
  const asyncIter = getIterator(iterable);

  const pool = new Array(n);

  for (let idx = 0; idx < n; idx++) {
    pool[idx] = wrapWithIdx(asyncIter.next(), idx);
  }

  let outstandingRequests = n;

  while (outstandingRequests) {
    const step = await Promise.race(pool);

    if (!step.done) {
      const { 0: idx, 1: value } = step.value; // Changed here: missing equals sign

      pool[idx] = wrapWithIdx(asyncIter.next(), idx);

      yield value;
    } else {
      outstandingRequests--; // no I have not tested this code, sorry
    }
  }
}

const getIterator = (iterable) =>
  iterable[Symbol.asyncIterator]?.() || iterable[Symbol.iterator]?.();

const wrapWithIdx = (stepPromise, idx) => {
  return Promise.resolve(stepPromise).then((step) => {
    return step.done
      ? step
      : Promise.resolve(step.value).then((value) => ({
          done: false,
          value: [idx, value],
        }));
  });
};

// Testing

Deno.test("Async generation", async () => {
  async function* generateAsync(n = Number.POSITIVE_INFINITY) {
    let idx = 0;

    while (idx < n) {
      await delay(Math.random() * 500);

      yield "data";

      idx++;
    }
  }

  for await (const element of buffer(generateAsync(5))) {
    console.log(element);
  }
});

Deno.test("Concurrent generation", async () => {
  function* generateConcurrent(n = Number.POSITIVE_INFINITY) {
    let idx = 0;

    while (idx < n) {
      yield step();

      idx++;
    }

    async function step() {
      await delay(Math.random() * 500);

      return "data";
    }
  }

  for await (const element of buffer(generateConcurrent(5))) {
    console.log(element);
  }
});

Result of running deno test:

running 2 tests from ./test/lab.test.js
Async generation ...
------- output -------
data
data
data
data
data
----- output end -----
Async generation ... ok (1s)
Concurrent generation ...
------- output -------
data
data
----- output end -----
Concurrent generation ... FAILED (101ms)

 ERRORS

Concurrent generation => ./test/lab.test.js:59:6
error: Leaks detected:
  - 3 timers were started in this test, but never completed. This is often caused by not calling `clearTimeout`.
To get more details where leaks occurred, run again with the --trace-leaks flag.

 FAILURES

Concurrent generation => ./test/lab.test.js:59:6

FAILED | 1 passed | 1 failed | 14 filtered out (1s)

error: Test failed

Trying to concurrently execute an async iterable necessitates somehow converting it to a concurrent iterable I think, which we've already established isn't really possible.


Zooming back out to the big picture, these fundamental issues we're having with concurrent and async iterators I believe means the two types deserve different toolchains. That makes this proposal easier to deal with because it no longer needs to worry about concurrency, and it makes this new concurrent iterators proposal easier because it no longer needs to worry about async iterators.

That brings me to a question. Would you all want to be a part of putting this concurrent primitive idea before the whole team presenting this proposal? I'd very much appreciate all your help in doing so. I believe making this distinction clearer to JavaScript devs will allow for better innovations built upon simpler, easier-to-understand primitives, and I'd sincerely appreciate anyone willing to help.

@conartist6
Copy link

To reiterate, the main feature is the fact that it starts those promises all at once, and it does not only create backpressure.

This, then, is the source of the confusion. If you look carefully at my version of your code you will see that mine also starts the initial batch of promises all at once (synchronously) just the same as yours.

That is what backpressure means.

@conartist6
Copy link

I want to give this code example some more time and consideration because I'm sure I still don't completely understand everything that's going on and I don't yet have time to try everything I want to try.

@conartist6
Copy link

conartist6 commented May 19, 2024

Here's an updated version of the code I shared that you can now see working correctly: https://gist.github.com/conartist6/61217bd5e767c0ef843b0e267e29a0dc

This is still one step short of what I really want, which is the same but with an implementation that also does not defensively inject deferrals (as this implementation does)

@conartist6
Copy link

conartist6 commented May 19, 2024

I intend the code I shared to be the example that I think @ben-laird really wanted: true concurrent execution, including out-of-order delivery of results (while using a regular async iterator, which is how this code is different).

I'm still not sure how I feel about the idea in general. For this example code I could quite easily add an ordering guarantee for the result, but it would have some real algorithmic costs for more complex pipelines. That is to say, some time would potentially be wasted that the next stage of the processing pipeline could be using to do work on a step delivered out of order, so long as it has complete confidence that order was irrelevant

@bakkot
Copy link
Collaborator

bakkot commented May 21, 2024

Re: order, Rust has both buffered and bufferUnordered. bufferUnordered is like buffered but can yield results out of order.

This is often faster, when the underlying operator can yield promises which can settle out of order. But it can't give you out-of-order results for operators like filter, which is constrained to settle its (before-the-end) promises in order (since it doesn't know how many results there will be), and buffering can't help you there. See #7 for the possibility of having an "unordered filter" operator.

@laverdet
Copy link

@bakkot I know this is probably not the direction you'll want to take the proposal but I did want to share the opposite of divide (from our discussion in #4), collect. Both implementations can be found here: https://gist.github.com/laverdet/1b7d94943deb452437d485809db7580f

With these two primitives any combination of operations becomes concurrent and unordered. Even the most naive implementations of map, filter, reduce, etc can be made concurrent (though reduce has more interesting opportunities for concurrency e.g. for pure associative commutative operations). So you don't need separate unordered versions of any function, you can just drop into a context where you don't care about order:

async function *range(upTo: number) {
	for (let ii = 0; ii < upTo; ++ii) {
		yield ii;
	}
}

async function *filter<Type>(asyncIterable: AsyncIterable<Type>, predicate: (value: Type) => boolean | PromiseLike<unknown>): AsyncIterable<Type> {
	for await (const value of asyncIterable) {
		if (await predicate(value)) {
			yield value;
		}
	}
}

async function *map<Type, Result>(asyncIterable: AsyncIterable<Type>, callback: (value: Type) => Result | PromiseLike<Result>): AsyncIterable<Result> {
	for await (const value of asyncIterable) {
		yield callback(value);
	}
}

const waitVaried = async (value: number) => {
	const timeout = value % 2 ? 10 : 100;
	await new Promise(resolve => { setTimeout(resolve, timeout); });
	return value;
};

const filterEven = async (value: number) => {
	await new Promise(resolve => { setTimeout(resolve, Math.random() * 100); });
	return value % 2 === 0;
};

const iterable = range(1000);
const concurrency = 10;
const processed = collect(divide(iterable, concurrency).map(iterable => {
	const mapped = map(iterable, waitVaried);
	const filtered = filter(mapped, filterEven);
	return filtered;
}));

console.time();
console.log(await Array.fromAsync(processed));
console.timeEnd();

We would expect the series of operations to take 10.5s:
map: 500ms * 10 + 500ms * 100
filter: 1000ms * 50
total: 105,000ms / 10 = 10.5s

This is quicker than what the ordered version would take with bufferAhead(10). That's to be expected since every other invocation to waitVaried is slower which would reduce the effective concurrency of the ordered version by half.

[
    6,   2,   0,   8,   4,  10,  12,  16,  22,  18,  14,  26,
   30,  20,  24,  28,  32,  36,  34,  38,  44,  40,  42,  48,
   46,  50,  52,  54,  56,  58,  62,  60,  70,  68,  64,  66,
   74,  72,  78,  80,  76,  82,  86,  88,  84,  92,  90,  94,
  100, 102,  96,  98, 106, 104, 108, 112, 116, 114, 110, 120,
  118, 122, 124, 126, 130, 134, 128, 132, 140, 136, 138, 144,
  142, 146, 152, 154, 150, 148, 160, 156, 158, 162, 170, 164,
  172, 166, 168, 174, 176, 178, 180, 184, 186, 182, 188, 192,
  196, 198, 190, 200,
  ... 400 more items
]
default: 10.680s

@bakkot
Copy link
Collaborator

bakkot commented May 22, 2024

@laverdet I know you've seen it, but just to tie the threads together, I do think something like your collect is probably worth having. divide is definitely an interesting idea, but you're right that it's a pretty different direction. I think it's definitely not the right direction for the basic helpers, which are / ought to be order-preserving, but for cases where you don't care about order it's got some interesting tradeoffs relative to bufferUnordered and friends.

@conartist6
Copy link

conartist6 commented May 22, 2024

divide seems morally equivalent to unorderedBuffer. Well, plus the fact that it make many iterables out of one, so it really looks more like divide(n, unorderedBuffer(n, source))

@bakkot
Copy link
Collaborator

bakkot commented May 22, 2024

@conartist6 It's morally equivalent if you're just doing map, but consider the case of filter.

Suppose you have the following:

// first item takes one second, remainder settle instantly
let slowThenFast = {
  [Symbol.asyncIterator]() {
    let count = 0;
    return {
      next() {
        ++count;
        if (count === 1) {
          return new Promise(res => setTimeout(() => res({ done: false, value: 0 }), 1000));
        } else if (count > 10) {
          return Promise.resolve({ done: true });
        } else {
          return Promise.resolve({ done: false, value: count });
        }
      }
    }
  }
};

let evens = AsyncIterator.from(slowThenFast).filter(x => x % 2 === 0).bufferUnordered(5);
for await (let item of evens) {
  console.log(item);
}

There is no way to have this start printing anything before a full second has passed, because filter needs to know the result of the first promise from the underlying iterator in order to know whether the value === 2 case will end up being the first or second value. That is, filter is constrained to resolve its promises in order, which means that bufferUnordered doesn't let you actually start subsequent work any earlier than buffer. To do better than this, you need filter to know that you don't care about the order of the results (hence issue #7).

By contrast, with the divide/collect approach, you are basically creating multiple workers, and then doing work in each one. So you can start printing as soon as one worker is finished processing a result.

@conartist6
Copy link

Why would filter be constrained to resolve promises in order? Something like takeWhile, yes, that needs ordered results.

@bakkot
Copy link
Collaborator

bakkot commented May 22, 2024

Consider

let evens = AsyncIterator.from(slowThenFast).filter(x => x % 2 === 0).buffered(5);
for await (let item of evens) {
  console.log(item);
}

That needs to print 0, 2, 4, 6, 8, 10, in that order. In order to do that, the second promise needs to resolve with 2. But it's not possible to know whether 2 will be the second promise or the first until you know what the result of calling the predicate on the first item in the underlying iterator is, which is to say, until after the first result is ready.

@conartist6
Copy link

conartist6 commented May 23, 2024

@bakkot nothing about your example needs to be in order. It's certainly possible to produce the integers out of order, and it's equally possible to produce a correct-but-out-of-order set of evens out of a correct-but-out-of-order set of integers.

I would have tended more towards an example predicate like person => person.isAlive to make the (potential) irrelevance of ordering clearer

@bakkot
Copy link
Collaborator

bakkot commented May 23, 2024

Of course it is possible to do anything we want, but we should try to match user expectations, and I think the user would reasonably expect results to be ordered in my example. There is no way to tell, from this code, whether there is a dependence on the order being preserved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants