diff --git a/src/async.ts b/src/async.ts new file mode 100644 index 0000000..ff1491f --- /dev/null +++ b/src/async.ts @@ -0,0 +1,298 @@ +type OnOffEventEmitter = { + on: (eventName: E, listener: (data: T) => void, ...args: any[]) => void + off: (eventName: E, listener: (data: T) => void, ...args: any[]) => void +} +type AddRemoveListenerEventEmitter = { + addListener: (eventName: E, listener: (data: T) => void, ...args: any[]) => void + removeListener: (eventName: E, listener: (data: T) => void, ...args: any[]) => void +} + +type EventEmitter = OnOffEventEmitter | AddRemoveListenerEventEmitter + +/** + * Converts an event emitter to an async generator. + * @param on function to add a listener to the event emitter + * @param off function to remove a listener from the event emitter + * @returns an async generator that yields events from the event emitter + * @example + * const emitter = new EventEmitter() + * const gen = fromEventHandler(on => emitter.on('event', on), off => emitter.off('event', off)) + * (async () => { + * for await (const event of gen) { + * console.log(event) + * } + * })() + * emitter.emit('event', 'hello') // logs 'hello' + * emitter.emit('event', 'world') // logs 'world' + */ +export async function* fromEventHandler( + on: (listener: (data: T) => void) => void, + off: (listener: (data: T) => void) => void +): AsyncGenerator { + // Queue to hold events until they can be yielded + const eventQueue: T[] = []; + // Resolve function for the current waiting promise, if any + let resolve: ((value: T) => void) | null = null; + + // The event listener either resolves a promise with new data or queues the data + const listener = (data: T) => { + if (resolve) { + resolve(data); + resolve = null; // Reset resolve since it's been fulfilled + } else { + eventQueue.push(data); + } + }; + + // Register the event listener + on(listener); + + try { + while (true) { + // If there's no event to yield, wait for the next one + if (eventQueue.length === 0) { + // Wait for the next event + await new Promise((r) => { + resolve = r; + }).then((data) => { + // Directly yield the data when it arrives + eventQueue.push(data); + }); + } + + // Yield the next event from the queue + if (eventQueue.length > 0) { + yield eventQueue.shift()!; + } + } + } finally { + // Cleanup: remove the event listener + off(listener); + } +} + +/** + * Converts an event emitter to an async generator. + * @param eventEmitter event emitter to convert to an async generator + * @param eventName name of the event to listen for + * @returns an async generator that yields events from the event emitter + * @example + * const emitter = new EventEmitter() + * const gen = fromEvent(emitter, 'event') + * (async () => { + * for await (const event of gen) { + * console.log(event) + * } + * })() + * emitter.emit('event', 'hello') // logs 'hello' + * emitter.emit('event', 'world') // logs 'world' + */ +export function fromEvent(eventEmitter: EventEmitter, eventName: E): AsyncGenerator; +export function fromEvent(eventEmitter: EventEmitter, eventName: string): AsyncGenerator; +export function fromEvent(eventEmitter: EventEmitter, eventName: E): AsyncGenerator { + return fromEventHandler( + (listener) => { + if ('on' in eventEmitter) { + eventEmitter.on(eventName, listener) + } else { + eventEmitter.addListener(eventName, listener) + } + }, + (listener) => { + if ('off' in eventEmitter) { + eventEmitter.off(eventName, listener) + } else { + eventEmitter.removeListener(eventName, listener) + } + } + ); +} + +/** + * Merges multiple `AsyncIterable` objects into a single asynchronous generator. This function + * takes any number of `AsyncIterable` inputs and returns an `AsyncGenerator` that yields values + * from each of the input iterables as they become available. The merging operation does not + * guarantee the order in which values are yielded from the different inputs; values are yielded + * as soon as they are resolved from any of the input iterables. + * + * @param iterators - A rest parameter of `AsyncIterable` objects to be merged. + * @returns An `AsyncGenerator` that yields values from the input iterables as they are available. + * + * @example + * ```typescript + * async function* asyncNumbers() { + * yield* [1, 2, 3]; + * } + * + * async function* asyncLetters() { + * yield* ['a', 'b', 'c']; + * } + * + * (async () => { + * const merged = merge(asyncNumbers(), asyncLetters()); + * for await (const value of merged) { + * console.log(value); // Outputs 1, 'a', 2, 'b', 3, 'c' in no specific order + * } + * })(); + * ``` + * + * Note: The output order depends on the resolution of the yielded values from each input iterable and + * may vary with each execution. + */ +export async function* merge(...iterators: AsyncIterable[]): AsyncGenerator { + const sources = iterators.map(it => it[Symbol.asyncIterator]()); + let active = sources.length; + + const nexts: Promise<{ value: IteratorResult; index: number }>[] = sources.map((source, index) => + source.next().then(value => ({ value, index })) + ); + + try { + while (active > 0) { + const { value, index } = await Promise.race(nexts); + if (value.done) { + active--; + nexts[index] = new Promise(() => { }); // Replace with a never-resolving promise + } else { + yield value.value; + nexts[index] = sources[index].next().then(value => ({ value, index })); + } + } + } finally { + // Clean-up: make sure all iterators are properly closed + for (const source of sources) { + if (typeof source.return === 'function') { + await source.return(); + } + } + } +} + +/** + * Merges two `AsyncIterable` objects into a single asynchronous generator. This function takes + * two `AsyncIterable` inputs and returns an `AsyncGenerator` that yields values from each of the + * input iterables as they become available. The merging operation does not guarantee the order in + * which values are yielded from the different inputs; values are yielded as soon as they are + * resolved from any of the input iterables. + * + * @param gen1 - The first `AsyncIterable` to merge. + * @param gen2 - The second `AsyncIterable` to merge. + * @returns An `AsyncGenerator` that yields values from the input iterables as they are available. + * + * @example + * ```typescript + * async function* asyncNumbers() { + * yield* [1, 2, 3]; + * } + * + * async function* asyncLetters() { + * yield* ['a', 'b', 'c']; + * } + * + * (async () => { + * const merged = merge2(asyncNumbers(), asyncLetters()); + * for await (const value of merged) { + * console.log(value); // Outputs 1, 'a', 2, 'b', 3, 'c' in no specific order + * } + * })(); + * ``` + * + * Note: The output order depends on the resolution of the yielded values from each input iterable and + * may vary with each execution. + */ +export function merge2(gen1: AsyncGenerator, gen2: AsyncGenerator): AsyncGenerator { + return merge(gen1, gen2) +} + +/** + * Maps the values of an `AsyncGenerator` using a mapping function. This function takes an + * `AsyncGenerator` and a mapping function and returns an `AsyncGenerator` that yields the + * values of the input generator after applying the mapping function to each value. + * + * @param gen - The input `AsyncGenerator`. + * @param fn - The mapping function to apply to the values of the input generator. + * @returns An `AsyncGenerator` that yields the mapped values of the input generator. + * + * @example + * ```typescript + * async function* asyncNumbers() { + * yield* [1, 2, 3]; + * } + * + * (async () => { + * const doubled = map(asyncNumbers(), x => x * 2); + * for await (const value of doubled) { + * console.log(value); // Outputs 2, 4, 6 + * } + * })(); + * ``` + */ +export async function* map(gen: AsyncGenerator, fn: (value: T) => U): AsyncGenerator { + for await (const value of gen) { + yield fn(value) + } +} + +/** + * Filters the values of an `AsyncGenerator` using a predicate function. This function takes an + * `AsyncGenerator` and a predicate function and returns an `AsyncGenerator` that yields the + * values of the input generator that satisfy the predicate. + * + * @param gen - The input `AsyncGenerator`. + * @param predicate - The predicate function to filter the values of the input generator. + * @returns An `AsyncGenerator` that yields the values of the input generator that satisfy the predicate. + * + * @example + * ```typescript + * async function* asyncNumbers() { + * yield* [1, 2, 3, 4, 5]; + * } + * + * (async () => { + * const evens = filter(asyncNumbers(), x => x % 2 === 0); + * for await (const value of evens) { + * console.log(value); // Outputs 2, 4 + * } + * })(); + * ``` + */ +export async function* filter(gen: AsyncGenerator, predicate: (value: T) => boolean): AsyncGenerator { + for await (const value of gen) { + if (predicate(value)) { + yield value + } + } +} + +/** + * Takes the first `n` values from an `AsyncGenerator`. This function takes an `AsyncGenerator` + * and a number `n` and returns an `AsyncGenerator` that yields the first `n` values of the input + * generator. + * + * @param gen - The input `AsyncGenerator`. + * @param n - The number of values to take from the input generator. + * @returns An `AsyncGenerator` that yields the first `n` values of the input generator. + * + * @example + * ```typescript + * async function* asyncNumbers() { + * yield* [1, 2, 3, 4, 5]; + * } + * + * (async () => { + * const firstThree = take(asyncNumbers(), 3); + * for await (const value of firstThree) { + * console.log(value); // Outputs 1, 2, 3 + * } + * })(); + * ``` + */ +export async function* until(gen: AsyncGenerator, predicate: (value: T) => boolean): AsyncGenerator { + for await (const value of gen) { + yield value + if (predicate(value)) { + break + } + } +} + diff --git a/tests/async.test.ts b/tests/async.test.ts new file mode 100644 index 0000000..857da3b --- /dev/null +++ b/tests/async.test.ts @@ -0,0 +1,426 @@ +import { describe, test, it, expect, beforeEach, jest } from 'bun:test' +import { merge, merge2, map, filter, fromEvent, fromEventHandler, until } from '~/async'; + +async function* asyncGenerator(array: T[]): AsyncGenerator { + for (const item of array) { + yield await new Promise(resolve => setTimeout(() => resolve(item), 10)); + } +} + +describe('merge', () => { + it('correctly merges values from multiple async iterators', async () => { + const iter1 = asyncGenerator([1, 3, 5]); + const iter2 = asyncGenerator([2, 4, 6]); + const merged = merge(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + // Since execution order can vary, we check that all items are present and sorted + expect(result.sort()).toEqual([1, 2, 3, 4, 5, 6]); + }); + + it('handles empty iterators correctly', async () => { + const iter1 = asyncGenerator([]); + const iter2 = asyncGenerator([1, 2, 3]); + const merged = merge(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + expect(result).toEqual([1, 2, 3]); + }); + + it('completes when all iterators are completed', async () => { + const iter1 = asyncGenerator([1]); + const iter2 = asyncGenerator([2]); + const merged = merge(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + expect(result.length).toBe(2); + expect(result.includes(1)).toBeTruthy(); + expect(result.includes(2)).toBeTruthy(); + }); + + // + it('handles errors thrown by an iterator', async () => { + async function* errorGenerator() { + yield 1; + throw new Error('Test error'); + } + + const iter1 = asyncGenerator([1, 2]); + const iter2 = errorGenerator(); + const merged = merge(iter1, iter2); + + // Use an immediately invoked async function and pass the resulting promise to expect + await expect((async () => { + const result = []; + for await (const value of merged) { + result.push(value); + } + })()).rejects.toThrow('Test error'); + }); +}); + +// merge2 +describe('merge2', () => { + it('correctly merges values from two async iterators', async () => { + const iter1 = asyncGenerator([1, 3, 5]); + const iter2 = asyncGenerator([2, 4, 6]); + const merged = merge2(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + // Since execution order can vary, we check that all items are present and sorted + expect(result.sort()).toEqual([1, 2, 3, 4, 5, 6]); + }); + + it('handles empty iterators correctly', async () => { + const iter1 = asyncGenerator([]); + const iter2 = asyncGenerator([1, 2, 3]); + const merged = merge2(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + expect(result).toEqual([1, 2, 3]); + }); + + it('completes when both iterators are completed', async () => { + const iter1 = asyncGenerator([1]); + const iter2 = asyncGenerator([2]); + const merged = merge2(iter1, iter2); + + const result = []; + for await (const value of merged) { + result.push(value); + } + + expect(result.length).toBe(2); + expect(result.includes(1)).toBeTruthy(); + expect(result.includes(2)).toBeTruthy(); + }); + + it('handles errors thrown by an iterator', async () => { + async function* errorGenerator() { + yield 1; + throw new Error('Test error'); + } + + const iter1 = asyncGenerator([1, 2]); + const iter2 = errorGenerator(); + const merged = merge2(iter1, iter2); + + // Use an immediately invoked async function and pass the resulting promise to expect + await expect((async () => { + const result = []; + for await (const value of merged) { + result.push(value); + } + })()).rejects.toThrow('Test error'); + }); +}); + +// map +describe('map', () => { + it('correctly maps values from an async iterator', async () => { + const iter = asyncGenerator([1, 2, 3]); + const doubled = map(iter, x => x * 2); + + const result = []; + for await (const value of doubled) { + result.push(value); + } + + expect(result).toEqual([2, 4, 6]); + }); + + it('handles empty iterators correctly', async () => { + const iter = asyncGenerator([]); + const doubled = map(iter, x => x * 2); + + const result = []; + for await (const value of doubled) { + result.push(value); + } + + expect(result).toEqual([]); + }); + + it('handles errors thrown by an iterator', async () => { + async function* errorGenerator() { + yield 1; + throw new Error('Test error'); + } + + const iter = errorGenerator(); + const doubled = map(iter, x => x * 2); + + // Use an immediately invoked async function and pass the resulting promise to expect + await expect((async () => { + const result = []; + for await (const value of doubled) { + result.push(value); + } + })()).rejects.toThrow('Test error'); + }); +}); + +// filter +describe('filter', () => { + it('correctly filters values from an async iterator', async () => { + const iter = asyncGenerator([1, 2, 3, 4, 5]); + const evens = filter(iter, x => x % 2 === 0); + + const result = []; + for await (const value of evens) { + result.push(value); + } + + expect(result).toEqual([2, 4]); + }); + + it('handles empty iterators correctly', async () => { + const iter = asyncGenerator([]); + const evens = filter(iter, x => x % 2 === 0); + + const result = []; + for await (const value of evens) { + result.push(value); + } + + expect(result).toEqual([]); + }); + + it('handles errors thrown by an iterator', async () => { + async function* errorGenerator() { + yield 1; + throw new Error('Test error'); + } + + const iter = errorGenerator(); + const evens = filter(iter, x => x % 2 === 0); + + // Use an immediately invoked async function and pass the resulting promise to expect + await expect((async () => { + const result = []; + for await (const value of evens) { + result.push(value); + } + })()).rejects.toThrow('Test error'); + }); +}); + + +// fromEventHandler + +describe('fromEventHandler', () => { + // Mock event emitter to simulate on/off event handling + class MockEventEmitter { + private listeners = new Set<(data: any) => void>(); + + on(listener: (data: any) => void) { + this.listeners.add(listener); + } + + off(listener: (data: any) => void) { + this.listeners.delete(listener); + } + + emit(data: any) { + for (const listener of this.listeners) { + listener(data); + } + } + } + + let emitter: MockEventEmitter; + + beforeEach(() => { + emitter = new MockEventEmitter(); + }); + + it('captures and yields a single event', async () => { + const generator = fromEventHandler( + emitter.on.bind(emitter), + emitter.off.bind(emitter) + ); + + setTimeout(() => emitter.emit(1), 10); + + const result = await generator.next(); + expect(result.value).toBe(1); + expect(result.done).toBeFalsy(); + }); + + it('captures and yields multiple events in order', async () => { + const generator = fromEventHandler( + emitter.on.bind(emitter), + emitter.off.bind(emitter) + ); + + setTimeout(() => emitter.emit(1), 10); + setTimeout(() => emitter.emit(2), 20); + + const results = []; + results.push((await generator.next()).value); + results.push((await generator.next()).value); + + expect(results).toEqual([1, 2]); + }); + + it('removes event listeners when generator is closed', async () => { + const addSpy = jest.fn(); + const removeSpy = jest.fn(); + + let eventListener: (data: number) => void = () => { }; + + // Simulated event registration and removal, capturing the event listener + const on = (listener: (data: number) => void) => { + addSpy(); + eventListener = listener; + }; + const off = (listener: (data: number) => void) => { + removeSpy(); + }; + + const generator = fromEventHandler(on, off); + + // Simulate event emission to ensure 'on' function is triggered + setTimeout(() => eventListener(1), 10); + + // Start consuming the generator to activate the 'on' registration + await generator.next() + await generator.return(undefined) + + expect(addSpy).toHaveBeenCalledTimes(1); + expect(removeSpy).toHaveBeenCalledTimes(1); + }); +}); + +// fromEvent +describe('fromEvent', () => { + // Mock EventEmitter to simulate Node.js EventEmitter for testing purposes + class MockEventEmitter { + private listeners: Map void)[]> = new Map(); + + on(eventName: E, listener: (data: T) => void) { + const eventListeners = this.listeners.get(eventName) || []; + eventListeners.push(listener); + this.listeners.set(eventName, eventListeners); + } + + off(eventName: E, listener: (data: T) => void) { + const eventListeners = this.listeners.get(eventName) || []; + const index = eventListeners.indexOf(listener); + if (index !== -1) { + eventListeners.splice(index, 1); + } + this.listeners.set(eventName, eventListeners); + } + + emit(eventName: E, data: T) { + const eventListeners = this.listeners.get(eventName) || []; + eventListeners.forEach((listener) => listener(data)); + } + } + + it('captures and yields events from an EventEmitter', async () => { + const emitter = new MockEventEmitter(); + const generator = fromEvent(emitter, 'data'); + + // Emit some data asynchronously + setTimeout(() => emitter.emit('data', 1), 10); + setTimeout(() => emitter.emit('data', 2), 20); + + const results = []; + // Consume only the first two emitted events for this test + results.push((await generator.next()).value); + results.push((await generator.next()).value); + + expect(results).toEqual([1, 2]); + }); + + it('removes event listeners when the generator is closed', async () => { + const addSpy = jest.fn(); + const removeSpy = jest.fn(); + + const emitter = new MockEventEmitter(); + + // Wrap original `on` and `off` with spies + emitter.on = jest.fn(emitter.on); + emitter.off = jest.fn(emitter.off); + + const generator = fromEvent(emitter, 'data'); + + // Simulate event emission to ensure 'on' function is triggered + setTimeout(() => emitter.emit('data', 1), 10); + + await generator.next() + await generator.return(undefined) + + expect(emitter.on).toHaveBeenCalledTimes(1); + expect(emitter.off).toHaveBeenCalledTimes(1); + }); +}); + +// until +describe('until', () => { + it('yields all values if predicate never true', async () => { + const source = asyncGenerator([1, 2, 3]); + const results: number[] = []; + + for await (const value of until(source, (x) => x > 3)) { + results.push(value); + } + + expect(results).toEqual([1, 2, 3]); + }); + + it('stops yielding when predicate becomes true', async () => { + const source = asyncGenerator([1, 2, 3, 4, 5]); + const results: number[] = []; + + for await (const value of until(source, (x) => x >= 3)) { + results.push(value); + } + + expect(results).toEqual([1, 2, 3]); + }); + + it('yields nothing from an empty generator', async () => { + const source = asyncGenerator([]); + const results: number[] = []; + + for await (const value of until(source, (x) => x >= 3)) { + results.push(value); + } + + expect(results).toEqual([]); + }); + + it('stops immediately if predicate true for first value', async () => { + const source = asyncGenerator([3, 4, 5]); + const results: number[] = []; + + for await (const value of until(source, (x) => x === 3)) { + results.push(value); + } + + expect(results).toEqual([3]); + }); +}); \ No newline at end of file