AsyncIterator.race/merge #15
Yep, if you're calling out existing implementations I'll toss in mine: https://github.com/iter-tools/iter-tools/blob/d7.5/API.md#asyncinterleaveready |
@conartist6 this implementation does not forward terminations correctly. See:

```js
import { asyncInterleaveReady } from "iter-tools";

async function* range(count) {
  console.log("start");
  try {
    for (let ii = 0; ii < count; ++ii) {
      yield ii;
    }
  } finally {
    console.log("done");
  }
}

try {
  for await (const ii of range(2)) {
    console.log("here", ii);
    throw 1;
  }
} catch {}

console.log("---");

try {
  for await (const ii of asyncInterleaveReady(range(2))) {
    console.log("here", ii);
    throw 1;
  }
} catch {}
```

Logs:
|
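For contrast, here is a minimal sketch of what forwarding termination looks like (`forwarded` is a hypothetical wrapper, not part of iter-tools): `yield*` delegates `next()`/`return()`/`throw()` to the inner generator, so an early exit in the consumer also runs the inner generator's `finally` block.

```js
const logs = [];

async function* range(count) {
  logs.push("start");
  try {
    for (let ii = 0; ii < count; ++ii) yield ii;
  } finally {
    logs.push("done"); // must run even when the consumer exits early
  }
}

// yield* delegates next()/return()/throw() to the inner iterator, so
// closing the wrapper also closes the inner generator.
async function* forwarded(iterable) {
  yield* iterable;
}

const demo = (async () => {
  try {
    for await (const ii of forwarded(range(2))) {
      logs.push(`here ${ii}`);
      throw 1;
    }
  } catch {}
  return logs.join(","); // "start,here 0,done"
})();
```

A combinator that drops the inner iterator on early exit would never log `done`.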
As for implementations, this one should fulfill all the requirements:
(although it does suffer from the potential memory leaks of |
Ooh, thanks for the bug report, I'll fix that. |
This is the one we use. It avoids the race leak and terminates on closure. Not sure if a fast iterable will starve the others; that wasn't a design goal. The specification should probably take a position on whether or not an iterator which throws should "cut the line" and resolve before other iterators which incremented normally.

```ts
export function collect<Type>(iterables: readonly AsyncIterable<Type>[]): AsyncIterable<Type> {
  switch (iterables.length) {
    case 0: return async function*() {}();
    case 1: return iterables[0]!;
  }
  return async function*() {
    type Accept = () => Type;
    let count = iterables.length;
    let capability: PromiseWithResolvers<Accept | null> | undefined;
    const iterators: AsyncIterator<Type>[] = [];
    const queue: Accept[] = [];
    const accept = async (iterator: AsyncIterator<Type>) => {
      try {
        const next = await iterator.next();
        if (next.done) {
          if (--count === 0 && capability !== undefined) {
            capability.resolve(null);
          }
        } else {
          push(() => {
            void accept(iterator);
            return next.value;
          });
        }
      } catch (error) {
        push(() => { throw error; });
      }
    };
    const push = (entry: Accept) => {
      if (capability === undefined) {
        queue.push(entry);
      } else {
        // Clear the capability as soon as it is consumed; otherwise a second
        // result settling in the same microtask drain would call resolve() on
        // an already-resolved promise and be silently dropped.
        capability.resolve(entry);
        capability = undefined;
      }
    };
    try {
      // Begin all iterators
      for (const iterable of iterables) {
        const iterator = iterable[Symbol.asyncIterator]();
        iterators.push(iterator);
        void accept(iterator);
      }
      // Delegate to iterables as results complete
      while (true) {
        while (true) {
          const next = queue.shift();
          if (next === undefined) {
            break;
          } else {
            yield next();
          }
        }
        if (count === 0) {
          break;
        } else {
          capability = Promise.withResolvers<Accept | null>();
          const next = await capability.promise;
          if (next === null) {
            break;
          } else {
            yield next();
          }
        }
      }
    } catch (err) {
      // Unwind remaining iterators on failure
      try {
        await Promise.all(iterators.map(iterator => iterator.return?.()));
      } catch {}
      throw err;
    }
  }();
}
```
|
Oh yeah, and I believe that all implementations shared here don't fire off a `next()` call immediately after the previous result resolves. What we're all doing is buffering one result from each iterable in a preamble, but we don't continue to buffer the next result as results come in. So you may end up with a waterfall after the first result, in the case that the consumer is asynchronous. |
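A one-deep readahead that does keep pulling is possible. A hedged sketch (`readahead` is a hypothetical helper, not from any implementation in this thread): it fires the next `next()` call before yielding the current value, so production overlaps with an asynchronous consumer.

```js
async function* readahead(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  let pending = iterator.next(); // one result always in flight
  try {
    while (true) {
      const result = await pending;
      if (result.done) return result.value;
      // Pull the next result *before* handing this one to the consumer,
      // so the producer works while the consumer processes.
      pending = iterator.next();
      yield result.value;
    }
  } finally {
    // Forward early termination to the inner iterator.
    await iterator.return?.();
  }
}
```

This only buffers one result ahead; a deeper queue would need backpressure decisions that the sketch deliberately avoids.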
Thanks for the links! I didn't mention it in the OP, but there's another requirement (or at least something to think through), which is that ideally when the consumer calls That is, suppose you have As mentioned in the readme, async iterators which support multiple calls to |
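To illustrate the multiple-in-flight-`next()` point: generator objects queue concurrent `next()` calls internally, but a hand-written iterator can answer several at once. A hypothetical sketch (`counter` is an invented name):

```js
// Hand-written async iterator: each next() call synchronously claims the
// next index, so several calls can be in flight at once without waiting
// on one another.
function counter(limit) {
  let ii = 0;
  return {
    next() {
      return ii < limit
        ? Promise.resolve({ value: ii++, done: false })
        : Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() { return this; },
  };
}

// Two next() calls issued back-to-back, before either settles:
const it = counter(2);
const both = Promise.all([it.next(), it.next()]);
```

Each caller gets a distinct result rather than blocking on the previous pull.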
Some more prior art in this library. Also this one, which links to a bunch more. |
Wanted to write this down so I don't forget, though it won't be in the first version of this proposal.

We should have a helper for merging or racing multiple AsyncIterators. When `next` is first called you pull from all of them, and then resolve with the first promise to settle. When `next` is called again, if any of the previously-pulled promises have already settled you immediately settle with that; otherwise you pull from all the underlying iterators which you aren't currently waiting on and resolve with the first to settle.

RxJS has approximately this (apparently reasonably popular), as do some Rust libraries.

This essay points out that this is the type-theory "sum" to `zip`'s "product", for async iterators. That is, it's the natural extension of `Promise.race`, where `zip` is the natural extension of `Promise.all`.

The async-std library in Rust takes the interesting approach of randomizing the order it polls the underlying streams. I'm guessing that's mostly for the case where the first stream makes all of its results available immediately, which would prevent ever getting values from the second stream even if its results were also available immediately. I don't think that's relevant here, since we can (and must) hold on to results from previous calls which haven't yet been merged in, and so can ensure that in the case of two async iterators which vended immediately-settling promises we'd alternate between them.
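A hedged sketch of those semantics (`merge` is a hypothetical name; this version re-pulls an iterator as soon as its value is consumed, and it does inherit the retained-handler leak of repeatedly racing long-pending promises mentioned above):

```js
async function* merge(...iterables) {
  // iterator -> promise of its next result, tagged with its iterator
  const pending = new Map();
  const pull = (iterator) => {
    pending.set(iterator, iterator.next().then((result) => ({ iterator, result })));
  };
  for (const iterable of iterables) pull(iterable[Symbol.asyncIterator]());
  try {
    while (pending.size > 0) {
      // Settle with whichever iterator produces a result first; promises
      // that lose the race stay in the map and are raced again.
      const { iterator, result } = await Promise.race(pending.values());
      pending.delete(iterator);
      if (!result.done) {
        pull(iterator);
        yield result.value;
      }
    }
  } finally {
    // Forward termination to any iterators still running.
    await Promise.all([...pending.keys()].map((iterator) => iterator.return?.()));
  }
}
```

Because each consumed value immediately re-pulls only its own iterator, two sources vending immediately-settling promises interleave rather than one starving the other.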