From 6b71d64dfd3d3e1efc808786a93dd46c32c54d1c Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 1 Aug 2023 14:30:22 -0500 Subject: [PATCH 01/11] Create initial reference implementation Just a spike in TypeScript. Will add tests in a followup, add compilation to JavaScript, etc. --- impl/observable.ts | 496 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 496 insertions(+) create mode 100644 impl/observable.ts diff --git a/impl/observable.ts b/impl/observable.ts new file mode 100644 index 0000000..7276107 --- /dev/null +++ b/impl/observable.ts @@ -0,0 +1,496 @@ +/** +MIT License + +Copyright (c) 2023 Ben Lesh , Domenic Farolino + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +export interface Observer { + next(value: T): void; + error(error: any): void; + complete(): void; +} + +export interface SubscriptionOptions extends Partial> { + signal?: AbortSignal; +} + +export class Observable { + constructor(private start: (subscriber: Subscriber) => void) { } + + subscribe(options?: SubscriptionOptions) { + const observer: Observer = options + ? typeof options === 'function' + ? { next: options, error: reportError, complete: noop } + : typeof options.next === 'function' && + typeof options.error === 'function' && + typeof options.complete === 'function' + ? (options as Observer) + : { + next: (value) => options.next?.(value), + error: (error) => { + if (options.error) { + options.error(error); + } else { + reportError(error); + } + }, + complete: () => options.complete?.(), + } + : { + next: noop, + error: reportError, + complete: noop, + }; + + const subscriber = new Subscriber(options?.signal, observer); + + this.start(subscriber); + } + + forEach( + callback: (value: T) => void, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options?.signal); + + this.subscribe({ + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + signal: ac.signal, + }); + }); + } + + map(project: (value: T, index: number) => R): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe({ + next(value) { + let result: R; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + filter(predicate: (value: T, index: number) => boolean): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + take(count: number): Observable { + return new Observable((destination) => { + let remaining = count; + + this.subscribe({ + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + drop(count: number): Observable { + return new Observable((destination) => { + let seen = 0; + + this.subscribe({ + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + flatMap( + project: (value: T, index: number) => Observable + ): Observable { + return new Observable((destination) => { + let queue: T[] = []; + let index = 0; + let innerAC: AbortController | undefined; + let outerComplete = false; + + const startInner = (value: T) => { + innerAC = new AbortController(); + maybeFollowSignal(innerAC, destination.signal); + + let innerObservable: Observable; + try { + innerObservable = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + + innerObservable.subscribe({ + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()!); + } else if (outerComplete) { + destination.complete(); + } + }, + signal: innerAC.signal, + }); + }; + + this.subscribe({ + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + signal: destination.signal, + }); + }); + } + + takeUntil(notifier: Observable): Observable { + return new Observable((destination) => { + notifier.subscribe({ + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + signal: destination.signal, + }); + this.subscribe(destination); + }); + } + + every( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + signal: ac.signal, + }); + }); + } + + some( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + signal: ac.signal, + }); + }); + } + + find( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error('Value not found')); + }, + signal: ac.signal, + }); + }); + } + + reduce( + reducer: (accumulated: S, value: T, index: number) => S, + initialValue?: S, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe({ + next(value) { + if (hasState) { + try { + state = reducer(state, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value as any; + } + }, + error: reject, + complete() { + resolve(state); + }, + signal: ac.signal, + }); + }); + } + + toArray(options?: { signal?: AbortSignal }): Promise { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [], + options + ); + } +} + +class Subscriber implements Observer { + #stopped = false; + + #abortController = new AbortController(); + + get signal() { + return this.#abortController.signal; + } + + get closed() { + return this.#stopped || this.#abortController.signal.aborted; + } + + constructor( + private _signal: AbortSignal | undefined, + private _observer: Observer + ) { + maybeFollowSignal(this.#abortController, this._signal); + } + + next(value: T): void { + if (!this.closed) { + this._observer.next(value); + } + } + + error(error: any): void { + if (!this.closed) { + this.#stopped = true; + this._observer.error(error); + this.#abortController.abort(); + } + } + + complete(): void { + if (!this.closed) { + this.#stopped = true; + this._observer.complete(); + this.#abortController.abort(); + } + } +} + +function noop() { } + +function maybeFollowSignal( + abortController: AbortController, + signal: AbortSignal | undefined +) { + if (signal) { + const parentAbortHandler = () => { + abortController.abort(); + }; + signal.addEventListener('abort', parentAbortHandler, { + once: true, + }); + abortController.signal.addEventListener('abort', () => { + signal.removeEventListener('abort', parentAbortHandler); + }); + } +} + +function polyfill() { + const proto = EventTarget.prototype as any; + if (typeof proto.on !== 'function') { + proto.on = function (type: Parameters[0]) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal } + ); + }); + }; + } + + if (typeof globalThis.Observable !== 'function') { + (globalThis as any).Observable = Observable; + } +} + +polyfill(); From 580a6812906a33b97f5e8bd7f5bf16f92a8baf01 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 9 Nov 2023 17:44:33 -0600 Subject: [PATCH 02/11] chore: Update reference impl - moves license to LICENSE.txt - Updates reference impl to reflect proposed changes so far - Adds catch, finally, do, and switchMap --- LICENSE.txt | 21 + impl/observable.ts | 1141 ++++++++++++++++++++++++++------------------ 2 files changed, 709 insertions(+), 453 deletions(-) create mode 100644 LICENSE.txt diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..2b54eb8 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Ben Lesh , Domenic Farolino + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/impl/observable.ts b/impl/observable.ts index 7276107..3e3c5a6 100644 --- a/impl/observable.ts +++ b/impl/observable.ts @@ -1,496 +1,731 @@ -/** -MIT License - -Copyright (c) 2023 Ben Lesh , Domenic Farolino - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -*/ - export interface Observer { - next(value: T): void; - error(error: any): void; - complete(): void; + next(value: T): void; + error(error: any): void; + complete(): void; } -export interface SubscriptionOptions extends Partial> { - signal?: AbortSignal; +export interface SubscriptionOptions { + signal?: AbortSignal; } -export class Observable { - constructor(private start: (subscriber: Subscriber) => void) { } - - subscribe(options?: SubscriptionOptions) { - const observer: Observer = options - ? typeof options === 'function' - ? { next: options, error: reportError, complete: noop } - : typeof options.next === 'function' && - typeof options.error === 'function' && - typeof options.complete === 'function' - ? (options as Observer) - : { - next: (value) => options.next?.(value), - error: (error) => { - if (options.error) { - options.error(error); - } else { - reportError(error); - } - }, - complete: () => options.complete?.(), - } - : { - next: noop, - error: reportError, - complete: noop, - }; - - const subscriber = new Subscriber(options?.signal, observer); - - this.start(subscriber); - } +type ConvertableToObservable = + | Observable + | PromiseLike + | Iterable + | AsyncIterable; - forEach( - callback: (value: T) => void, - options?: { signal?: AbortSignal } - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - maybeFollowSignal(ac, options?.signal); - - this.subscribe({ - next: (value) => { - try { - callback(value); - } catch (error) { - reject(error); - ac.abort(); - } - }, - error: reject, - complete: resolve, - signal: ac.signal, - }); - }); +export class Observable { + static from(value: ConvertableToObservable) { + if (value instanceof Observable) { + return value; } - map(project: (value: T, index: number) => R): Observable { - return new Observable((destination) => { - let index = 0; - - this.subscribe({ - next(value) { - let result: R; - try { - result = project(value, index++); - } catch (error) { - destination.error(error); - return; - } - destination.next(result); - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - signal: destination.signal, - }); - }); + if (isPromiseLike(value)) { + return new Observable((subscriber) => { + value.then( + (value) => { + subscriber.next(value); + subscriber.complete(); + }, + (error) => { + subscriber.error(error); + } + ); + }); } - filter(predicate: (value: T, index: number) => boolean): Observable { - return new Observable((destination) => { - let index = 0; - - this.subscribe({ - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - destination.error(error); - return; - } - if (result) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - signal: destination.signal, - }); - }); + if (Symbol.asyncIterator in value) { + return new Observable(async (subscriber) => { + try { + for await (const v of value) { + subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); } - take(count: number): Observable { - return new Observable((destination) => { - let remaining = count; - - this.subscribe({ - next(value) { - remaining--; - if (remaining >= 0) { - destination.next(value); - } - if (remaining === 0) { - destination.complete(); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - signal: destination.signal, - }); - }); + if (Symbol.iterator in value) { + return new Observable((subscriber) => { + try { + for (const v of value) { + if (subscriber.isActive) subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); } - drop(count: number): Observable { - return new Observable((destination) => { - let seen = 0; + throw new TypeError("Value is not observable"); + } + + constructor(private start: (subscriber: Subscriber) => void) {} + + subscribe(onNext: (value: T) => void, options?: SubscriptionOptions): void; + subscribe(observer: Partial>, options?: SubscriptionOptions); + subscribe( + fnOrObserver: ((value: T) => void) | Partial>, + options?: SubscriptionOptions + ) { + const partialObserver = + typeof fnOrObserver === "function" + ? { next: fnOrObserver } + : fnOrObserver; + + const observer = { + next: noop, + error: reportError, + complete: noop, + ...partialObserver, + }; + + const subscriber = new Subscriber(options?.signal, observer); + + this.start(subscriber); + } + + forEach( + callback: (value: T) => void, + options?: SubscriptionOptions + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + + this.subscribe( + { + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + }, + { + signal, + } + ); + }); + } + + map(project: (value: T, index: number) => R): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe( + { + next(value) { + let result: R; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + filter(predicate: (value: T, index: number) => boolean): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + take(count: number): Observable { + return new Observable((destination) => { + let remaining = count; + + this.subscribe( + { + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + drop(count: number): Observable { + return new Observable((destination) => { + let seen = 0; + + this.subscribe( + { + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + flatMap( + project: (value: T, index: number) => ConvertableToObservable + ): Observable { + return new Observable((destination) => { + let queue: T[] = []; + let index = 0; + let innerAC: AbortController | undefined; + let outerComplete = false; + + const startInner = (value: T) => { + innerAC = new AbortController(); + + const signal = useSignalAll([innerAC.signal, destination.signal]); + + let innerObservable: Observable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } - this.subscribe({ - next(value) { - seen++; - if (seen > count) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - signal: destination.signal, + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()!); + } else if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + } + ); + }; + + this.subscribe( + { + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + takeUntil(notifier: ConvertableToObservable): Observable { + return new Observable((destination) => { + Observable.from(notifier).subscribe( + { + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + }, + { + signal: destination.signal, + } + ); + this.subscribe(destination, { signal: destination.signal }); + }); + } + + every( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + }, + { + signal, + } + ); + }); + } + + some( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + }, + { + signal, + } + ); + }); + } + + find( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error("Value not found")); + }, + }, + { + signal, + } + ); + }); + } + + reduce( + reducer: (accumulated: S, value: T, index: number) => S, + initialValue?: S, + options?: SubscriptionOptions + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe( + { + next(value) { + if (hasState) { + try { + state = reducer(state!, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value as any; + } + hasState = true; + }, + error: reject, + complete() { + resolve(state!); + }, + }, + { + signal, + } + ); + }); + } + + toArray(options?: SubscriptionOptions): Promise { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [] as T[], + options + ); + } + + catch( + handleError: (error: unknown) => ConvertableToObservable + ): Observable { + return new Observable((destination) => { + this.subscribe( + { + next(value) { + destination.next(value); + }, + error(error) { + Observable.from(handleError(error)).subscribe(destination, { + signal: destination.signal, }); - }); - } - - flatMap( - project: (value: T, index: number) => Observable - ): Observable { - return new Observable((destination) => { - let queue: T[] = []; - let index = 0; - let innerAC: AbortController | undefined; - let outerComplete = false; - - const startInner = (value: T) => { - innerAC = new AbortController(); - maybeFollowSignal(innerAC, destination.signal); - - let innerObservable: Observable; - try { - innerObservable = project(value, index++); - } catch (error) { - destination.error(error); - return; - } - - innerObservable.subscribe({ - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (queue.length > 0) { - startInner(queue.shift()!); - } else if (outerComplete) { - destination.complete(); - } - }, - signal: innerAC.signal, - }); - }; - - this.subscribe({ - next(value) { - if (innerAC) { - queue.push(value); - } else { - startInner(value); - } + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + finally(onFinalize: () => void): Observable { + return new Observable((destination) => { + destination.addTeardown(onFinalize); + this.subscribe(destination, { + signal: destination.signal, + }); + }); + } + + switchMap( + project: (value: T, index: number) => ConvertableToObservable + ): Observable { + return new Observable((destination) => { + let index = 0; + let outerComplete = false; + let innerAC: AbortController | undefined; + this.subscribe( + { + next: (value) => { + innerAC?.abort(); + innerAC = new AbortController(); + const signal = useSignalAll([innerAC.signal, destination.signal]); + let innerObservable: Observable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); }, error(error) { - destination.error(error); + destination.error(error); }, complete() { - outerComplete = true; - if (queue.length === 0) { - destination.complete(); - } - }, - signal: destination.signal, - }); - }); - } - - takeUntil(notifier: Observable): Observable { - return new Observable((destination) => { - notifier.subscribe({ - next() { + innerAC = undefined; + if (outerComplete) { destination.complete(); + } }, - error(error) { - destination.error(error); - }, - signal: destination.signal, - }); - this.subscribe(destination); - }); - } - - every( - predicate: (value: T, index: number) => boolean, - options?: { signal?: AbortSignal } - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - maybeFollowSignal(ac, options.signal); - let index = 0; - this.subscribe({ - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (!result) { - resolve(false); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(true); - }, - signal: ac.signal, - }); - }); - } - - some( - predicate: (value: T, index: number) => boolean, - options?: { signal?: AbortSignal } - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - maybeFollowSignal(ac, options.signal); - let index = 0; - this.subscribe({ - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(true); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(false); - }, - signal: ac.signal, - }); - }); - } - - find( - predicate: (value: T, index: number) => boolean, - options?: { signal?: AbortSignal } - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - maybeFollowSignal(ac, options.signal); - let index = 0; - this.subscribe({ - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(value); - ac.abort(); - } - }, - error: reject, - complete() { - // TODO: Figure out the proper semantics here. - reject(new Error('Value not found')); - }, - signal: ac.signal, - }); - }); - } - - reduce( - reducer: (accumulated: S, value: T, index: number) => S, - initialValue?: S, - options?: { signal?: AbortSignal } - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - maybeFollowSignal(ac, options.signal); - let hasState = arguments.length >= 2; - let state = initialValue; - let index = 0; - this.subscribe({ - next(value) { - if (hasState) { - try { - state = reducer(state, value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - } else { - state = value as any; - } - }, - error: reject, - complete() { - resolve(state); - }, - signal: ac.signal, - }); - }); - } - - toArray(options?: { signal?: AbortSignal }): Promise { - return this.reduce( - (arr, value) => { - arr.push(value); - return arr; - }, - [], - options - ); - } + }, + { + signal, + } + ); + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (!innerAC) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + } + ); + }); + } + + do(fnOrObserver: ((value: T) => void) | Partial>): Observable { + return new Observable((destination) => { + const doObserver: Partial> = + typeof fnOrObserver === "function" + ? { next: fnOrObserver } + : fnOrObserver; + this.subscribe( + { + next(value) { + doObserver.next?.(value); + destination.next(value); + }, + error(error) { + doObserver.error?.(error); + destination.error(error); + }, + complete() { + doObserver.complete?.(); + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } } class Subscriber implements Observer { - #stopped = false; + #active = true; - #abortController = new AbortController(); + #abortController = new AbortController(); + #signal: AbortSignal; - get signal() { - return this.#abortController.signal; - } + get signal() { + return this.#signal; + } - get closed() { - return this.#stopped || this.#abortController.signal.aborted; - } + get isActive() { + return this.#active && !this.signal.aborted; + } - constructor( - private _signal: AbortSignal | undefined, - private _observer: Observer - ) { - maybeFollowSignal(this.#abortController, this._signal); - } + constructor(signal: AbortSignal | undefined, private _observer: Observer) { + const ownSignal = this.#abortController.signal; + this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; + } - next(value: T): void { - if (!this.closed) { - this._observer.next(value); - } + next(value: T): void { + if (this.isActive) { + this._observer.next(value); } + } - error(error: any): void { - if (!this.closed) { - this.#stopped = true; - this._observer.error(error); - this.#abortController.abort(); - } + error(error: any): void { + if (this.isActive) { + this.#active = false; + this._observer.error(error); + this.#abortController.abort(); } + } - complete(): void { - if (!this.closed) { - this.#stopped = true; - this._observer.complete(); - this.#abortController.abort(); - } + complete(): void { + if (this.isActive) { + this.#active = false; + this._observer.complete(); + this.#abortController.abort(); } -} - -function noop() { } - -function maybeFollowSignal( - abortController: AbortController, - signal: AbortSignal | undefined -) { - if (signal) { - const parentAbortHandler = () => { - abortController.abort(); - }; - signal.addEventListener('abort', parentAbortHandler, { - once: true, - }); - abortController.signal.addEventListener('abort', () => { - signal.removeEventListener('abort', parentAbortHandler); - }); + } + + addTeardown(teardown: () => void) { + if (this.isActive) { + this.#abortController.signal.addEventListener("abort", teardown, { + once: true, + }); + } else { + teardown(); } + } + + removeTeardown(teardown: () => void) { + this.#abortController.signal.removeEventListener("abort", teardown); + } } +function noop() {} + function polyfill() { - const proto = EventTarget.prototype as any; - if (typeof proto.on !== 'function') { - proto.on = function (type: Parameters[0]) { - return new Observable((subscriber) => { - this.addEventListener( - type, - (event) => { - subscriber.next(event); - }, - { signal: subscriber.signal } - ); - }); - }; - } + const proto = EventTarget.prototype as any; + if (typeof proto.on !== "function") { + proto.on = function (type: Parameters[0]) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal } + ); + }); + }; + } - if (typeof globalThis.Observable !== 'function') { - (globalThis as any).Observable = Observable; - } + if (typeof globalThis.Observable !== "function") { + (globalThis as any).Observable = Observable; + } } polyfill(); + +function useSignalAll(signals: AbortSignal[]): AbortSignal { + if (typeof (AbortSignal as any).all === "function") { + return (AbortSignal as any).all(signals); + } else { + const ac = new AbortController(); + const handleAbort = () => { + ac.abort(); + for (const signal of signals) { + signal.removeEventListener("abort", handleAbort); + } + }; + for (const signal of signals) { + signal.addEventListener("abort", handleAbort); + } + return ac.signal; + } +} + +function isPromiseLike(value: any): value is PromiseLike { + return ( + value instanceof Promise || + (typeof value === "object" && + value !== null && + typeof value.then === "function") + ); +} From 032e31182992d4e5024f849af430300e3eabb47e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 9 Nov 2023 18:06:08 -0600 Subject: [PATCH 03/11] stub in async iteration --- impl/observable.ts | 91 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/impl/observable.ts b/impl/observable.ts index 3e3c5a6..0bbddac 100644 --- a/impl/observable.ts +++ b/impl/observable.ts @@ -620,6 +620,97 @@ export class Observable { ); }); } + + [Symbol.asyncIterator](): AsyncGenerator { + let ac: AbortController | undefined; + let deferred: [(value: IteratorResult) => void, (error: any) => void][] = + []; + let buffer: T[] = []; + let hasError = false; + let error: any = undefined; + let isComplete = false; + + return { + next: () => { + return new Promise((resolve, reject) => { + if (buffer.length > 0) { + resolve({ value: buffer.shift()!, done: false }); + return; + } + + if (hasError) { + reject(error); + return; + } + + if (isComplete) { + resolve({ value: undefined, done: true }); + return; + } + + if (!ac) { + ac = new AbortController(); + this.subscribe( + { + next(value) { + if (deferred.length > 0) { + const [resolve] = deferred.shift()!; + resolve({ value, done: false }); + } else { + buffer.push(value); + } + }, + error(err) { + if (buffer.length > 0) { + hasError = true; + error = err; + } else { + while (deferred.length > 0) { + const [, reject] = deferred.shift()!; + reject(err); + } + } + }, + complete() {}, + }, + { + signal: ac.signal, + } + ); + } + + deferred.push([resolve, reject]); + }); + }, + + throw: (err) => { + return new Promise((_resolve, reject) => { + ac?.abort(); + hasError = true; + error = err; + for (const [, reject] of deferred) { + reject(error); + } + reject(error); + }); + }, + + return: () => { + return new Promise((resolve, reject) => { + ac?.abort(); + isComplete = true; + for (const [resolve] of deferred) { + resolve({ value: undefined, done: true }); + } + resolve({ value: undefined, done: true }); + }); + }, + + [Symbol.asyncIterator]() { + return this; + }, + }; + } } class Subscriber implements Observer { From 5febf419b44ab70c8563a2f4c70d651b560135d9 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 29 Nov 2023 13:35:07 -0600 Subject: [PATCH 04/11] WIP --- impl/observable-polyfill.js | 22 ++ impl/observable.js | 692 ++++++++++++++++++++++++++++++++++++ 2 files changed, 714 insertions(+) create mode 100644 impl/observable-polyfill.js create mode 100644 impl/observable.js diff --git a/impl/observable-polyfill.js b/impl/observable-polyfill.js new file mode 100644 index 0000000..7be6535 --- /dev/null +++ b/impl/observable-polyfill.js @@ -0,0 +1,22 @@ +import { Observable } from "./observable.js"; + +function polyfill() { + const proto = EventTarget.prototype; + if (typeof proto.on !== "function") { + proto.on = function (type) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal } + ); + }); + }; + } + if (typeof globalThis.Observable !== "function") { + globalThis.Observable = Observable; + } +} +polyfill(); diff --git a/impl/observable.js b/impl/observable.js new file mode 100644 index 0000000..bf27752 --- /dev/null +++ b/impl/observable.js @@ -0,0 +1,692 @@ +export class Observable { + static from(value) { + if (value instanceof Observable) { + return value; + } + if (isPromiseLike(value)) { + return new Observable((subscriber) => { + value.then( + (value) => { + subscriber.next(value); + subscriber.complete(); + }, + (error) => { + subscriber.error(error); + } + ); + }); + } + if (Symbol.asyncIterator in value) { + return new Observable(async (subscriber) => { + try { + for await (const v of value) { + subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); + } + if (Symbol.iterator in value) { + return new Observable((subscriber) => { + try { + for (const v of value) { + if (subscriber.isActive) subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); + } + throw new TypeError("Value is not observable"); + } + constructor(start) { + this.start = start; + } + subscribe(fnOrObserver, options) { + const partialObserver = + typeof fnOrObserver === "function" + ? { next: fnOrObserver } + : fnOrObserver; + const observer = { + next: noop, + error: reportError, + complete: noop, + ...partialObserver, + }; + const subscriber = new Subscriber(options?.signal, observer); + this.start(subscriber); + } + forEach(callback, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + this.subscribe( + { + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + }, + { + signal, + } + ); + }); + } + map(project) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + filter(predicate) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + take(count) { + return new Observable((destination) => { + let remaining = count; + this.subscribe( + { + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + drop(count) { + return new Observable((destination) => { + let seen = 0; + this.subscribe( + { + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + flatMap(project) { + return new Observable((destination) => { + let queue = []; + let index = 0; + let innerAC; + let outerComplete = false; + const startInner = (value) => { + innerAC = new AbortController(); + const signal = useSignalAll([innerAC.signal, destination.signal]); + let innerObservable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()); + } else if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + } + ); + }; + this.subscribe( + { + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + } + ); + }); + } + takeUntil(notifier) { + return new Observable((destination) => { + Observable.from(notifier).subscribe( + { + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + }, + { + signal: destination.signal, + } + ); + this.subscribe(destination, { signal: destination.signal }); + }); + } + every(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + }, + { + signal, + } + ); + }); + } + some(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + }, + { + signal, + } + ); + }); + } + find(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error("Value not found")); + }, + }, + { + signal, + } + ); + }); + } + reduce(reducer, initialValue, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe( + { + next(value) { + if (hasState) { + try { + state = reducer(state, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value; + } + hasState = true; + }, + error: reject, + complete() { + resolve(state); + }, + }, + { + signal, + } + ); + }); + } + toArray(options) { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [], + options + ); + } + catch(handleError) { + return new Observable((destination) => { + this.subscribe( + { + next(value) { + destination.next(value); + }, + error(error) { + Observable.from(handleError(error)).subscribe(destination, { + signal: destination.signal, + }); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + finally(onFinalize) { + return new Observable((destination) => { + destination.addTeardown(onFinalize); + this.subscribe(destination, { + signal: destination.signal, + }); + }); + } + switchMap(project) { + return new Observable((destination) => { + let index = 0; + let outerComplete = false; + let innerAC; + this.subscribe( + { + next: (value) => { + innerAC?.abort(); + innerAC = new AbortController(); + const signal = useSignalAll([innerAC.signal, destination.signal]); + let innerObservable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + } + ); + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (!innerAC) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + } + ); + }); + } + do(fnOrObserver) { + return new Observable((destination) => { + const doObserver = + typeof fnOrObserver === "function" + ? { next: fnOrObserver } + : fnOrObserver; + this.subscribe( + { + next(value) { + doObserver.next?.(value); + destination.next(value); + }, + error(error) { + doObserver.error?.(error); + destination.error(error); + }, + complete() { + doObserver.complete?.(); + destination.complete(); + }, + }, + { + signal: destination.signal, + } + ); + }); + } + [Symbol.asyncIterator]() { + let ac; + let deferred = []; + let buffer = []; + let hasError = false; + let error = undefined; + let isComplete = false; + return { + next: () => { + return new Promise((resolve, reject) => { + if (buffer.length > 0) { + resolve({ value: buffer.shift(), done: false }); + return; + } + if (hasError) { + reject(error); + return; + } + if (isComplete) { + resolve({ value: undefined, done: true }); + return; + } + if (!ac) { + ac = new AbortController(); + this.subscribe( + { + next(value) { + if (deferred.length > 0) { + const [resolve] = deferred.shift(); + resolve({ value, done: false }); + } else { + buffer.push(value); + } + }, + error(err) { + if (buffer.length > 0) { + hasError = true; + error = err; + } else { + while (deferred.length > 0) { + const [, reject] = deferred.shift(); + reject(err); + } + } + }, + complete() {}, + }, + { + signal: ac.signal, + } + ); + } + deferred.push([resolve, reject]); + }); + }, + throw: (err) => { + return new Promise((_resolve, reject) => { + ac?.abort(); + hasError = true; + error = err; + for (const [, deferredReject] of deferred) { + deferredReject(error); + } + reject(error); + }); + }, + return: () => { + return new Promise((resolve) => { + ac?.abort(); + isComplete = true; + for (const [deferredResolve] of deferred) { + deferredResolve({ value: undefined, done: true }); + } + resolve({ value: undefined, done: true }); + }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + } +} +class Subscriber { + #active = true; + #abortController = new AbortController(); + #signal; + get signal() { + return this.#signal; + } + get isActive() { + return this.#active && !this.signal.aborted; + } + constructor(signal, _observer) { + this._observer = _observer; + const ownSignal = this.#abortController.signal; + this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; + } + next(value) { + if (this.isActive) { + this._observer.next(value); + } + } + error(error) { + if (this.isActive) { + this.#active = false; + this._observer.error(error); + this.#abortController.abort(); + } + } + complete() { + if (this.isActive) { + this.#active = false; + this._observer.complete(); + this.#abortController.abort(); + } + } + addTeardown(teardown) { + if (this.isActive) { + this.#abortController.signal.addEventListener("abort", teardown, { + once: true, + }); + } else { + teardown(); + } + } + removeTeardown(teardown) { + this.#abortController.signal.removeEventListener("abort", teardown); + } +} +function noop() {} + +function useSignalAll(signals) { + if (typeof AbortSignal.all === "function") { + return AbortSignal.all(signals); + } else { + const ac = new AbortController(); + const handleAbort = () => { + ac.abort(); + for (const signal of signals) { + signal.removeEventListener("abort", handleAbort); + } + }; + for (const signal of signals) { + signal.addEventListener("abort", handleAbort); + } + return ac.signal; + } +} +function isPromiseLike(value) { + return ( + value instanceof Promise || + (typeof value === "object" && + value !== null && + typeof value.then === "function") + ); +} From f14f64bd8b2656a866ad6251f708cf2648bd9a01 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 29 Nov 2023 14:33:08 -0600 Subject: [PATCH 05/11] Change implementation to pure JS with .d.ts typings --- impl/observable-polyfill.d.ts | 379 +++++++++ impl/observable-polyfill.js | 37 +- impl/observable.d.ts | 77 ++ impl/observable.js | 1364 ++++++++++++++++----------------- impl/observable.ts | 822 -------------------- interfaces.txt | 73 ++ 6 files changed, 1230 insertions(+), 1522 deletions(-) create mode 100644 impl/observable-polyfill.d.ts create mode 100644 impl/observable.d.ts delete mode 100644 impl/observable.ts create mode 100644 interfaces.txt diff --git a/impl/observable-polyfill.d.ts b/impl/observable-polyfill.d.ts new file mode 100644 index 0000000..2871499 --- /dev/null +++ b/impl/observable-polyfill.d.ts @@ -0,0 +1,379 @@ +import { Observable } from './observable'; + +declare global { + interface EventTarget { + on(eventType: string): Observable; + } + interface AbortSignal { + on( + eventType: K, + ): Observable; + } + interface AbstractWorker { + on( + eventType: K, + ): Observable; + } + interface Animation { + on( + eventType: K, + ): Observable; + } + interface AudioScheduledSourceNode { + on( + eventType: K, + ): Observable; + } + interface AudioWorkletNode { + on( + eventType: K, + ): Observable; + } + interface BaseAudioContext { + on( + eventType: K, + ): Observable; + } + interface BroadcastChannel { + on( + eventType: K, + ): Observable; + } + interface Document { + on( + eventType: K, + ): Observable; + } + interface Element { + on( + eventType: K, + ): Observable; + } + interface EventSource { + on( + eventType: K, + ): Observable; + } + interface FileReader { + on( + eventType: K, + ): Observable; + } + interface FontFaceSet { + on( + eventType: K, + ): Observable; + } + interface GlobalEventHandlers { + on( + eventType: K, + ): Observable; + } + interface HTMLBodyElement { + on( + eventType: K, + ): Observable; + } + interface HTMLElement { + on( + eventType: K, + ): Observable; + } + interface HTMLFrameSetElement { + on( + eventType: K, + ): Observable; + } + interface HTMLMediaElement { + on( + eventType: K, + ): Observable; + } + interface HTMLVideoElement { + on( + eventType: K, + ): Observable; + } + interface IDBDatabase { + on( + eventType: K, + ): Observable; + } + interface IDBOpenDBRequest { + on( + eventType: K, + ): Observable; + } + interface IDBRequest { + on( + eventType: K, + ): Observable; + } + interface IDBTransaction { + on( + eventType: K, + ): Observable; + } + interface MIDIAccess { + on( + eventType: K, + ): Observable; + } + interface MIDIInput { + on( + eventType: K, + ): Observable; + } + interface MIDIPort { + on( + eventType: K, + ): Observable; + } + interface MathMLElement { + on( + eventType: K, + ): Observable; + } + interface MediaDevices { + on( + eventType: K, + ): Observable; + } + interface MediaKeySession { + on( + eventType: K, + ): Observable; + } + interface MediaQueryList { + on( + eventType: K, + ): Observable; + } + interface MediaRecorder { + on( + eventType: K, + ): Observable; + } + interface MediaSource { + on( + eventType: K, + ): Observable; + } + interface MediaStream { + on( + eventType: K, + ): Observable; + } + interface MediaStreamTrack { + on( + eventType: K, + ): Observable; + } + interface MessagePort { + on( + eventType: K, + ): Observable; + } + interface Notification { + on( + eventType: K, + ): Observable; + } + interface OfflineAudioContext { + on( + eventType: K, + ): Observable; + } + interface OffscreenCanvas { + on( + eventType: K, + ): Observable; + } + interface PaymentRequest { + on( + eventType: K, + ): Observable; + } + interface Performance { + on( + eventType: K, + ): Observable; + } + interface PermissionStatus { + on( + eventType: K, + ): Observable; + } + interface PictureInPictureWindow { + on( + eventType: K, + ): Observable; + } + interface RTCDTMFSender { + on( + eventType: K, + ): Observable; + } + interface RTCDataChannel { + on( + eventType: K, + ): Observable; + } + interface RTCDtlsTransport { + on( + eventType: K, + ): Observable; + } + interface RTCIceTransport { + on( + eventType: K, + ): Observable; + } + interface RTCPeerConnection { + on( + eventType: K, + ): Observable; + } + interface RTCSctpTransport { + on( + eventType: K, + ): Observable; + } + interface RemotePlayback { + on( + eventType: K, + ): Observable; + } + interface SharedWorker { + on( + eventType: K, + ): Observable; + } + interface SVGElement { + on( + eventType: K, + ): Observable; + } + interface SVGSVGElement { + on( + eventType: K, + ): Observable; + } + interface ScreenOrientation { + on( + eventType: K, + ): Observable; + } + interface ScriptProcessorNode { + on( + eventType: K, + ): Observable; + } + interface ServiceWorker { + on( + eventType: K, + ): Observable; + } + interface ServiceWorkerContainer { + on( + eventType: K, + ): Observable; + } + interface ServiceWorkerRegistration { + on( + eventType: K, + ): Observable; + } + interface ShadowRoot { + on( + eventType: K, + ): Observable; + } + interface SourceBuffer { + on( + eventType: K, + ): Observable; + } + interface SourceBufferList { + on( + eventType: K, + ): Observable; + } + interface SpeechSynthesis { + on( + eventType: K, + ): Observable; + } + interface SpeechSynthesisUtterance { + on( + eventType: K, + ): Observable; + } + interface TextTrack { + on( + eventType: K, + ): Observable; + } + interface TextTrackCue { + on( + eventType: K, + ): Observable; + } + interface TextTrackList { + on( + eventType: K, + ): Observable; + } + interface VideoDecoder { + on( + eventType: K, + ): Observable; + } + interface VideoEncoder { + on( + eventType: K, + ): Observable; + } + interface VisualViewport { + on( + eventType: K, + ): Observable; + } + interface WakeLockSentinel { + on( + eventType: K, + ): Observable; + } + interface WebSocket { + on( + eventType: K, + ): Observable; + } + interface Window { + on( + eventType: K, + ): Observable; + } + interface WindowEventHandlers { + on( + eventType: K, + ): Observable; + } + interface Worker { + on( + eventType: K, + ): Observable; + } + interface XMLHttpRequest { + on( + eventType: K, + ): Observable; + } + interface XMLHttpRequestEventTarget { + on( + eventType: K, + ): Observable; + } +} + +export {}; diff --git a/impl/observable-polyfill.js b/impl/observable-polyfill.js index 7be6535..0349ea9 100644 --- a/impl/observable-polyfill.js +++ b/impl/observable-polyfill.js @@ -1,22 +1,23 @@ -import { Observable } from "./observable.js"; +import { Observable } from './observable.js'; function polyfill() { - const proto = EventTarget.prototype; - if (typeof proto.on !== "function") { - proto.on = function (type) { - return new Observable((subscriber) => { - this.addEventListener( - type, - (event) => { - subscriber.next(event); - }, - { signal: subscriber.signal } - ); - }); - }; - } - if (typeof globalThis.Observable !== "function") { - globalThis.Observable = Observable; - } + const proto = EventTarget.prototype; + if (typeof proto.on !== 'function') { + proto.on = function (type) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal }, + ); + }); + }; + } + if (typeof globalThis.Observable !== 'function') { + globalThis.Observable = Observable; + } } + polyfill(); diff --git a/impl/observable.d.ts b/impl/observable.d.ts new file mode 100644 index 0000000..5d74042 --- /dev/null +++ b/impl/observable.d.ts @@ -0,0 +1,77 @@ +export interface Observer { + next(value: T): void; + error(error: any): void; + complete(): void; +} +export interface SubscriptionOptions { + signal?: AbortSignal; +} +type ConvertableToObservable = + | Observable + | PromiseLike + | Iterable + | AsyncIterable; +export declare class Observable { + private start; + static from(value: ConvertableToObservable): Observable; + constructor(start: (subscriber: Subscriber) => void); + subscribe(onNext: (value: T) => void, options?: SubscriptionOptions): void; + subscribe( + observer: Partial>, + options?: SubscriptionOptions, + ): void; + forEach( + callback: (value: T) => void, + options?: SubscriptionOptions, + ): Promise; + map(project: (value: T, index: number) => R): Observable; + filter(predicate: (value: T, index: number) => boolean): Observable; + take(count: number): Observable; + drop(count: number): Observable; + flatMap( + project: (value: T, index: number) => ConvertableToObservable, + ): Observable; + takeUntil(notifier: ConvertableToObservable): Observable; + every( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + some( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + find( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + reduce( + reducer: (accumulated: S, value: T, index: number) => S, + initialValue?: S, + options?: SubscriptionOptions, + ): Promise; + toArray(options?: SubscriptionOptions): Promise; + catch( + handleError: (error: unknown) => ConvertableToObservable, + ): Observable; + finally(onFinalize: () => void): Observable; + switchMap( + project: (value: T, index: number) => ConvertableToObservable, + ): Observable; + do(fnOrObserver: ((value: T) => void) | Partial>): Observable; + [Symbol.asyncIterator](): AsyncGenerator; +} + +declare class Subscriber implements Observer { + #private; + private _observer; + get signal(): AbortSignal; + get isActive(): boolean; + constructor(signal: AbortSignal | undefined, _observer: Observer); + next(value: T): void; + error(error: any): void; + complete(): void; + addTeardown(teardown: () => void): void; + removeTeardown(teardown: () => void): void; +} + +export {}; diff --git a/impl/observable.js b/impl/observable.js index bf27752..1f9ecc9 100644 --- a/impl/observable.js +++ b/impl/observable.js @@ -1,692 +1,692 @@ export class Observable { - static from(value) { - if (value instanceof Observable) { - return value; - } - if (isPromiseLike(value)) { - return new Observable((subscriber) => { - value.then( - (value) => { - subscriber.next(value); - subscriber.complete(); - }, - (error) => { - subscriber.error(error); - } - ); - }); - } - if (Symbol.asyncIterator in value) { - return new Observable(async (subscriber) => { - try { - for await (const v of value) { - subscriber.next(v); - } - subscriber.complete(); - } catch (error) { - subscriber.error(error); - } - }); - } - if (Symbol.iterator in value) { - return new Observable((subscriber) => { - try { - for (const v of value) { - if (subscriber.isActive) subscriber.next(v); - } - subscriber.complete(); - } catch (error) { - subscriber.error(error); - } - }); - } - throw new TypeError("Value is not observable"); - } - constructor(start) { - this.start = start; - } - subscribe(fnOrObserver, options) { - const partialObserver = - typeof fnOrObserver === "function" - ? { next: fnOrObserver } - : fnOrObserver; - const observer = { - next: noop, - error: reportError, - complete: noop, - ...partialObserver, - }; - const subscriber = new Subscriber(options?.signal, observer); - this.start(subscriber); - } - forEach(callback, options) { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - this.subscribe( - { - next: (value) => { - try { - callback(value); - } catch (error) { - reject(error); - ac.abort(); - } - }, - error: reject, - complete: resolve, - }, - { - signal, - } - ); - }); - } - map(project) { - return new Observable((destination) => { - let index = 0; - this.subscribe( - { - next(value) { - let result; - try { - result = project(value, index++); - } catch (error) { - destination.error(error); - return; - } - destination.next(result); - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - filter(predicate) { - return new Observable((destination) => { - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - destination.error(error); - return; - } - if (result) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - take(count) { - return new Observable((destination) => { - let remaining = count; - this.subscribe( - { - next(value) { - remaining--; - if (remaining >= 0) { - destination.next(value); - } - if (remaining === 0) { - destination.complete(); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - drop(count) { - return new Observable((destination) => { - let seen = 0; - this.subscribe( - { - next(value) { - seen++; - if (seen > count) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - flatMap(project) { - return new Observable((destination) => { - let queue = []; - let index = 0; - let innerAC; - let outerComplete = false; - const startInner = (value) => { - innerAC = new AbortController(); - const signal = useSignalAll([innerAC.signal, destination.signal]); - let innerObservable; - try { - innerObservable = Observable.from(project(value, index++)); - } catch (error) { - destination.error(error); - return; - } - innerObservable.subscribe( - { - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (queue.length > 0) { - startInner(queue.shift()); - } else if (outerComplete) { - destination.complete(); - } - }, - }, - { - signal, - } - ); - }; - this.subscribe( - { - next(value) { - if (innerAC) { - queue.push(value); - } else { - startInner(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - outerComplete = true; - if (queue.length === 0) { - destination.complete(); - } - }, - }, - { - signal: destination.signal, - } - ); - }); - } - takeUntil(notifier) { - return new Observable((destination) => { - Observable.from(notifier).subscribe( - { - next() { - destination.complete(); - }, - error(error) { - destination.error(error); - }, - }, - { - signal: destination.signal, - } - ); - this.subscribe(destination, { signal: destination.signal }); - }); - } - every(predicate, options) { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (!result) { - resolve(false); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(true); - }, - }, - { - signal, - } - ); - }); - } - some(predicate, options) { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(true); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(false); - }, - }, - { - signal, - } - ); - }); - } - find(predicate, options) { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(value); - ac.abort(); - } - }, - error: reject, - complete() { - // TODO: Figure out the proper semantics here. - reject(new Error("Value not found")); - }, - }, - { - signal, - } - ); - }); - } - reduce(reducer, initialValue, options) { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - let hasState = arguments.length >= 2; - let state = initialValue; - let index = 0; - this.subscribe( - { - next(value) { - if (hasState) { - try { - state = reducer(state, value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - } else { - state = value; - } - hasState = true; - }, - error: reject, - complete() { - resolve(state); - }, - }, - { - signal, - } - ); - }); - } - toArray(options) { - return this.reduce( - (arr, value) => { - arr.push(value); - return arr; - }, - [], - options - ); - } - catch(handleError) { - return new Observable((destination) => { - this.subscribe( - { - next(value) { - destination.next(value); - }, - error(error) { - Observable.from(handleError(error)).subscribe(destination, { - signal: destination.signal, - }); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - finally(onFinalize) { - return new Observable((destination) => { - destination.addTeardown(onFinalize); - this.subscribe(destination, { - signal: destination.signal, - }); - }); - } - switchMap(project) { - return new Observable((destination) => { - let index = 0; - let outerComplete = false; - let innerAC; - this.subscribe( - { - next: (value) => { - innerAC?.abort(); - innerAC = new AbortController(); - const signal = useSignalAll([innerAC.signal, destination.signal]); - let innerObservable; - try { - innerObservable = Observable.from(project(value, index++)); - } catch (error) { - destination.error(error); - return; - } - innerObservable.subscribe( - { - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (outerComplete) { - destination.complete(); - } - }, - }, - { - signal, - } - ); - }, - error(error) { - destination.error(error); - }, - complete() { - outerComplete = true; - if (!innerAC) { - destination.complete(); - } - }, - }, - { - signal: destination.signal, - } - ); - }); - } - do(fnOrObserver) { - return new Observable((destination) => { - const doObserver = - typeof fnOrObserver === "function" - ? { next: fnOrObserver } - : fnOrObserver; - this.subscribe( - { - next(value) { - doObserver.next?.(value); - destination.next(value); - }, - error(error) { - doObserver.error?.(error); - destination.error(error); - }, - complete() { - doObserver.complete?.(); - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - [Symbol.asyncIterator]() { - let ac; - let deferred = []; - let buffer = []; - let hasError = false; - let error = undefined; - let isComplete = false; - return { - next: () => { - return new Promise((resolve, reject) => { - if (buffer.length > 0) { - resolve({ value: buffer.shift(), done: false }); - return; - } - if (hasError) { - reject(error); - return; - } - if (isComplete) { - resolve({ value: undefined, done: true }); - return; - } - if (!ac) { - ac = new AbortController(); - this.subscribe( - { - next(value) { - if (deferred.length > 0) { - const [resolve] = deferred.shift(); - resolve({ value, done: false }); - } else { - buffer.push(value); - } - }, - error(err) { - if (buffer.length > 0) { - hasError = true; - error = err; - } else { - while (deferred.length > 0) { - const [, reject] = deferred.shift(); - reject(err); - } - } - }, - complete() {}, - }, - { - signal: ac.signal, - } - ); - } - deferred.push([resolve, reject]); - }); - }, - throw: (err) => { - return new Promise((_resolve, reject) => { - ac?.abort(); - hasError = true; - error = err; - for (const [, deferredReject] of deferred) { - deferredReject(error); - } - reject(error); - }); - }, - return: () => { - return new Promise((resolve) => { - ac?.abort(); - isComplete = true; - for (const [deferredResolve] of deferred) { - deferredResolve({ value: undefined, done: true }); - } - resolve({ value: undefined, done: true }); - }); - }, - [Symbol.asyncIterator]() { - return this; - }, - }; - } + static from(value) { + if (value instanceof Observable) { + return value; + } + if (isPromiseLike(value)) { + return new Observable((subscriber) => { + value.then( + (value) => { + subscriber.next(value); + subscriber.complete(); + }, + (error) => { + subscriber.error(error); + }, + ); + }); + } + if (Symbol.asyncIterator in value) { + return new Observable(async (subscriber) => { + try { + for await (const v of value) { + subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); + } + if (Symbol.iterator in value) { + return new Observable((subscriber) => { + try { + for (const v of value) { + if (subscriber.isActive) subscriber.next(v); + } + subscriber.complete(); + } catch (error) { + subscriber.error(error); + } + }); + } + throw new TypeError('Value is not observable'); + } + constructor(start) { + this.start = start; + } + subscribe(fnOrObserver, options) { + const partialObserver = + typeof fnOrObserver === 'function' + ? { next: fnOrObserver } + : fnOrObserver; + const observer = { + next: noop, + error: reportError, + complete: noop, + ...partialObserver, + }; + const subscriber = new Subscriber(options?.signal, observer); + this.start(subscriber); + } + forEach(callback, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + this.subscribe( + { + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + }, + { + signal, + }, + ); + }); + } + map(project) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + filter(predicate) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + take(count) { + return new Observable((destination) => { + let remaining = count; + this.subscribe( + { + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + drop(count) { + return new Observable((destination) => { + let seen = 0; + this.subscribe( + { + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + flatMap(project) { + return new Observable((destination) => { + let queue = []; + let index = 0; + let innerAC; + let outerComplete = false; + const startInner = (value) => { + innerAC = new AbortController(); + const signal = useSignalAll([innerAC.signal, destination.signal]); + let innerObservable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()); + } else if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + }, + ); + }; + this.subscribe( + { + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + takeUntil(notifier) { + return new Observable((destination) => { + Observable.from(notifier).subscribe( + { + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + }, + { + signal: destination.signal, + }, + ); + this.subscribe(destination, { signal: destination.signal }); + }); + } + every(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + }, + { + signal, + }, + ); + }); + } + some(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + }, + { + signal, + }, + ); + }); + } + find(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error('Value not found')); + }, + }, + { + signal, + }, + ); + }); + } + reduce(reducer, initialValue, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? useSignalAll([ac.signal, options.signal]) + : ac.signal; + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe( + { + next(value) { + if (hasState) { + try { + state = reducer(state, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value; + } + hasState = true; + }, + error: reject, + complete() { + resolve(state); + }, + }, + { + signal, + }, + ); + }); + } + toArray(options) { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [], + options, + ); + } + catch(handleError) { + return new Observable((destination) => { + this.subscribe( + { + next(value) { + destination.next(value); + }, + error(error) { + Observable.from(handleError(error)).subscribe(destination, { + signal: destination.signal, + }); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + finally(onFinalize) { + return new Observable((destination) => { + destination.addTeardown(onFinalize); + this.subscribe(destination, { + signal: destination.signal, + }); + }); + } + switchMap(project) { + return new Observable((destination) => { + let index = 0; + let outerComplete = false; + let innerAC; + this.subscribe( + { + next: (value) => { + innerAC?.abort(); + innerAC = new AbortController(); + const signal = useSignalAll([innerAC.signal, destination.signal]); + let innerObservable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + }, + ); + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (!innerAC) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + do(fnOrObserver) { + return new Observable((destination) => { + const doObserver = + typeof fnOrObserver === 'function' + ? { next: fnOrObserver } + : fnOrObserver; + this.subscribe( + { + next(value) { + doObserver.next?.(value); + destination.next(value); + }, + error(error) { + doObserver.error?.(error); + destination.error(error); + }, + complete() { + doObserver.complete?.(); + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + [Symbol.asyncIterator]() { + let ac; + let deferred = []; + let buffer = []; + let hasError = false; + let error = undefined; + let isComplete = false; + return { + next: () => { + return new Promise((resolve, reject) => { + if (buffer.length > 0) { + resolve({ value: buffer.shift(), done: false }); + return; + } + if (hasError) { + reject(error); + return; + } + if (isComplete) { + resolve({ value: undefined, done: true }); + return; + } + if (!ac) { + ac = new AbortController(); + this.subscribe( + { + next(value) { + if (deferred.length > 0) { + const [resolve] = deferred.shift(); + resolve({ value, done: false }); + } else { + buffer.push(value); + } + }, + error(err) { + if (buffer.length > 0) { + hasError = true; + error = err; + } else { + while (deferred.length > 0) { + const [, reject] = deferred.shift(); + reject(err); + } + } + }, + complete() {}, + }, + { + signal: ac.signal, + }, + ); + } + deferred.push([resolve, reject]); + }); + }, + throw: (err) => { + return new Promise((_resolve, reject) => { + ac?.abort(); + hasError = true; + error = err; + for (const [, deferredReject] of deferred) { + deferredReject(error); + } + reject(error); + }); + }, + return: () => { + return new Promise((resolve) => { + ac?.abort(); + isComplete = true; + for (const [deferredResolve] of deferred) { + deferredResolve({ value: undefined, done: true }); + } + resolve({ value: undefined, done: true }); + }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + } } class Subscriber { - #active = true; - #abortController = new AbortController(); - #signal; - get signal() { - return this.#signal; - } - get isActive() { - return this.#active && !this.signal.aborted; - } - constructor(signal, _observer) { - this._observer = _observer; - const ownSignal = this.#abortController.signal; - this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; - } - next(value) { - if (this.isActive) { - this._observer.next(value); - } - } - error(error) { - if (this.isActive) { - this.#active = false; - this._observer.error(error); - this.#abortController.abort(); - } - } - complete() { - if (this.isActive) { - this.#active = false; - this._observer.complete(); - this.#abortController.abort(); - } - } - addTeardown(teardown) { - if (this.isActive) { - this.#abortController.signal.addEventListener("abort", teardown, { - once: true, - }); - } else { - teardown(); - } - } - removeTeardown(teardown) { - this.#abortController.signal.removeEventListener("abort", teardown); - } + #active = true; + #abortController = new AbortController(); + #signal; + get signal() { + return this.#signal; + } + get isActive() { + return this.#active && !this.signal.aborted; + } + constructor(signal, _observer) { + this._observer = _observer; + const ownSignal = this.#abortController.signal; + this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; + } + next(value) { + if (this.isActive) { + this._observer.next(value); + } + } + error(error) { + if (this.isActive) { + this.#active = false; + this._observer.error(error); + this.#abortController.abort(); + } + } + complete() { + if (this.isActive) { + this.#active = false; + this._observer.complete(); + this.#abortController.abort(); + } + } + addTeardown(teardown) { + if (this.isActive) { + this.#abortController.signal.addEventListener('abort', teardown, { + once: true, + }); + } else { + teardown(); + } + } + removeTeardown(teardown) { + this.#abortController.signal.removeEventListener('abort', teardown); + } } function noop() {} function useSignalAll(signals) { - if (typeof AbortSignal.all === "function") { - return AbortSignal.all(signals); - } else { - const ac = new AbortController(); - const handleAbort = () => { - ac.abort(); - for (const signal of signals) { - signal.removeEventListener("abort", handleAbort); - } - }; - for (const signal of signals) { - signal.addEventListener("abort", handleAbort); - } - return ac.signal; - } + if (typeof AbortSignal.all === 'function') { + return AbortSignal.all(signals); + } else { + const ac = new AbortController(); + const handleAbort = () => { + ac.abort(); + for (const signal of signals) { + signal.removeEventListener('abort', handleAbort); + } + }; + for (const signal of signals) { + signal.addEventListener('abort', handleAbort); + } + return ac.signal; + } } function isPromiseLike(value) { - return ( - value instanceof Promise || - (typeof value === "object" && - value !== null && - typeof value.then === "function") - ); + return ( + value instanceof Promise || + (typeof value === 'object' && + value !== null && + typeof value.then === 'function') + ); } diff --git a/impl/observable.ts b/impl/observable.ts deleted file mode 100644 index 0bbddac..0000000 --- a/impl/observable.ts +++ /dev/null @@ -1,822 +0,0 @@ -export interface Observer { - next(value: T): void; - error(error: any): void; - complete(): void; -} - -export interface SubscriptionOptions { - signal?: AbortSignal; -} - -type ConvertableToObservable = - | Observable - | PromiseLike - | Iterable - | AsyncIterable; - -export class Observable { - static from(value: ConvertableToObservable) { - if (value instanceof Observable) { - return value; - } - - if (isPromiseLike(value)) { - return new Observable((subscriber) => { - value.then( - (value) => { - subscriber.next(value); - subscriber.complete(); - }, - (error) => { - subscriber.error(error); - } - ); - }); - } - - if (Symbol.asyncIterator in value) { - return new Observable(async (subscriber) => { - try { - for await (const v of value) { - subscriber.next(v); - } - subscriber.complete(); - } catch (error) { - subscriber.error(error); - } - }); - } - - if (Symbol.iterator in value) { - return new Observable((subscriber) => { - try { - for (const v of value) { - if (subscriber.isActive) subscriber.next(v); - } - subscriber.complete(); - } catch (error) { - subscriber.error(error); - } - }); - } - - throw new TypeError("Value is not observable"); - } - - constructor(private start: (subscriber: Subscriber) => void) {} - - subscribe(onNext: (value: T) => void, options?: SubscriptionOptions): void; - subscribe(observer: Partial>, options?: SubscriptionOptions); - subscribe( - fnOrObserver: ((value: T) => void) | Partial>, - options?: SubscriptionOptions - ) { - const partialObserver = - typeof fnOrObserver === "function" - ? { next: fnOrObserver } - : fnOrObserver; - - const observer = { - next: noop, - error: reportError, - complete: noop, - ...partialObserver, - }; - - const subscriber = new Subscriber(options?.signal, observer); - - this.start(subscriber); - } - - forEach( - callback: (value: T) => void, - options?: SubscriptionOptions - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - - this.subscribe( - { - next: (value) => { - try { - callback(value); - } catch (error) { - reject(error); - ac.abort(); - } - }, - error: reject, - complete: resolve, - }, - { - signal, - } - ); - }); - } - - map(project: (value: T, index: number) => R): Observable { - return new Observable((destination) => { - let index = 0; - - this.subscribe( - { - next(value) { - let result: R; - try { - result = project(value, index++); - } catch (error) { - destination.error(error); - return; - } - destination.next(result); - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - filter(predicate: (value: T, index: number) => boolean): Observable { - return new Observable((destination) => { - let index = 0; - - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - destination.error(error); - return; - } - if (result) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - take(count: number): Observable { - return new Observable((destination) => { - let remaining = count; - - this.subscribe( - { - next(value) { - remaining--; - if (remaining >= 0) { - destination.next(value); - } - if (remaining === 0) { - destination.complete(); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - drop(count: number): Observable { - return new Observable((destination) => { - let seen = 0; - - this.subscribe( - { - next(value) { - seen++; - if (seen > count) { - destination.next(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - flatMap( - project: (value: T, index: number) => ConvertableToObservable - ): Observable { - return new Observable((destination) => { - let queue: T[] = []; - let index = 0; - let innerAC: AbortController | undefined; - let outerComplete = false; - - const startInner = (value: T) => { - innerAC = new AbortController(); - - const signal = useSignalAll([innerAC.signal, destination.signal]); - - let innerObservable: Observable; - try { - innerObservable = Observable.from(project(value, index++)); - } catch (error) { - destination.error(error); - return; - } - - innerObservable.subscribe( - { - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (queue.length > 0) { - startInner(queue.shift()!); - } else if (outerComplete) { - destination.complete(); - } - }, - }, - { - signal, - } - ); - }; - - this.subscribe( - { - next(value) { - if (innerAC) { - queue.push(value); - } else { - startInner(value); - } - }, - error(error) { - destination.error(error); - }, - complete() { - outerComplete = true; - if (queue.length === 0) { - destination.complete(); - } - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - takeUntil(notifier: ConvertableToObservable): Observable { - return new Observable((destination) => { - Observable.from(notifier).subscribe( - { - next() { - destination.complete(); - }, - error(error) { - destination.error(error); - }, - }, - { - signal: destination.signal, - } - ); - this.subscribe(destination, { signal: destination.signal }); - }); - } - - every( - predicate: (value: T, index: number) => boolean, - options?: SubscriptionOptions - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (!result) { - resolve(false); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(true); - }, - }, - { - signal, - } - ); - }); - } - - some( - predicate: (value: T, index: number) => boolean, - options?: SubscriptionOptions - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(true); - ac.abort(); - } - }, - error: reject, - complete() { - resolve(false); - }, - }, - { - signal, - } - ); - }); - } - - find( - predicate: (value: T, index: number) => boolean, - options?: SubscriptionOptions - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - - let index = 0; - this.subscribe( - { - next(value) { - let result = false; - try { - result = predicate(value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - if (result) { - resolve(value); - ac.abort(); - } - }, - error: reject, - complete() { - // TODO: Figure out the proper semantics here. - reject(new Error("Value not found")); - }, - }, - { - signal, - } - ); - }); - } - - reduce( - reducer: (accumulated: S, value: T, index: number) => S, - initialValue?: S, - options?: SubscriptionOptions - ): Promise { - return new Promise((resolve, reject) => { - const ac = new AbortController(); - - const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) - : ac.signal; - - let hasState = arguments.length >= 2; - let state = initialValue; - let index = 0; - this.subscribe( - { - next(value) { - if (hasState) { - try { - state = reducer(state!, value, index++); - } catch (error) { - reject(error); - ac.abort(); - return; - } - } else { - state = value as any; - } - hasState = true; - }, - error: reject, - complete() { - resolve(state!); - }, - }, - { - signal, - } - ); - }); - } - - toArray(options?: SubscriptionOptions): Promise { - return this.reduce( - (arr, value) => { - arr.push(value); - return arr; - }, - [] as T[], - options - ); - } - - catch( - handleError: (error: unknown) => ConvertableToObservable - ): Observable { - return new Observable((destination) => { - this.subscribe( - { - next(value) { - destination.next(value); - }, - error(error) { - Observable.from(handleError(error)).subscribe(destination, { - signal: destination.signal, - }); - }, - complete() { - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - finally(onFinalize: () => void): Observable { - return new Observable((destination) => { - destination.addTeardown(onFinalize); - this.subscribe(destination, { - signal: destination.signal, - }); - }); - } - - switchMap( - project: (value: T, index: number) => ConvertableToObservable - ): Observable { - return new Observable((destination) => { - let index = 0; - let outerComplete = false; - let innerAC: AbortController | undefined; - this.subscribe( - { - next: (value) => { - innerAC?.abort(); - innerAC = new AbortController(); - const signal = useSignalAll([innerAC.signal, destination.signal]); - let innerObservable: Observable; - try { - innerObservable = Observable.from(project(value, index++)); - } catch (error) { - destination.error(error); - return; - } - - innerObservable.subscribe( - { - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (outerComplete) { - destination.complete(); - } - }, - }, - { - signal, - } - ); - }, - error(error) { - destination.error(error); - }, - complete() { - outerComplete = true; - if (!innerAC) { - destination.complete(); - } - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - do(fnOrObserver: ((value: T) => void) | Partial>): Observable { - return new Observable((destination) => { - const doObserver: Partial> = - typeof fnOrObserver === "function" - ? { next: fnOrObserver } - : fnOrObserver; - this.subscribe( - { - next(value) { - doObserver.next?.(value); - destination.next(value); - }, - error(error) { - doObserver.error?.(error); - destination.error(error); - }, - complete() { - doObserver.complete?.(); - destination.complete(); - }, - }, - { - signal: destination.signal, - } - ); - }); - } - - [Symbol.asyncIterator](): AsyncGenerator { - let ac: AbortController | undefined; - let deferred: [(value: IteratorResult) => void, (error: any) => void][] = - []; - let buffer: T[] = []; - let hasError = false; - let error: any = undefined; - let isComplete = false; - - return { - next: () => { - return new Promise((resolve, reject) => { - if (buffer.length > 0) { - resolve({ value: buffer.shift()!, done: false }); - return; - } - - if (hasError) { - reject(error); - return; - } - - if (isComplete) { - resolve({ value: undefined, done: true }); - return; - } - - if (!ac) { - ac = new AbortController(); - this.subscribe( - { - next(value) { - if (deferred.length > 0) { - const [resolve] = deferred.shift()!; - resolve({ value, done: false }); - } else { - buffer.push(value); - } - }, - error(err) { - if (buffer.length > 0) { - hasError = true; - error = err; - } else { - while (deferred.length > 0) { - const [, reject] = deferred.shift()!; - reject(err); - } - } - }, - complete() {}, - }, - { - signal: ac.signal, - } - ); - } - - deferred.push([resolve, reject]); - }); - }, - - throw: (err) => { - return new Promise((_resolve, reject) => { - ac?.abort(); - hasError = true; - error = err; - for (const [, reject] of deferred) { - reject(error); - } - reject(error); - }); - }, - - return: () => { - return new Promise((resolve, reject) => { - ac?.abort(); - isComplete = true; - for (const [resolve] of deferred) { - resolve({ value: undefined, done: true }); - } - resolve({ value: undefined, done: true }); - }); - }, - - [Symbol.asyncIterator]() { - return this; - }, - }; - } -} - -class Subscriber implements Observer { - #active = true; - - #abortController = new AbortController(); - #signal: AbortSignal; - - get signal() { - return this.#signal; - } - - get isActive() { - return this.#active && !this.signal.aborted; - } - - constructor(signal: AbortSignal | undefined, private _observer: Observer) { - const ownSignal = this.#abortController.signal; - this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; - } - - next(value: T): void { - if (this.isActive) { - this._observer.next(value); - } - } - - error(error: any): void { - if (this.isActive) { - this.#active = false; - this._observer.error(error); - this.#abortController.abort(); - } - } - - complete(): void { - if (this.isActive) { - this.#active = false; - this._observer.complete(); - this.#abortController.abort(); - } - } - - addTeardown(teardown: () => void) { - if (this.isActive) { - this.#abortController.signal.addEventListener("abort", teardown, { - once: true, - }); - } else { - teardown(); - } - } - - removeTeardown(teardown: () => void) { - this.#abortController.signal.removeEventListener("abort", teardown); - } -} - -function noop() {} - -function polyfill() { - const proto = EventTarget.prototype as any; - if (typeof proto.on !== "function") { - proto.on = function (type: Parameters[0]) { - return new Observable((subscriber) => { - this.addEventListener( - type, - (event) => { - subscriber.next(event); - }, - { signal: subscriber.signal } - ); - }); - }; - } - - if (typeof globalThis.Observable !== "function") { - (globalThis as any).Observable = Observable; - } -} - -polyfill(); - -function useSignalAll(signals: AbortSignal[]): AbortSignal { - if (typeof (AbortSignal as any).all === "function") { - return (AbortSignal as any).all(signals); - } else { - const ac = new AbortController(); - const handleAbort = () => { - ac.abort(); - for (const signal of signals) { - signal.removeEventListener("abort", handleAbort); - } - }; - for (const signal of signals) { - signal.addEventListener("abort", handleAbort); - } - return ac.signal; - } -} - -function isPromiseLike(value: any): value is PromiseLike { - return ( - value instanceof Promise || - (typeof value === "object" && - value !== null && - typeof value.then === "function") - ); -} diff --git a/interfaces.txt b/interfaces.txt new file mode 100644 index 0000000..5d5c7a9 --- /dev/null +++ b/interfaces.txt @@ -0,0 +1,73 @@ +AbortSignal +AbstractWorker +Animation +AudioScheduledSourceNode +AudioWorkletNode +BaseAudioContext +BroadcastChannel +Document +Element +EventSource +FileReader +FontFaceSet +GlobalEventHandlers +HTMLBodyElement +HTMLElement +HTMLFrameSetElement +HTMLMediaElement +HTMLVideoElement +IDBDatabase +IDBOpenDBRequest +IDBRequest +IDBTransaction +MIDIAccess +MIDIInput +MIDIPort +MathMLElement +MediaDevices +MediaKeySession +MediaQueryList +MediaRecorder +MediaSource +MediaStream +MediaStreamTrack +MessagePort +Notification +OfflineAudioContext +OffscreenCanvas +PaymentRequest +Performance +PermissionStatus +PictureInPictureWindow +RTCDTMFSender +RTCDataChannel +RTCDtlsTransport +RTCIceTransport +RTCPeerConnection +RTCSctpTransport +RemotePlayback +SVGElement +SVGSVGElement +ScreenOrientation +ScriptProcessorNode +ServiceWorker +ServiceWorkerContainer +ServiceWorkerRegistration +ShadowRoot +SourceBuffer +SourceBufferList +SpeechSynthesis +SpeechSynthesisUtterance +TextTrack +TextTrackCue +TextTrackList +VideoDecoder +VideoEncoder +VisualViewport +WakeLockSentinel +WebSocket +Window +WindowEventHandlers +Worker +XMLHttpRequest +XMLHttpRequestEventTarget \ No newline at end of file From c5fd151f3b8156ee825068b93f38f71f5a3f1ba2 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 29 Nov 2023 14:51:19 -0600 Subject: [PATCH 06/11] Correct teardown semantics --- impl/observable.js | 48 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/impl/observable.js b/impl/observable.js index 1f9ecc9..eae8731 100644 --- a/impl/observable.js +++ b/impl/observable.js @@ -620,6 +620,7 @@ class Subscriber { #active = true; #abortController = new AbortController(); #signal; + #teardowns; get signal() { return this.#signal; } @@ -650,17 +651,56 @@ class Subscriber { this.#abortController.abort(); } } + #teardownHandler = () => { + const teardowns = Array.from(this.#teardowns); + this.#teardowns = undefined; + let errors; + for (let i = teardowns.length - 1; i >= 0; i--) { + try { + teardowns[i](); + } catch (error) { + errors ??= []; + errors.push(error); + } + } + if (errors) { + // TODO: We need to figure out how to report multiple errors. + throw new Error( + `${errors.length} teardowns failed.\n ${errors.join('\n')}}`, + ); + } + }; addTeardown(teardown) { if (this.isActive) { - this.#abortController.signal.addEventListener('abort', teardown, { - once: true, - }); + if (!this.#teardowns) { + this.#teardowns = []; + this.#abortController.signal.addEventListener( + 'abort', + this.#teardownHandler, + { + once: true, + }, + ); + } + this.#teardowns.push(teardown); } else { teardown(); } } removeTeardown(teardown) { - this.#abortController.signal.removeEventListener('abort', teardown); + if (this.#teardowns) { + const index = this.#teardowns.indexOf(teardown); + if (index >= 0) { + this.#teardowns.splice(index, 1); + if (this.#teardowns.length === 0) { + this.#teardowns = undefined; + this.#abortController.signal.removeEventListener( + 'abort', + this.#teardownHandler, + ); + } + } + } } } function noop() {} From f77c8977dd6a5989a8a28456d9254d190bb093bd Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 5 Dec 2023 10:26:29 -0600 Subject: [PATCH 07/11] Ensure iteration does not occur if subscriber has been deactivated by consumer --- impl/observable.js | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/impl/observable.js b/impl/observable.js index eae8731..2041ea7 100644 --- a/impl/observable.js +++ b/impl/observable.js @@ -18,25 +18,39 @@ export class Observable { } if (Symbol.asyncIterator in value) { return new Observable(async (subscriber) => { + const iterator = value[Symbol.asyncIterator](); try { - for await (const v of value) { - subscriber.next(v); + while (subscriber.isActive) { + const { value, done } = await iterator.next(); + if (done) { + subscriber.complete(); + return; + } + subscriber.next(value); } - subscriber.complete(); } catch (error) { subscriber.error(error); + } finally { + iterator.return?.(); } }); } if (Symbol.iterator in value) { return new Observable((subscriber) => { + const iterator = value[Symbol.iterator](); try { - for (const v of value) { - if (subscriber.isActive) subscriber.next(v); + while (subscriber.isActive) { + const { value, done } = iterator.next(); + if (done) { + subscriber.complete(); + return; + } + subscriber.next(value); } - subscriber.complete(); } catch (error) { subscriber.error(error); + } finally { + iterator.return?.(); } }); } From 2171e3fce62c052cb4b23d4ea86eb2a1af38ca63 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 5 Dec 2023 15:56:57 -0600 Subject: [PATCH 08/11] AbortSignal.any not all. --- impl/observable.js | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/impl/observable.js b/impl/observable.js index 2041ea7..a917b0b 100644 --- a/impl/observable.js +++ b/impl/observable.js @@ -77,7 +77,7 @@ export class Observable { return new Promise((resolve, reject) => { const ac = new AbortController(); const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) + ? abortSignalAny([ac.signal, options.signal]) : ac.signal; this.subscribe( { @@ -215,7 +215,7 @@ export class Observable { let outerComplete = false; const startInner = (value) => { innerAC = new AbortController(); - const signal = useSignalAll([innerAC.signal, destination.signal]); + const signal = abortSignalAny([innerAC.signal, destination.signal]); let innerObservable; try { innerObservable = Observable.from(project(value, index++)); @@ -292,7 +292,7 @@ export class Observable { return new Promise((resolve, reject) => { const ac = new AbortController(); const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) + ? abortSignalAny([ac.signal, options.signal]) : ac.signal; let index = 0; this.subscribe( @@ -326,7 +326,7 @@ export class Observable { return new Promise((resolve, reject) => { const ac = new AbortController(); const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) + ? abortSignalAny([ac.signal, options.signal]) : ac.signal; let index = 0; this.subscribe( @@ -360,7 +360,7 @@ export class Observable { return new Promise((resolve, reject) => { const ac = new AbortController(); const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) + ? abortSignalAny([ac.signal, options.signal]) : ac.signal; let index = 0; this.subscribe( @@ -395,7 +395,7 @@ export class Observable { return new Promise((resolve, reject) => { const ac = new AbortController(); const signal = options?.signal - ? useSignalAll([ac.signal, options.signal]) + ? abortSignalAny([ac.signal, options.signal]) : ac.signal; let hasState = arguments.length >= 2; let state = initialValue; @@ -477,7 +477,7 @@ export class Observable { next: (value) => { innerAC?.abort(); innerAC = new AbortController(); - const signal = useSignalAll([innerAC.signal, destination.signal]); + const signal = abortSignalAny([innerAC.signal, destination.signal]); let innerObservable; try { innerObservable = Observable.from(project(value, index++)); @@ -644,7 +644,7 @@ class Subscriber { constructor(signal, _observer) { this._observer = _observer; const ownSignal = this.#abortController.signal; - this.#signal = signal ? useSignalAll([signal, ownSignal]) : ownSignal; + this.#signal = signal ? abortSignalAny([signal, ownSignal]) : ownSignal; } next(value) { if (this.isActive) { @@ -719,9 +719,9 @@ class Subscriber { } function noop() {} -function useSignalAll(signals) { - if (typeof AbortSignal.all === 'function') { - return AbortSignal.all(signals); +function abortSignalAny(signals) { + if (typeof AbortSignal.any === 'function') { + return AbortSignal.any(signals); } else { const ac = new AbortController(); const handleAbort = () => { From 2a0474c3bae8cb69099acc708bc8b37565fd61ab Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 19 Dec 2023 11:38:36 -0600 Subject: [PATCH 09/11] Remove methods that we haven't agreed on yet --- impl/observable.d.ts | 5 --- impl/observable.js | 96 -------------------------------------------- 2 files changed, 101 deletions(-) diff --git a/impl/observable.d.ts b/impl/observable.d.ts index 5d74042..f67737e 100644 --- a/impl/observable.d.ts +++ b/impl/observable.d.ts @@ -54,10 +54,6 @@ export declare class Observable { handleError: (error: unknown) => ConvertableToObservable, ): Observable; finally(onFinalize: () => void): Observable; - switchMap( - project: (value: T, index: number) => ConvertableToObservable, - ): Observable; - do(fnOrObserver: ((value: T) => void) | Partial>): Observable; [Symbol.asyncIterator](): AsyncGenerator; } @@ -71,7 +67,6 @@ declare class Subscriber implements Observer { error(error: any): void; complete(): void; addTeardown(teardown: () => void): void; - removeTeardown(teardown: () => void): void; } export {}; diff --git a/impl/observable.js b/impl/observable.js index a917b0b..05c60eb 100644 --- a/impl/observable.js +++ b/impl/observable.js @@ -467,87 +467,6 @@ export class Observable { }); }); } - switchMap(project) { - return new Observable((destination) => { - let index = 0; - let outerComplete = false; - let innerAC; - this.subscribe( - { - next: (value) => { - innerAC?.abort(); - innerAC = new AbortController(); - const signal = abortSignalAny([innerAC.signal, destination.signal]); - let innerObservable; - try { - innerObservable = Observable.from(project(value, index++)); - } catch (error) { - destination.error(error); - return; - } - innerObservable.subscribe( - { - next(innerValue) { - destination.next(innerValue); - }, - error(error) { - destination.error(error); - }, - complete() { - innerAC = undefined; - if (outerComplete) { - destination.complete(); - } - }, - }, - { - signal, - }, - ); - }, - error(error) { - destination.error(error); - }, - complete() { - outerComplete = true; - if (!innerAC) { - destination.complete(); - } - }, - }, - { - signal: destination.signal, - }, - ); - }); - } - do(fnOrObserver) { - return new Observable((destination) => { - const doObserver = - typeof fnOrObserver === 'function' - ? { next: fnOrObserver } - : fnOrObserver; - this.subscribe( - { - next(value) { - doObserver.next?.(value); - destination.next(value); - }, - error(error) { - doObserver.error?.(error); - destination.error(error); - }, - complete() { - doObserver.complete?.(); - destination.complete(); - }, - }, - { - signal: destination.signal, - }, - ); - }); - } [Symbol.asyncIterator]() { let ac; let deferred = []; @@ -701,21 +620,6 @@ class Subscriber { teardown(); } } - removeTeardown(teardown) { - if (this.#teardowns) { - const index = this.#teardowns.indexOf(teardown); - if (index >= 0) { - this.#teardowns.splice(index, 1); - if (this.#teardowns.length === 0) { - this.#teardowns = undefined; - this.#abortController.signal.removeEventListener( - 'abort', - this.#teardownHandler, - ); - } - } - } - } } function noop() {} From 6765ffb038a278d9d6a87b23539ad70d26b71e3f Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 2 Jan 2024 15:56:18 -0600 Subject: [PATCH 10/11] Remove unnecessary temp file --- interfaces.txt | 73 -------------------------------------------------- 1 file changed, 73 deletions(-) delete mode 100644 interfaces.txt diff --git a/interfaces.txt b/interfaces.txt deleted file mode 100644 index 5d5c7a9..0000000 --- a/interfaces.txt +++ /dev/null @@ -1,73 +0,0 @@ -AbortSignal -AbstractWorker -Animation -AudioScheduledSourceNode -AudioWorkletNode -BaseAudioContext -BroadcastChannel -Document -Element -EventSource -FileReader -FontFaceSet -GlobalEventHandlers -HTMLBodyElement -HTMLElement -HTMLFrameSetElement -HTMLMediaElement -HTMLVideoElement -IDBDatabase -IDBOpenDBRequest -IDBRequest -IDBTransaction -MIDIAccess -MIDIInput -MIDIPort -MathMLElement -MediaDevices -MediaKeySession -MediaQueryList -MediaRecorder -MediaSource -MediaStream -MediaStreamTrack -MessagePort -Notification -OfflineAudioContext -OffscreenCanvas -PaymentRequest -Performance -PermissionStatus -PictureInPictureWindow -RTCDTMFSender -RTCDataChannel -RTCDtlsTransport -RTCIceTransport -RTCPeerConnection -RTCSctpTransport -RemotePlayback -SVGElement -SVGSVGElement -ScreenOrientation -ScriptProcessorNode -ServiceWorker -ServiceWorkerContainer -ServiceWorkerRegistration -ShadowRoot -SourceBuffer -SourceBufferList -SpeechSynthesis -SpeechSynthesisUtterance -TextTrack -TextTrackCue -TextTrackList -VideoDecoder -VideoEncoder -VisualViewport -WakeLockSentinel -WebSocket -Window -WindowEventHandlers -Worker -XMLHttpRequest -XMLHttpRequestEventTarget \ No newline at end of file From 865b27ffce756030d768040b31a9e8c9aed9bf87 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 2 Jan 2024 15:57:27 -0600 Subject: [PATCH 11/11] Fix Dominic's name --- LICENSE.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE.txt b/LICENSE.txt index 2b54eb8..0693147 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2023 Ben Lesh , Domenic Farolino +Copyright (c) 2023 Ben Lesh , Dominic Farolino Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal