diff --git a/spec.bs b/spec.bs index 84503eb..616e996 100644 --- a/spec.bs +++ b/spec.bs @@ -827,9 +827,72 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w
flatMap(|mapper|)
method steps are:
- 1. TODO: Spec this and use |mapper|.
+ 1. Let |sourceObservable| be [=this=].
+
+ 1. Let |observable| be a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an algorithm that takes a {{Subscriber}} |subscriber| and does the following:
+
+ 1. Let |queue| be a new [=list=] to keep track of items emitted by the source observable that need to be processed.
+
+ 1. Let |activeInnerSubscription| be false, indicating whether an inner subscription is currently active.
+
+ 1. Let |sourceObserver| be a new [=internal observer=], initialized as follows:
+
+ : [=internal observer/next steps=]
+ :: 1. If |activeInnerSubscription| is true, then:
+
+ 1. [=list/Append=] the passed in value to |queue|.
+
+ 1. Return.
+
+ 1. Otherwise:
+
+ 1. Set |activeInnerSubscription| to true.
+
+ 1. Let |innerObservable| be the result of [=Invoke=] |mapper| with the passed in value.
+
+ If an exception |E| was thrown,
+ then run |subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these
+ steps.
+
+ 1. Let |innerObserver| be a new [=internal observer=], initialized as follows:
+
+ : [=internal observer/next steps=]
+ :: Run |subscriber|'s {{Subscriber/next()}} method, given the passed in value.
+
+ : [=internal observer/error steps=]
+ :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in error.
+
+ : [=internal observer/complete steps=]
+ :: 1. If |queue| is not empty, then:
+
+ 1. Let |nextValue| be [=list/Remove=] the first item from |queue|.
+
+ 1. Repeat the steps above starting from the step where |innerObservable| is obtained by invoking |mapper| with |nextValue|.
+
+ 1. Otherwise:
+
+ 1. Set |activeInnerSubscription| to false.
+
+ 1. If |sourceObserver| has also completed, run |subscriber|'s {{Subscriber/complete()}} method.
+
+ 1. Let |innerOptions| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is |subscriber|'s [=Subscriber/signal=].
+
+ 1. Subscribe to |innerObservable| given |innerObserver| and |innerOptions|.
+
+ : [=internal observer/error steps=]
+ :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in error.
+
+ : [=internal observer/complete steps=]
+ :: If |activeInnerSubscription| is false, and |queue| is empty, run |subscriber|'s {{Subscriber/complete()}} method.
+
+ 1. Let |options| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is |subscriber|'s [=Subscriber/signal=].
+
+ 1. Subscribe to |sourceObservable| given |sourceObserver| and |options|.
+
+ 1. Return |observable|.
finally(|callback|)
method steps are: