From 2102ed406ea787011a9ce665c837917808e68253 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 5 Mar 2024 10:42:45 -0600 Subject: [PATCH 1/3] First try at adding a flatMap spec --- spec.bs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/spec.bs b/spec.bs index 84503eb..3f409bc 100644 --- a/spec.bs +++ b/spec.bs @@ -827,9 +827,69 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w
The flatMap(|mapper|) method steps are: - 1. TODO: Spec this and use |mapper|. + 1. Let |sourceObservable| be [=this=]. + + 1. Let |observable| be a [=new=] {{Observable}} whose [=Observable/subscribe callback=] is an algorithm that takes a {{Subscriber}} |subscriber| and does the following: + + 1. Let |queue| be a new [=list=] to keep track of items emitted by the source observable that need to be processed. + + 1. Let |activeInnerSubscription| be false, indicating whether an inner subscription is currently active. + + 1. Let |sourceObserver| be a new [=internal observer=], initialized as follows: + + : [=internal observer/next steps=] + :: 1. If |activeInnerSubscription| is true, then: + + :: 1. [=list/Append=] the passed in value to |queue|. + + 1. Return. + + 1. Otherwise: + + :: 1. Set |activeInnerSubscription| to true. + + 1. Let |innerObservable| be the result of [=Invoke=] |mapper| with the passed in value. + + - If an exception |E| was thrown, then run |subscriber|'s {{Subscriber/error()}} method, given |E|, and return. + + 1. Let |innerObserver| be a new [=internal observer=], initialized as follows: + : [=internal observer/next steps=] + :: Run |subscriber|'s {{Subscriber/next()}} method, given the passed in value. + + : [=internal observer/error steps=] + :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in error. + + : [=internal observer/complete steps=] + :: 1. If |queue| is not empty, then: + + :: 1. Let |nextValue| be [=list/Remove=] the first item from |queue|. + + 1. Repeat the steps above starting from the step where |innerObservable| is obtained by invoking |mapper| with |nextValue|. + + 1. Otherwise: + + :: 1. Set |activeInnerSubscription| to false. + + 1. If |sourceObserver| has also completed, run |subscriber|'s {{Subscriber/complete()}} method. + + 1. Let |innerOptions| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is |subscriber|'s [=Subscriber/signal=]. + + 1. Subscribe to |innerObservable| given |innerObserver| and |innerOptions|. + + : [=internal observer/error steps=] + :: Run |subscriber|'s {{Subscriber/error()}} method, given the passed in error. + + : [=internal observer/complete steps=] + :: If |activeInnerSubscription| is false, and |queue| is empty, run |subscriber|'s {{Subscriber/complete()}} method. + + 1. Let |options| be a new {{SubscribeOptions}} whose {{SubscribeOptions/signal}} is |subscriber|'s [=Subscriber/signal=]. + + 1. Subscribe to |sourceObservable| given |sourceObserver| and |options|. + + 1. Return |observable|.
+
The finally(|callback|) method steps are: From a8281166763e502136d810a4481ceeb3f7496187 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 5 Mar 2024 11:51:36 -0600 Subject: [PATCH 2/3] fix bad detail markup --- spec.bs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec.bs b/spec.bs index 3f409bc..0b78feb 100644 --- a/spec.bs +++ b/spec.bs @@ -840,13 +840,13 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w : [=internal observer/next steps=] :: 1. If |activeInnerSubscription| is true, then: - :: 1. [=list/Append=] the passed in value to |queue|. + 1. [=list/Append=] the passed in value to |queue|. 1. Return. 1. Otherwise: - :: 1. Set |activeInnerSubscription| to true. + 1. Set |activeInnerSubscription| to true. 1. Let |innerObservable| be the result of [=Invoke=] |mapper| with the passed in value. @@ -862,13 +862,13 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w : [=internal observer/complete steps=] :: 1. If |queue| is not empty, then: - :: 1. Let |nextValue| be [=list/Remove=] the first item from |queue|. + 1. Let |nextValue| be [=list/Remove=] the first item from |queue|. 1. Repeat the steps above starting from the step where |innerObservable| is obtained by invoking |mapper| with |nextValue|. 1. Otherwise: - :: 1. Set |activeInnerSubscription| to false. + 1. Set |activeInnerSubscription| to false. 1. If |sourceObserver| has also completed, run |subscriber|'s {{Subscriber/complete()}} method. From bb9de2ac87f9e97570e6d7214614b226b5dca05c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 5 Mar 2024 12:02:11 -0600 Subject: [PATCH 3/3] Try again to fix the bikeshed bug? --- spec.bs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/spec.bs b/spec.bs index 0b78feb..616e996 100644 --- a/spec.bs +++ b/spec.bs @@ -849,10 +849,13 @@ For now, see [https://github.com/wicg/observable#operators](https://github.com/w 1. Set |activeInnerSubscription| to true. 1. Let |innerObservable| be the result of [=Invoke=] |mapper| with the passed in value. - - - If an exception |E| was thrown, then run |subscriber|'s {{Subscriber/error()}} method, given |E|, and return. + + If an exception |E| was thrown, + then run |subscriber|'s {{Subscriber/error()}} method, given |E|, and abort these + steps. 1. Let |innerObserver| be a new [=internal observer=], initialized as follows: + : [=internal observer/next steps=] :: Run |subscriber|'s {{Subscriber/next()}} method, given the passed in value.