diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..0693147 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Ben Lesh , Dominic Farolino + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/impl/observable-polyfill.d.ts b/impl/observable-polyfill.d.ts new file mode 100644 index 0000000..2871499 --- /dev/null +++ b/impl/observable-polyfill.d.ts @@ -0,0 +1,379 @@ +import { Observable } from './observable'; + +declare global { + interface EventTarget { + on(eventType: string): Observable; + } + interface AbortSignal { + on( + eventType: K, + ): Observable; + } + interface AbstractWorker { + on( + eventType: K, + ): Observable; + } + interface Animation { + on( + eventType: K, + ): Observable; + } + interface AudioScheduledSourceNode { + on( + eventType: K, + ): Observable; + } + interface AudioWorkletNode { + on( + eventType: K, + ): Observable; + } + interface BaseAudioContext { + on( + eventType: K, + ): Observable; + } + interface BroadcastChannel { + on( + eventType: K, + ): Observable; + } + interface Document { + on( + eventType: K, + ): Observable; + } + interface Element { + on( + eventType: K, + ): Observable; + } + interface EventSource { + on( + eventType: K, + ): Observable; + } + interface FileReader { + on( + eventType: K, + ): Observable; + } + interface FontFaceSet { + on( + eventType: K, + ): Observable; + } + interface GlobalEventHandlers { + on( + eventType: K, + ): Observable; + } + interface HTMLBodyElement { + on( + eventType: K, + ): Observable; + } + interface HTMLElement { + on( + eventType: K, + ): Observable; + } + interface HTMLFrameSetElement { + on( + eventType: K, + ): Observable; + } + interface HTMLMediaElement { + on( + eventType: K, + ): Observable; + } + interface HTMLVideoElement { + on( + eventType: K, + ): Observable; + } + interface IDBDatabase { + on( + eventType: K, + ): Observable; + } + interface IDBOpenDBRequest { + on( + eventType: K, + ): Observable; + } + interface IDBRequest { + on( + eventType: K, + ): Observable; + } + interface IDBTransaction { + on( + eventType: K, + ): Observable; + } + interface MIDIAccess { + on( + eventType: K, + ): Observable; + } + interface MIDIInput { + on( + eventType: K, + ): Observable; + } + interface MIDIPort { + on( + eventType: K, + ): Observable; + } + interface MathMLElement { + on( + eventType: K, + ): Observable; + } + interface MediaDevices { + on( + eventType: K, + ): Observable; + } + interface MediaKeySession { + on( + eventType: K, + ): Observable; + } + interface MediaQueryList { + on( + eventType: K, + ): Observable; + } + interface MediaRecorder { + on( + eventType: K, + ): Observable; + } + interface MediaSource { + on( + eventType: K, + ): Observable; + } + interface MediaStream { + on( + eventType: K, + ): Observable; + } + interface MediaStreamTrack { + on( + eventType: K, + ): Observable; + } + interface MessagePort { + on( + eventType: K, + ): Observable; + } + interface Notification { + on( + eventType: K, + ): Observable; + } + interface OfflineAudioContext { + on( + eventType: K, + ): Observable; + } + interface OffscreenCanvas { + on( + eventType: K, + ): Observable; + } + interface PaymentRequest { + on( + eventType: K, + ): Observable; + } + interface Performance { + on( + eventType: K, + ): Observable; + } + interface PermissionStatus { + on( + eventType: K, + ): Observable; + } + interface PictureInPictureWindow { + on( + eventType: K, + ): Observable; + } + interface RTCDTMFSender { + on( + eventType: K, + ): Observable; + } + interface RTCDataChannel { + on( + eventType: K, + ): Observable; + } + interface RTCDtlsTransport { + on( + eventType: K, + ): Observable; + } + interface RTCIceTransport { + on( + eventType: K, + ): Observable; + } + interface RTCPeerConnection { + on( + eventType: K, + ): Observable; + } + interface RTCSctpTransport { + on( + eventType: K, + ): Observable; + } + interface RemotePlayback { + on( + eventType: K, + ): Observable; + } + interface SharedWorker { + on( + eventType: K, + ): Observable; + } + interface SVGElement { + on( + eventType: K, + ): Observable; + } + interface SVGSVGElement { + on( + eventType: K, + ): Observable; + } + interface ScreenOrientation { + on( + eventType: K, + ): Observable; + } + interface ScriptProcessorNode { + on( + eventType: K, + ): Observable; + } + interface ServiceWorker { + on( + eventType: K, + ): Observable; + } + interface ServiceWorkerContainer { + on( + eventType: K, + ): Observable; + } + interface ServiceWorkerRegistration { + on( + eventType: K, + ): Observable; + } + interface ShadowRoot { + on( + eventType: K, + ): Observable; + } + interface SourceBuffer { + on( + eventType: K, + ): Observable; + } + interface SourceBufferList { + on( + eventType: K, + ): Observable; + } + interface SpeechSynthesis { + on( + eventType: K, + ): Observable; + } + interface SpeechSynthesisUtterance { + on( + eventType: K, + ): Observable; + } + interface TextTrack { + on( + eventType: K, + ): Observable; + } + interface TextTrackCue { + on( + eventType: K, + ): Observable; + } + interface TextTrackList { + on( + eventType: K, + ): Observable; + } + interface VideoDecoder { + on( + eventType: K, + ): Observable; + } + interface VideoEncoder { + on( + eventType: K, + ): Observable; + } + interface VisualViewport { + on( + eventType: K, + ): Observable; + } + interface WakeLockSentinel { + on( + eventType: K, + ): Observable; + } + interface WebSocket { + on( + eventType: K, + ): Observable; + } + interface Window { + on( + eventType: K, + ): Observable; + } + interface WindowEventHandlers { + on( + eventType: K, + ): Observable; + } + interface Worker { + on( + eventType: K, + ): Observable; + } + interface XMLHttpRequest { + on( + eventType: K, + ): Observable; + } + interface XMLHttpRequestEventTarget { + on( + eventType: K, + ): Observable; + } +} + +export {}; diff --git a/impl/observable-polyfill.js b/impl/observable-polyfill.js new file mode 100644 index 0000000..0349ea9 --- /dev/null +++ b/impl/observable-polyfill.js @@ -0,0 +1,23 @@ +import { Observable } from './observable.js'; + +function polyfill() { + const proto = EventTarget.prototype; + if (typeof proto.on !== 'function') { + proto.on = function (type) { + return new Observable((subscriber) => { + this.addEventListener( + type, + (event) => { + subscriber.next(event); + }, + { signal: subscriber.signal }, + ); + }); + }; + } + if (typeof globalThis.Observable !== 'function') { + globalThis.Observable = Observable; + } +} + +polyfill(); diff --git a/impl/observable.d.ts b/impl/observable.d.ts new file mode 100644 index 0000000..f67737e --- /dev/null +++ b/impl/observable.d.ts @@ -0,0 +1,72 @@ +export interface Observer { + next(value: T): void; + error(error: any): void; + complete(): void; +} +export interface SubscriptionOptions { + signal?: AbortSignal; +} +type ConvertableToObservable = + | Observable + | PromiseLike + | Iterable + | AsyncIterable; +export declare class Observable { + private start; + static from(value: ConvertableToObservable): Observable; + constructor(start: (subscriber: Subscriber) => void); + subscribe(onNext: (value: T) => void, options?: SubscriptionOptions): void; + subscribe( + observer: Partial>, + options?: SubscriptionOptions, + ): void; + forEach( + callback: (value: T) => void, + options?: SubscriptionOptions, + ): Promise; + map(project: (value: T, index: number) => R): Observable; + filter(predicate: (value: T, index: number) => boolean): Observable; + take(count: number): Observable; + drop(count: number): Observable; + flatMap( + project: (value: T, index: number) => ConvertableToObservable, + ): Observable; + takeUntil(notifier: ConvertableToObservable): Observable; + every( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + some( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + find( + predicate: (value: T, index: number) => boolean, + options?: SubscriptionOptions, + ): Promise; + reduce( + reducer: (accumulated: S, value: T, index: number) => S, + initialValue?: S, + options?: SubscriptionOptions, + ): Promise; + toArray(options?: SubscriptionOptions): Promise; + catch( + handleError: (error: unknown) => ConvertableToObservable, + ): Observable; + finally(onFinalize: () => void): Observable; + [Symbol.asyncIterator](): AsyncGenerator; +} + +declare class Subscriber implements Observer { + #private; + private _observer; + get signal(): AbortSignal; + get isActive(): boolean; + constructor(signal: AbortSignal | undefined, _observer: Observer); + next(value: T): void; + error(error: any): void; + complete(): void; + addTeardown(teardown: () => void): void; +} + +export {}; diff --git a/impl/observable.js b/impl/observable.js new file mode 100644 index 0000000..05c60eb --- /dev/null +++ b/impl/observable.js @@ -0,0 +1,650 @@ +export class Observable { + static from(value) { + if (value instanceof Observable) { + return value; + } + if (isPromiseLike(value)) { + return new Observable((subscriber) => { + value.then( + (value) => { + subscriber.next(value); + subscriber.complete(); + }, + (error) => { + subscriber.error(error); + }, + ); + }); + } + if (Symbol.asyncIterator in value) { + return new Observable(async (subscriber) => { + const iterator = value[Symbol.asyncIterator](); + try { + while (subscriber.isActive) { + const { value, done } = await iterator.next(); + if (done) { + subscriber.complete(); + return; + } + subscriber.next(value); + } + } catch (error) { + subscriber.error(error); + } finally { + iterator.return?.(); + } + }); + } + if (Symbol.iterator in value) { + return new Observable((subscriber) => { + const iterator = value[Symbol.iterator](); + try { + while (subscriber.isActive) { + const { value, done } = iterator.next(); + if (done) { + subscriber.complete(); + return; + } + subscriber.next(value); + } + } catch (error) { + subscriber.error(error); + } finally { + iterator.return?.(); + } + }); + } + throw new TypeError('Value is not observable'); + } + constructor(start) { + this.start = start; + } + subscribe(fnOrObserver, options) { + const partialObserver = + typeof fnOrObserver === 'function' + ? { next: fnOrObserver } + : fnOrObserver; + const observer = { + next: noop, + error: reportError, + complete: noop, + ...partialObserver, + }; + const subscriber = new Subscriber(options?.signal, observer); + this.start(subscriber); + } + forEach(callback, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? abortSignalAny([ac.signal, options.signal]) + : ac.signal; + this.subscribe( + { + next: (value) => { + try { + callback(value); + } catch (error) { + reject(error); + ac.abort(); + } + }, + error: reject, + complete: resolve, + }, + { + signal, + }, + ); + }); + } + map(project) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result; + try { + result = project(value, index++); + } catch (error) { + destination.error(error); + return; + } + destination.next(result); + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + filter(predicate) { + return new Observable((destination) => { + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + destination.error(error); + return; + } + if (result) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + take(count) { + return new Observable((destination) => { + let remaining = count; + this.subscribe( + { + next(value) { + remaining--; + if (remaining >= 0) { + destination.next(value); + } + if (remaining === 0) { + destination.complete(); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + drop(count) { + return new Observable((destination) => { + let seen = 0; + this.subscribe( + { + next(value) { + seen++; + if (seen > count) { + destination.next(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + flatMap(project) { + return new Observable((destination) => { + let queue = []; + let index = 0; + let innerAC; + let outerComplete = false; + const startInner = (value) => { + innerAC = new AbortController(); + const signal = abortSignalAny([innerAC.signal, destination.signal]); + let innerObservable; + try { + innerObservable = Observable.from(project(value, index++)); + } catch (error) { + destination.error(error); + return; + } + innerObservable.subscribe( + { + next(innerValue) { + destination.next(innerValue); + }, + error(error) { + destination.error(error); + }, + complete() { + innerAC = undefined; + if (queue.length > 0) { + startInner(queue.shift()); + } else if (outerComplete) { + destination.complete(); + } + }, + }, + { + signal, + }, + ); + }; + this.subscribe( + { + next(value) { + if (innerAC) { + queue.push(value); + } else { + startInner(value); + } + }, + error(error) { + destination.error(error); + }, + complete() { + outerComplete = true; + if (queue.length === 0) { + destination.complete(); + } + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + takeUntil(notifier) { + return new Observable((destination) => { + Observable.from(notifier).subscribe( + { + next() { + destination.complete(); + }, + error(error) { + destination.error(error); + }, + }, + { + signal: destination.signal, + }, + ); + this.subscribe(destination, { signal: destination.signal }); + }); + } + every(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? abortSignalAny([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (!result) { + resolve(false); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(true); + }, + }, + { + signal, + }, + ); + }); + } + some(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? abortSignalAny([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(true); + ac.abort(); + } + }, + error: reject, + complete() { + resolve(false); + }, + }, + { + signal, + }, + ); + }); + } + find(predicate, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? abortSignalAny([ac.signal, options.signal]) + : ac.signal; + let index = 0; + this.subscribe( + { + next(value) { + let result = false; + try { + result = predicate(value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + if (result) { + resolve(value); + ac.abort(); + } + }, + error: reject, + complete() { + // TODO: Figure out the proper semantics here. + reject(new Error('Value not found')); + }, + }, + { + signal, + }, + ); + }); + } + reduce(reducer, initialValue, options) { + return new Promise((resolve, reject) => { + const ac = new AbortController(); + const signal = options?.signal + ? abortSignalAny([ac.signal, options.signal]) + : ac.signal; + let hasState = arguments.length >= 2; + let state = initialValue; + let index = 0; + this.subscribe( + { + next(value) { + if (hasState) { + try { + state = reducer(state, value, index++); + } catch (error) { + reject(error); + ac.abort(); + return; + } + } else { + state = value; + } + hasState = true; + }, + error: reject, + complete() { + resolve(state); + }, + }, + { + signal, + }, + ); + }); + } + toArray(options) { + return this.reduce( + (arr, value) => { + arr.push(value); + return arr; + }, + [], + options, + ); + } + catch(handleError) { + return new Observable((destination) => { + this.subscribe( + { + next(value) { + destination.next(value); + }, + error(error) { + Observable.from(handleError(error)).subscribe(destination, { + signal: destination.signal, + }); + }, + complete() { + destination.complete(); + }, + }, + { + signal: destination.signal, + }, + ); + }); + } + finally(onFinalize) { + return new Observable((destination) => { + destination.addTeardown(onFinalize); + this.subscribe(destination, { + signal: destination.signal, + }); + }); + } + [Symbol.asyncIterator]() { + let ac; + let deferred = []; + let buffer = []; + let hasError = false; + let error = undefined; + let isComplete = false; + return { + next: () => { + return new Promise((resolve, reject) => { + if (buffer.length > 0) { + resolve({ value: buffer.shift(), done: false }); + return; + } + if (hasError) { + reject(error); + return; + } + if (isComplete) { + resolve({ value: undefined, done: true }); + return; + } + if (!ac) { + ac = new AbortController(); + this.subscribe( + { + next(value) { + if (deferred.length > 0) { + const [resolve] = deferred.shift(); + resolve({ value, done: false }); + } else { + buffer.push(value); + } + }, + error(err) { + if (buffer.length > 0) { + hasError = true; + error = err; + } else { + while (deferred.length > 0) { + const [, reject] = deferred.shift(); + reject(err); + } + } + }, + complete() {}, + }, + { + signal: ac.signal, + }, + ); + } + deferred.push([resolve, reject]); + }); + }, + throw: (err) => { + return new Promise((_resolve, reject) => { + ac?.abort(); + hasError = true; + error = err; + for (const [, deferredReject] of deferred) { + deferredReject(error); + } + reject(error); + }); + }, + return: () => { + return new Promise((resolve) => { + ac?.abort(); + isComplete = true; + for (const [deferredResolve] of deferred) { + deferredResolve({ value: undefined, done: true }); + } + resolve({ value: undefined, done: true }); + }); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + } +} +class Subscriber { + #active = true; + #abortController = new AbortController(); + #signal; + #teardowns; + get signal() { + return this.#signal; + } + get isActive() { + return this.#active && !this.signal.aborted; + } + constructor(signal, _observer) { + this._observer = _observer; + const ownSignal = this.#abortController.signal; + this.#signal = signal ? abortSignalAny([signal, ownSignal]) : ownSignal; + } + next(value) { + if (this.isActive) { + this._observer.next(value); + } + } + error(error) { + if (this.isActive) { + this.#active = false; + this._observer.error(error); + this.#abortController.abort(); + } + } + complete() { + if (this.isActive) { + this.#active = false; + this._observer.complete(); + this.#abortController.abort(); + } + } + #teardownHandler = () => { + const teardowns = Array.from(this.#teardowns); + this.#teardowns = undefined; + let errors; + for (let i = teardowns.length - 1; i >= 0; i--) { + try { + teardowns[i](); + } catch (error) { + errors ??= []; + errors.push(error); + } + } + if (errors) { + // TODO: We need to figure out how to report multiple errors. + throw new Error( + `${errors.length} teardowns failed.\n ${errors.join('\n')}}`, + ); + } + }; + addTeardown(teardown) { + if (this.isActive) { + if (!this.#teardowns) { + this.#teardowns = []; + this.#abortController.signal.addEventListener( + 'abort', + this.#teardownHandler, + { + once: true, + }, + ); + } + this.#teardowns.push(teardown); + } else { + teardown(); + } + } +} +function noop() {} + +function abortSignalAny(signals) { + if (typeof AbortSignal.any === 'function') { + return AbortSignal.any(signals); + } else { + const ac = new AbortController(); + const handleAbort = () => { + ac.abort(); + for (const signal of signals) { + signal.removeEventListener('abort', handleAbort); + } + }; + for (const signal of signals) { + signal.addEventListener('abort', handleAbort); + } + return ac.signal; + } +} +function isPromiseLike(value) { + return ( + value instanceof Promise || + (typeof value === 'object' && + value !== null && + typeof value.then === 'function') + ); +}