diff --git a/README.md b/README.md index 18b4c3c..80011e6 100644 --- a/README.md +++ b/README.md @@ -372,6 +372,28 @@ mappedSignal.add(payload => { assert.deepEqual(received, ['cat!', 'dog!', 'frog!', 'sloth!']); ``` +#### Signal.reduce + +Signal.reduce provides the ability to aggregate payloads coming through a Signal, similar to reducing an array in JavaScript. + +```ts +import {Signal} from 'micro-signals'; +import * as assert from 'assert'; + +const signal = new Signal(); +const sumSignal = signal.reduce((total, current) => total + current, 0); + +const received: number[] = []; + +sumSignal.add(payload => { + received.push(payload); +}); + +[5, 10, 20, 100].forEach(x => signal.dispatch(x)); + +assert.deepEqual(received, [5, 15, 35, 135]); +``` + #### Signal.merge Signal.merge takes an arbitrary number of signals as constructor arguments and forward payloads from diff --git a/src/extended-signal.ts b/src/extended-signal.ts index 7dabe3f..25db43b 100644 --- a/src/extended-signal.ts +++ b/src/extended-signal.ts @@ -1,4 +1,4 @@ -import {BaseSignal, Cache, Listener, ReadableSignal} from './interfaces'; +import {Accumulator, BaseSignal, Cache, Listener, ReadableSignal} from './interfaces'; import { TagMap } from './tag-map'; export class ExtendedSignal implements ReadableSignal { @@ -103,6 +103,18 @@ export class ExtendedSignal implements ReadableSignal { listener => payload => listener(payload), ); } + public reduce(accumulator: Accumulator, initialValue: U): ReadableSignal { + return convertedListenerSignal( + this._baseSignal, + listener => (() => { + let accum = initialValue; + return (payload: T) => { + accum = accumulator(accum, payload); + listener(accum); + }; + })(), + ); + } public cache(cache: Cache): ReadableSignal { this._baseSignal.add(payload => cache.add(payload)); diff --git a/src/interfaces.ts b/src/interfaces.ts index ebfe572..00853bf 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -14,6 +14,7 @@ export interface ReadableSignal extends BaseSignal { promisify(rejectSignal?: ReadableSignal): Promise; readOnly(): ReadableSignal; cache(cache: Cache): ReadableSignal; + reduce(accumulator: Accumulator, initialValue: U): ReadableSignal; } export interface WritableSignal { @@ -28,3 +29,5 @@ export interface Cache { add(payload: T): void; forEach(callback: (payload: T) => void): void; } + +export type Accumulator = (accum: U, current: T) => any; diff --git a/test/signal.spec.ts b/test/signal.spec.ts index 2327622..0451055 100644 --- a/test/signal.spec.ts +++ b/test/signal.spec.ts @@ -10,6 +10,7 @@ import {mappedSuite} from './suites/mapped-suite'; import {mergedSuite} from './suites/merged-suite'; import {promisifySuite} from './suites/promisify-suite'; import {readOnlySuite} from './suites/read-only-suite'; +import {reducedSuite} from './suites/reduced-suite'; // TODO run the signal suite on the converted signals as well? @@ -48,6 +49,11 @@ readOnlySuite( signal => signal.readOnly(), ); +reducedSuite( + 'Signal#reduce', + (baseSignal, accumulator, initialValue) => baseSignal.reduce(accumulator, initialValue), +); + test('Signal listeners should received dispatched payloads', t => { const signal = new Signal(); diff --git a/test/suites/reduced-suite.ts b/test/suites/reduced-suite.ts new file mode 100644 index 0000000..a38ef05 --- /dev/null +++ b/test/suites/reduced-suite.ts @@ -0,0 +1,52 @@ +import test = require('tape'); +import {Accumulator, ReadableSignal, Signal} from '../../src'; +import {LeakDetectionSignal} from '../lib/leak-detection-signal'; +import {parentChildSuite} from './parent-child-suite'; + +export type ReducedSignalCreationFunction = ( + baseSignal: ReadableSignal, + accumulator: Accumulator, + initialValue: U, +) => ReadableSignal; + +export function reducedSuite(prefix: string, createReducedSignal: ReducedSignalCreationFunction) { + parentChildSuite(prefix, () => { + const parentSignal = new Signal(); + const childSignal = createReducedSignal(parentSignal, (_, payload) => payload, undefined); + return { parentSignal, childSignal }; + }); + + test(`${prefix} should dispatch with the accumulated payload`, t => { + const baseSignal = new Signal(); + + const reducedSignal = baseSignal.reduce((accum, curr) => accum + curr, 5); + + const addResults: number[] = []; + const addOnceResults: number[] = []; + + reducedSignal.add(x => addResults.push(x)); + reducedSignal.addOnce(x => addOnceResults.push(x)); + + baseSignal.dispatch(50); + baseSignal.dispatch(0); + baseSignal.dispatch(100); + + t.deepEqual(addResults, [55, 55, 155]); + t.deepEqual(addOnceResults, [55]); + + t.end(); + }); + + test('ReducedSignal should not leak', t => { + const signal = new LeakDetectionSignal(); + const mappedSignal = createReducedSignal(signal, () => true, false); + + const listener = () => { /* empty listener */ }; + mappedSignal.add(listener); + signal.dispatch(undefined); + mappedSignal.remove(listener); + + t.equal(signal.listenerCount, 0); + t.end(); + }); +}