Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spec the flatMap operator (attempt 2) #124

Merged
merged 7 commits into from
Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 129 additions & 1 deletion spec.bs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,135 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w
<div algorithm>
The <dfn for=Observable method><code>flatMap(|mapper|)</code></dfn> method steps are:

1. <span class=XXX>TODO: Spec this and use |mapper|.</span>
1. Let |sourceObservable| be [=this=].

1. Let |observable| be a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an
algorithm that takes a {{Subscriber}} |subscriber| and does the following:

1. Let |outerSubscriptionHasCompleted| to a [=boolean=], initially false.

1. Let |queue| be a new [=list=] of {{any}} values, initially empty.

Note: This |queue| is used to store any {{Observable}}s emitted by |sourceObservable|,
while |observable| is currently subscribed to an {{Observable}} emitted earlier by
|sourceObservable| that has not yet been exhausted.

1. Let |activeInnerSubscription| be a [=boolean=], initially false.

1. Let |sourceObserver| be a new [=internal observer=], initialized as follows:

: [=internal observer/next steps=]
:: 1. If |activeInnerSubscription| is true, then:

1. [=list/Append=] |value| to |queue|.

Note: This |value| will eventually be processed once the {{Observable}} that is
currently subscribed-to (as indicated by |activeInnerSubscription|) is exhausted.

1. Otherwise:

1. Set |activeInnerSubscription| to true.

1. Run the [=flatmap process next value steps=] with |value|, |subscriber|,
|mapper|, and <b>references</b> to all of the following: |queue|,
|activeInnerSubscription|, and |outerSubscriptionHasCompleted|.

<div class=note>
<p>Note: This [=flatmap process next value steps=] will subscribe to the
{{Observable}} derived from |value| (if one such can be derived) and keep
processing values from it until its subscription becomes inactive (either by
error or completion). If this "inner" {{Observable}} completes, then the
processing steps will recursively invoke themselves with the next {{any}} in
|queue|.</p>

<p>If no such value [=list/exists=], then the processing steps will terminate,
<b>unsetting</b> |activeInnerSubscription|, so that future values emitted from
|sourceObservable| are processed correctly.</p>
</div>

: [=internal observer/error steps=]
:: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in <var
ignore>error</var>.

: [=internal observer/complete steps=]
:: 1. Set |outerSubscriptionHasCompleted| to true.

Note: If |activeInnerSubscription| is true, then the below step will *not* complete
|subscriber|. In that case, the [=flatmap process next value steps=] will be
responsible for completing |subscriber| when |queue| is [=list/empty=], after the
"inner" subscription becomes inactive.

1. If |activeInnerSubscription| is false and |queue| is [=list/empty=], run
|subscriber|'s {{Subscriber/complete()}} method.

1. Let |options| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is
|subscriber|'s [=Subscriber/signal=].

1. <a for=Observable lt="subscribe to an Observable">Subscribe</a> to |sourceObservable|
given |sourceObserver| and |options|.

1. Return |observable|.
</div>

<div algorithm>
The <dfn>flatmap process next value steps</dfn>, given an {{any}} |value|, a {{Subscriber}}
|subscriber|, a {{Mapper}} |mapper|, and <b>references</b> to all of the following: a [=list=] of
{{any}} values |queue|, a [=boolean=] |activeInnerSubscription|, and a [=boolean=]
|outerSubscriptionHasCompleted|:

1. Let |mappedResult| be the result of [=invoking=] |mapper| with |value|.

If <a spec=webidl lt="an exception was thrown">an exception |E| was thrown</a>, then run
|subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these steps.

1. Let |innerObservable| be the result of calling {{Observable/from()}} with |mappedResult|.

If <a spec=webidl lt="an exception was thrown">an exception |E| was thrown</a>, then run
|subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these steps.

Issue: We shouldn't invoke {{Observable/from()}} directly. Rather, we should
call some internal algorithm that passes-back the exceptions for us to handle
properly here, since we want to pipe them to |subscriber|.

1. Let |innerObserver| be a new [=internal observer=], initialized as follows:

: [=internal observer/next steps=]
:: Run |subscriber|'s {{Subscriber/next()}} method, given the passed in |value|.

: [=internal observer/error steps=]
:: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in <var
ignore>error</var>.

: [=internal observer/complete steps=]
:: 1. If |queue| is not empty, then:

1. Let |nextValue| be the first item in |queue|; [=list/remove=] remove this item from
|queue|.

1. Run [=flatmap process next value steps=] given |nextValue|, |subscriber|, |mapper|,
and <b>references</b> to |queue| and |activeInnerSubscription|.

1. Otherwise:

1. Set |activeInnerSubscription| to false.

Note: Because |activeInnerSubscription| is a reference, this has the effect of
ensuring that all subsequent values emitted from the "outer" {{Observable}} (called
<var ignore>sourceObservable</var>.

1. If |outerSubscriptionHasCompleted| is true, run |subscriber|'s
{{Subscriber/complete()}} method.

Note: This means the "outer" {{Observable}} has already completed, but did not
proceed to complete |subscriber| yet because there was at least one more pending
"inner" {{Observable}} (i.e., |innerObservable|) that had already been queued and
had not yet completed. Until right now!

1. Let |innerOptions| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is
|subscriber|'s [=Subscriber/signal=].

1. <a for=Observable lt="subscribe to an Observable">Subscribe</a> to |innerObservable| given
|innerObserver| and |innerOptions|.
</div>

<div algorithm>
Expand Down
Loading