From b993bb5d153020b7b1ad3a01eb0954ecc35753c9 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 21 Nov 2023 14:35:31 -0600 Subject: [PATCH 01/15] Adds explaination of `flatMap` semantics --- README.md | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/README.md b/README.md index 4a42a24..3610f07 100644 --- a/README.md +++ b/README.md @@ -507,6 +507,50 @@ We propose the following operators in addition to the `Observable` interface: - `finally()` - Like `Promise.finally()`, it takes a callback which gets fired after the observable completes in any way (`complete()`/`error()`) +- `flatMap()` + + - Similar to `Iterator.prototype.flatMap`, however, because the types are different, + there are some semantics to note. + + Given the following example: + +```ts +const result = source.flatMap((value) => getNextInnerObservable(value)); +``` + +1. When you subscribe to `result`, it subscribes to `source` immediately. +2. Let there be a queue of values that is empty. +3. Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` +4. Let there be an `isSourceComplete` that is `false`. +5. When the `source` emits a value: + a. If `innerSignal` is `undefined` + 1. Call the mapping function with the the value. + 2. Then pass the return value of the mapping function to `Observable.from()` to convert it to + an "inner observable" if it's not already. + 3. Then create an `AbortSignal` that follows the subscriber's + signal, and set `innerSignal`. + 4. pass the `innerSignal` to the subscribe for the inner observable. + emit all values from the inner observable to the `result` observer. + 5. If the inner observable completes, check to see if there are any queued values. + a. If there is, take the first one from the queue, and move to 5.a.1. + b. If there's not + 1. If `isSourceComplete` is `true` + a. Complete `result`. + 2. If `isSourceComplete` is `false` + a. Move to 5 and wait. + 6. If the inner observable errors: + a. Forward the error to `result`. + b. If `innerSignal` is `AbortSignal` + 7. Add the value to the queue and wait. +6. If the `source` completes: + a. If `innerSignal` is `undefined` + 1. Complete `result`. + a. If `innerSignal` is `AbortSignal` + 1. set `isSourceComplete` to `true`. +7. If the `source` errors: + a. Forward the error to `result`. +8. If the user aborts the signal passed to the subscription of `result` + a. Abort any `innerSignal` that exists, and terminate subscription. Versions of the above are often present in userland implementations of observables as they are useful for observable-specific reasons, but in addition From a7191c1b55ddebcf379ea24559d25572d218e587 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 21 Nov 2023 14:46:46 -0600 Subject: [PATCH 02/15] fix formatting a bit --- README.md | 66 +++++++++++++++++++++++++++---------------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 3610f07..d297c41 100644 --- a/README.md +++ b/README.md @@ -518,39 +518,39 @@ We propose the following operators in addition to the `Observable` interface: const result = source.flatMap((value) => getNextInnerObservable(value)); ``` -1. When you subscribe to `result`, it subscribes to `source` immediately. -2. Let there be a queue of values that is empty. -3. Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` -4. Let there be an `isSourceComplete` that is `false`. -5. When the `source` emits a value: - a. If `innerSignal` is `undefined` - 1. Call the mapping function with the the value. - 2. Then pass the return value of the mapping function to `Observable.from()` to convert it to - an "inner observable" if it's not already. - 3. Then create an `AbortSignal` that follows the subscriber's - signal, and set `innerSignal`. - 4. pass the `innerSignal` to the subscribe for the inner observable. - emit all values from the inner observable to the `result` observer. - 5. If the inner observable completes, check to see if there are any queued values. - a. If there is, take the first one from the queue, and move to 5.a.1. - b. If there's not - 1. If `isSourceComplete` is `true` - a. Complete `result`. - 2. If `isSourceComplete` is `false` - a. Move to 5 and wait. - 6. If the inner observable errors: - a. Forward the error to `result`. - b. If `innerSignal` is `AbortSignal` - 7. Add the value to the queue and wait. -6. If the `source` completes: - a. If `innerSignal` is `undefined` - 1. Complete `result`. - a. If `innerSignal` is `AbortSignal` - 1. set `isSourceComplete` to `true`. -7. If the `source` errors: - a. Forward the error to `result`. -8. If the user aborts the signal passed to the subscription of `result` - a. Abort any `innerSignal` that exists, and terminate subscription. +- When you subscribe to `result`, it subscribes to `source` immediately. +- Let there be a `queue` of values that is empty. +- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` +- Let there be an `isSourceComplete` that is `false`. +- When the `source` emits a value: + - If `innerSignal` is `undefined` + - Call the mapping function with the the value. + - Then pass the return value of the mapping function to `Observable.from()` to convert it to + "inner observable" if it's not already. + - Then create an `AbortSignal` that follows the subscriber's and set `innerSignal`. + - pass the `innerSignal` to the subscribe for the inner observable. + - Forward all values emitted by the inner observable to the `result` observer. + - If the inner observable completes + - If there are values in the `queue` + - take the first one from the `queue` and return the the mapping step. + - If the `queue` is empty + - If `isSourceComplete` is `true` + - Complete `result`. + - If `isSourceComplete` is `false` + - Wait + - If the inner observable errors + - Forward the error to `result`. + - If `innerSignal` is `AbortSignal` + - Add the value to the `queue` and wait. +- If the `source` completes: + - If `innerSignal` is `undefined` + - Complete `result`. + - If `innerSignal` is `AbortSignal` + - set `isSourceComplete` to `true`. +- If the `source` errors: + - Forward the error to `result`. +- If the user aborts the signal passed to the subscription of `result` + - Abort any `innerSignal` that exists, and terminate subscription. Versions of the above are often present in userland implementations of observables as they are useful for observable-specific reasons, but in addition From 3fa9c0d7f85b309abbd5e64fd6b13a430c3aac71 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 21 Nov 2023 14:50:43 -0600 Subject: [PATCH 03/15] Better organization and explanation --- README.md | 89 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index d297c41..3f0e09c 100644 --- a/README.md +++ b/README.md @@ -512,46 +512,6 @@ We propose the following operators in addition to the `Observable` interface: - Similar to `Iterator.prototype.flatMap`, however, because the types are different, there are some semantics to note. - Given the following example: - -```ts -const result = source.flatMap((value) => getNextInnerObservable(value)); -``` - -- When you subscribe to `result`, it subscribes to `source` immediately. -- Let there be a `queue` of values that is empty. -- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` -- Let there be an `isSourceComplete` that is `false`. -- When the `source` emits a value: - - If `innerSignal` is `undefined` - - Call the mapping function with the the value. - - Then pass the return value of the mapping function to `Observable.from()` to convert it to - "inner observable" if it's not already. - - Then create an `AbortSignal` that follows the subscriber's and set `innerSignal`. - - pass the `innerSignal` to the subscribe for the inner observable. - - Forward all values emitted by the inner observable to the `result` observer. - - If the inner observable completes - - If there are values in the `queue` - - take the first one from the `queue` and return the the mapping step. - - If the `queue` is empty - - If `isSourceComplete` is `true` - - Complete `result`. - - If `isSourceComplete` is `false` - - Wait - - If the inner observable errors - - Forward the error to `result`. - - If `innerSignal` is `AbortSignal` - - Add the value to the `queue` and wait. -- If the `source` completes: - - If `innerSignal` is `undefined` - - Complete `result`. - - If `innerSignal` is `AbortSignal` - - set `isSourceComplete` to `true`. -- If the `source` errors: - - Forward the error to `result`. -- If the user aborts the signal passed to the subscription of `result` - - Abort any `innerSignal` that exists, and terminate subscription. - Versions of the above are often present in userland implementations of observables as they are useful for observable-specific reasons, but in addition to these we offer a set of common operators that follow existing platform @@ -598,6 +558,55 @@ Promises whose scheduling differs from that of Observables, which sometimes means event handlers that call `e.preventDefault()` will run too late. See the [Concerns](#concerns) section which goes into more detail. +### flatMap semantics + +`flatMap` generally behaves like `Iterator.prototype.flatMap`, however, since it's push-based, +there can be a temporal element. Given that, it behaves much like RxJS's `concatMap`, which is +one of the most useful operators from the library. + +At a high-level, it subscribes to the source, and then maps to, and emits values from "inner +observables", one at a time, ensuring they're subscribed to in sequence. + +Given the following example: + +```ts +const result = source.flatMap((value) => getNextInnerObservable(value)); +``` + +- When you subscribe to `result`, it subscribes to `source` immediately. +- Let there be a `queue` of values that is empty. +- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` +- Let there be an `isSourceComplete` that is `false`. +- When the `source` emits a value: + - If `innerSignal` is `undefined` + - Call the mapping function with the the value. + - Then pass the return value of the mapping function to `Observable.from()` to convert it to + "inner observable" if it's not already. + - Then create an `AbortSignal` that follows the subscriber's and set `innerSignal`. + - pass the `innerSignal` to the subscribe for the inner observable. + - Forward all values emitted by the inner observable to the `result` observer. + - If the inner observable completes + - If there are values in the `queue` + - take the first one from the `queue` and return the the mapping step. + - If the `queue` is empty + - If `isSourceComplete` is `true` + - Complete `result`. + - If `isSourceComplete` is `false` + - Wait + - If the inner observable errors + - Forward the error to `result`. + - If `innerSignal` is `AbortSignal` + - Add the value to the `queue` and wait. +- If the `source` completes: + - If `innerSignal` is `undefined` + - Complete `result`. + - If `innerSignal` is `AbortSignal` + - set `isSourceComplete` to `true`. +- If the `source` errors: + - Forward the error to `result`. +- If the user aborts the signal passed to the subscription of `result` + - Abort any `innerSignal` that exists, and terminate subscription. + ## Background & landscape To illustrate how Observables fit into the current landscape of other reactive From bb15499361e1043d9d2ab1ec7ab7c54474fb119c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 28 Nov 2023 18:09:21 -0600 Subject: [PATCH 04/15] Just have flatMap under iterator helpers --- README.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/README.md b/README.md index 3f0e09c..c0ab932 100644 --- a/README.md +++ b/README.md @@ -507,10 +507,6 @@ We propose the following operators in addition to the `Observable` interface: - `finally()` - Like `Promise.finally()`, it takes a callback which gets fired after the observable completes in any way (`complete()`/`error()`) -- `flatMap()` - - - Similar to `Iterator.prototype.flatMap`, however, because the types are different, - there are some semantics to note. Versions of the above are often present in userland implementations of observables as they are useful for observable-specific reasons, but in addition @@ -526,7 +522,7 @@ methods](https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype) to - `filter()` - `take()` - `drop()` -- `flatMap()` +- `flatMap()` (Because the types are different, there are [some semantics to note](#flatmap-semantics).) - `reduce()` - `toArray()` - `forEach()` From 521407b8c71c210426501e16964ffed44ab92882 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:18:52 -0600 Subject: [PATCH 05/15] Add more detail to conditional Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c0ab932..94db680 100644 --- a/README.md +++ b/README.md @@ -591,7 +591,7 @@ const result = source.flatMap((value) => getNextInnerObservable(value)); - Wait - If the inner observable errors - Forward the error to `result`. - - If `innerSignal` is `AbortSignal` + - Otherwise, if `innerSignal` is `AbortSignal` - Add the value to the `queue` and wait. - If the `source` completes: - If `innerSignal` is `undefined` From ec753d98b86502626ce0b5e7a611c35363c5d7ba Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:19:08 -0600 Subject: [PATCH 06/15] Correct punctuation Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 94db680..d56594c 100644 --- a/README.md +++ b/README.md @@ -579,7 +579,7 @@ const result = source.flatMap((value) => getNextInnerObservable(value)); - Then pass the return value of the mapping function to `Observable.from()` to convert it to "inner observable" if it's not already. - Then create an `AbortSignal` that follows the subscriber's and set `innerSignal`. - - pass the `innerSignal` to the subscribe for the inner observable. + - Pass the `innerSignal` to the subscribe for the inner observable. - Forward all values emitted by the inner observable to the `result` observer. - If the inner observable completes - If there are values in the `queue` From c7f3867342b41dffc3a361135dabc5cc92f888e6 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:19:26 -0600 Subject: [PATCH 07/15] Correct capitalization Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d56594c..1d38354 100644 --- a/README.md +++ b/README.md @@ -583,7 +583,7 @@ const result = source.flatMap((value) => getNextInnerObservable(value)); - Forward all values emitted by the inner observable to the `result` observer. - If the inner observable completes - If there are values in the `queue` - - take the first one from the `queue` and return the the mapping step. + - Take the first one from the `queue` and return the the mapping step. - If the `queue` is empty - If `isSourceComplete` is `true` - Complete `result`. From 114ebd5680de07fee7ffe88897d1c6a3d11c3d03 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:19:38 -0600 Subject: [PATCH 08/15] Correct capitalization Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1d38354..ed82639 100644 --- a/README.md +++ b/README.md @@ -597,7 +597,7 @@ const result = source.flatMap((value) => getNextInnerObservable(value)); - If `innerSignal` is `undefined` - Complete `result`. - If `innerSignal` is `AbortSignal` - - set `isSourceComplete` to `true`. + - Set `isSourceComplete` to `true`. - If the `source` errors: - Forward the error to `result`. - If the user aborts the signal passed to the subscription of `result` From 31e791a3329a4dc1c8d6ed43668a18507205ca49 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:24:27 -0600 Subject: [PATCH 09/15] More descriptive grammar Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ed82639..4c7f491 100644 --- a/README.md +++ b/README.md @@ -578,7 +578,7 @@ const result = source.flatMap((value) => getNextInnerObservable(value)); - Call the mapping function with the the value. - Then pass the return value of the mapping function to `Observable.from()` to convert it to "inner observable" if it's not already. - - Then create an `AbortSignal` that follows the subscriber's and set `innerSignal`. + - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`. - Pass the `innerSignal` to the subscribe for the inner observable. - Forward all values emitted by the inner observable to the `result` observer. - If the inner observable completes From 00e07ffc3a297b8a49a8003f3aa09e576acd0f5c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:29:02 -0600 Subject: [PATCH 10/15] Add more detail --- README.md | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 4c7f491..cea0084 100644 --- a/README.md +++ b/README.md @@ -566,31 +566,41 @@ observables", one at a time, ensuring they're subscribed to in sequence. Given the following example: ```ts -const result = source.flatMap((value) => getNextInnerObservable(value)); +const result = source.flatMap((value, index) => + getNextInnerObservable(value, index), +); ``` +- `flatMap` is a method on `Observable` that is called with a `mapping function`, which takes a + value from the source observable, and a zero-based counter (or "index") of that value, and + returns a value that can be converted to an observable with `Observable.from`. `flatMap` returns + an obeservable we'll call `result`. - When you subscribe to `result`, it subscribes to `source` immediately. - Let there be a `queue` of values that is empty. -- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal` +- Let there be an integer `current index` that is `0`. +- Let there be an `innerSignal` that is either `undefined` or an `AbortSignal`. - Let there be an `isSourceComplete` that is `false`. -- When the `source` emits a value: +- When the `source` emits a `value`: - If `innerSignal` is `undefined` - - Call the mapping function with the the value. - - Then pass the return value of the mapping function to `Observable.from()` to convert it to - "inner observable" if it's not already. - - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`. - - Pass the `innerSignal` to the subscribe for the inner observable. - - Forward all values emitted by the inner observable to the `result` observer. - - If the inner observable completes + - Begin **"mapping step"**: + - Copy the `current index` into an `index` variable. + - Increment the `current index`. + - Call the `mapping function` with the the `value` and the `index`. + - Then pass the return value of the mapping function to `Observable.from()` to convert it to + "inner observable" if it's not already. + - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`. + - Pass the `innerSignal` to the subscribe for the inner observable. + - Forward all values emitted by the inner observable to the `result` observer. + - If the inner observable completes - If there are values in the `queue` - - Take the first one from the `queue` and return the the mapping step. + - Take the first one from the `queue` and return to the **"mapping step"**. - If the `queue` is empty - If `isSourceComplete` is `true` - - Complete `result`. + - Complete `result`. - If `isSourceComplete` is `false` - - Wait - - If the inner observable errors - - Forward the error to `result`. + - Wait + - If the inner observable errors + - Forward the error to `result`. - Otherwise, if `innerSignal` is `AbortSignal` - Add the value to the `queue` and wait. - If the `source` completes: From b1f3b1f2d699b7a5a912d83af5bc8ef6c36a4d46 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Feb 2024 11:40:59 -0600 Subject: [PATCH 11/15] Add a missing `an`. Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cea0084..c38e01f 100644 --- a/README.md +++ b/README.md @@ -601,7 +601,7 @@ const result = source.flatMap((value, index) => - Wait - If the inner observable errors - Forward the error to `result`. - - Otherwise, if `innerSignal` is `AbortSignal` + - Otherwise, if `innerSignal` is an `AbortSignal` - Add the value to the `queue` and wait. - If the `source` completes: - If `innerSignal` is `undefined` From 1f7146ee1e9a52d583a13aaf79da082869bbd1be Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Feb 2024 11:41:19 -0600 Subject: [PATCH 12/15] Add missing `;` Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c38e01f..5dccffa 100644 --- a/README.md +++ b/README.md @@ -591,7 +591,7 @@ const result = source.flatMap((value, index) => - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`. - Pass the `innerSignal` to the subscribe for the inner observable. - Forward all values emitted by the inner observable to the `result` observer. - - If the inner observable completes + - If the inner observable completes; - If there are values in the `queue` - Take the first one from the `queue` and return to the **"mapping step"**. - If the `queue` is empty From f29432dd7f7e555f2cfefb0d87a974d15afa0494 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Feb 2024 11:42:06 -0600 Subject: [PATCH 13/15] Clarify inner subscription in `flatMap` algorithm Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5dccffa..4632876 100644 --- a/README.md +++ b/README.md @@ -589,7 +589,7 @@ const result = source.flatMap((value, index) => - Then pass the return value of the mapping function to `Observable.from()` to convert it to "inner observable" if it's not already. - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`. - - Pass the `innerSignal` to the subscribe for the inner observable. + - Subscribe to the inner observable, passing `innerSignal` - Forward all values emitted by the inner observable to the `result` observer. - If the inner observable completes; - If there are values in the `queue` From 63102e45e23ed32faa365b2a9d13b617190302e4 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Feb 2024 11:45:23 -0600 Subject: [PATCH 14/15] Add detail. `isSourceComplete` is a `boolean`. Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4632876..adb9770 100644 --- a/README.md +++ b/README.md @@ -579,7 +579,7 @@ const result = source.flatMap((value, index) => - Let there be a `queue` of values that is empty. - Let there be an integer `current index` that is `0`. - Let there be an `innerSignal` that is either `undefined` or an `AbortSignal`. -- Let there be an `isSourceComplete` that is `false`. +- Let there be a boolean `isSourceComplete` that is `false`. - When the `source` emits a `value`: - If `innerSignal` is `undefined` - Begin **"mapping step"**: From 3fab15964be8afce04baece731cd61774899af19 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 5 Feb 2024 11:45:42 -0600 Subject: [PATCH 15/15] Fix typo Co-authored-by: Dominic Farolino --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index adb9770..0716e82 100644 --- a/README.md +++ b/README.md @@ -585,7 +585,7 @@ const result = source.flatMap((value, index) => - Begin **"mapping step"**: - Copy the `current index` into an `index` variable. - Increment the `current index`. - - Call the `mapping function` with the the `value` and the `index`. + - Call the `mapping function` with `value` and `index`. - Then pass the return value of the mapping function to `Observable.from()` to convert it to "inner observable" if it's not already. - Then create an `AbortSignal` that is dependent on the subscriber's and set `innerSignal`.