From bc9d8ad24528aa0c3f9289072dcc96b9eef562a1 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 1 Aug 2023 14:30:22 -0500 Subject: [PATCH] Create initial reference implementation Just a spike in TypeScript. Will add tests in a followup, add compilation to JavaScript, etc. --- impl/observable.ts | 496 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 496 insertions(+) create mode 100644 impl/observable.ts diff --git a/impl/observable.ts b/impl/observable.ts new file mode 100644 index 0000000..7276107 --- /dev/null +++ b/impl/observable.ts @@ -0,0 +1,496 @@ +/** +MIT License + +Copyright (c) 2023 Ben Lesh , Domenic Farolino + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +export interface Observer { + next(value: T): void; + error(error: any): void; + complete(): void; +} + +export interface SubscriptionOptions extends Partial> { + signal?: AbortSignal; +} + +export class Observable { + constructor(private start: (subscriber: Subscriber) => void) { } + + subscribe(options?: SubscriptionOptions) { + const observer: Observer = options + ? typeof options === 'function' + ? { next: options, error: reportError, complete: noop } + : typeof options.next === 'function' && + typeof options.error === 'function' && + typeof options.complete === 'function' + ? (options as Observer) + : { + next: (value) => options.next?.(value), + error: (error) => { + if (options.error) { + options.error(error); + } else { + reportError(error); + } + }, + complete: () => options.complete?.(), + } + : { + next: noop, + error: reportError, + complete: noop, + }; + + const subscriber = new Subscriber(options?.signal, observer); + + this.start(subscriber); + } + + forEach( + callback: (value: T) => void, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options?.signal); + + this.subscribe({ + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + signal: ac.signal, + }); + }); + } + + map(project: (value: T, index: number) => R): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe({ + next(value) { + let result: R; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + filter(predicate: (value: T, index: number) => boolean): Observable { + return new Observable((destination) => { + let index = 0; + + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + take(count: number): Observable { + return new Observable((destination) => { + let remaining = count; + + this.subscribe({ + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + drop(count: number): Observable { + return new Observable((destination) => { + let seen = 0; + + this.subscribe({ + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + signal: destination.signal, + }); + }); + } + + flatMap( + project: (value: T, index: number) => Observable + ): Observable { + return new Observable((destination) => { + let queue: T[] = []; + let index = 0; + let innerAC: AbortController | undefined; + let outerComplete = false; + + const startInner = (value: T) => { + innerAC = new AbortController(); + maybeFollowSignal(innerAC, destination.signal); + + let innerObservable: Observable; + try { + innerObservable = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + + innerObservable.subscribe({ + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()!); + } else if (outerComplete) { + destination.complete(); + } + }, + signal: innerAC.signal, + }); + }; + + this.subscribe({ + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + signal: destination.signal, + }); + }); + } + + takeUntil(notifier: Observable): Observable { + return new Observable((destination) => { + notifier.subscribe({ + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + signal: destination.signal, + }); + this.subscribe(destination); + }); + } + + every( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + signal: ac.signal, + }); + }); + } + + some( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + signal: ac.signal, + }); + }); + } + + find( + predicate: (value: T, index: number) => boolean, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let index = 0; + this.subscribe({ + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error('Value not found')); + }, + signal: ac.signal, + }); + }); + } + + reduce( + reducer: (accumulated: S, value: T, index: number) => S, + initialValue?: S, + options?: { signal?: AbortSignal } + ): Promise { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + maybeFollowSignal(ac, options.signal); + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe({ + next(value) { + if (hasState) { + try { + state = reducer(state, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value as any; + } + }, + error: reject, + complete() { + resolve(state); + }, + signal: ac.signal, + }); + }); + } + + toArray(options?: { signal?: AbortSignal }): Promise { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [], + options + ); + } +} + +class Subscriber implements Observer { + #stopped = false; + + #abortController = new AbortController(); + + get signal() { + return this.#abortController.signal; + } + + get closed() { + return this.#stopped || this.#abortController.signal.aborted; + } + + constructor( + private _signal: AbortSignal | undefined, + private _observer: Observer + ) { + maybeFollowSignal(this.#abortController, this._signal); + } + + next(value: T): void { + if (!this.closed) { + this._observer.next(value); + } + } + + error(error: any): void { + if (!this.closed) { + this.#stopped = true; + this._observer.error(error); + this.#abortController.abort(); + } + } + + complete(): void { + if (!this.closed) { + this.#stopped = true; + this._observer.complete(); + this.#abortController.abort(); + } + } +} + +function noop() { } + +function maybeFollowSignal( + abortController: AbortController, + signal: AbortSignal | undefined +) { + if (signal) { + const parentAbortHandler = () => { + abortController.abort(); + }; + signal.addEventListener('abort', parentAbortHandler, { + once: true, + }); + abortController.signal.addEventListener('abort', () => { + signal.removeEventListener('abort', parentAbortHandler); + }); + } +} + +function polyfill() { + const proto = EventTarget.prototype as any; + if (typeof proto.on !== 'function') { + proto.on = function (type: Parameters[0]) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal } + ); + }); + }; + } + + if (typeof globalThis.Observable !== 'function') { + (globalThis as any).Observable = Observable; + } +} + +polyfill();