Skip to content
This repository has been archived by the owner on Oct 8, 2024. It is now read-only.

flatMap with concurrency parameter #254

Closed
mpodwysocki opened this issue Dec 1, 2022 · 4 comments
Closed

flatMap with concurrency parameter #254

mpodwysocki opened this issue Dec 1, 2022 · 4 comments
Labels
good follow-on proposal this would be good but doesn't need to be in the first milestone

Comments

@mpodwysocki
Copy link

In IxJS, we have a notion of both concatMap and flatMap, where concatMap is a simple cartesian product just as flatMap is with Iterable. This allows the AsyncIterable to be able to use its natural concurrency at its heart to produce a sequence of sequences.

function flatMap<TSource, TResult>(
  selector: FlattenConcurrentSelector<TSource, TResult>,
  concurrent = Infinity,
  thisArg?: any
): AsyncIterable<TResult>

Note that this is hard to bake in after the fact once shipped. The concatMap can then be modeled with concatMap(fn, 1, thisArg).

@ljharb
Copy link
Member

ljharb commented Dec 1, 2022

The proposal achieved stage 3 at today’s meeting, so i doubt this is a change that would made at this point. A follow on proposal could explore it.

@mpodwysocki
Copy link
Author

@ljharb You're probably right in that much of the AsyncIterator convenience methods already lack cancellation semantics that IxJS currently has via AbortSignal and AbortController that it would be worth following up with another proposal since an async operation without cancellation semantics and concurrency parameters isn't ideal.

@bakkot bakkot added the good follow-on proposal this would be good but doesn't need to be in the first milestone label Dec 1, 2022
@bakkot
Copy link
Collaborator

bakkot commented Dec 1, 2022

Regarding cancellation, you may be interested in #164, which would not in itself solve the problem but would provide a convenient place to trigger controller.abort() when finished with an async (or sync) iterator.

@devsnek devsnek closed this as completed Dec 31, 2022
@bakkot
Copy link
Collaborator

bakkot commented Jan 31, 2023

@mpodwysocki I have been thinking about this more in the context of #262.

I agree it's kind of hard to add "flatMap where the underlying iterator is polled multiple times before waiting for any individual inner iterator to complete" after the fact (except by adding a different method with a different name). On the other hand, with the .bufferAhead(N) helper I'm thinking of in that issue, you could do something like

asyncIter
  .map(fnProducingAsyncIters)
  .bufferAhead(2) // eagerly poll from the outer iterator up to two times
  .flatMap(x => x) // flatten
  .bufferAhead(3) // eagerly poll from the current inner iterator up to three times

without needing to introduce an extra concurrency parameter to flatMap - you get it for free from the bufferAhead helper (which would just call .next() multiple times and buffer the results). It's a little more awkward than just doing .flatMap(fnProducingAsyncIters) directly, but it does allow you to get both kinds of concurrency that you might be after.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
good follow-on proposal this would be good but doesn't need to be in the first milestone
Projects
None yet
Development

No branches or pull requests

4 participants