diff --git a/src/index.js b/src/index.js index b44f75b..7e79ddf 100644 --- a/src/index.js +++ b/src/index.js @@ -11,7 +11,7 @@ function callbackToAsyncIterator( listener: ((arg: CallbackInput) => any) => Promise, options?: { onError?: (err: Error) => void, - onClose?: (arg?: ?ReturnVal) => void, + onClose?: (arg?: ?ReturnVal) => Promise | void, buffering?: boolean, } = {} ) { @@ -21,10 +21,14 @@ function callbackToAsyncIterator( let pushQueue = []; let listening = true; let listenerReturnValue; + let listenerReturnedValue = false; + let closingWaitingOnListenerReturnValue = false; // Start listener listener(value => pushValue(value)) .then(a => { listenerReturnValue = a; + listenerReturnedValue = true; + if (closingWaitingOnListenerReturnValue) emptyQueue(); }) .catch(err => { onError(err); @@ -49,12 +53,23 @@ function callbackToAsyncIterator( } function emptyQueue() { + if (onClose && !listenerReturnedValue) { + closingWaitingOnListenerReturnValue = true; + return; + } if (listening) { listening = false; pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); pullQueue = []; pushQueue = []; - onClose && onClose(listenerReturnValue); + if (onClose) { + try { + const closeRet = onClose(listenerReturnValue); + if (closeRet) closeRet.catch(e => onError(e)); + } catch (e) { + onError(e); + } + } } } @@ -66,7 +81,7 @@ function callbackToAsyncIterator( emptyQueue(); return Promise.resolve({ value: undefined, done: true }); }, - throw(error) { + throw(error: Error) { emptyQueue(); onError(error); return Promise.reject(error); @@ -84,7 +99,7 @@ function callbackToAsyncIterator( return() { return Promise.reject(err); }, - throw(error) { + throw(error: Error) { return Promise.reject(error); }, [$$asyncIterator]() { diff --git a/src/test/index.test.js b/src/test/index.test.js index dc07222..fd19f6e 100644 --- a/src/test/index.test.js +++ b/src/test/index.test.js @@ -95,6 +95,38 @@ describe('options', () => { }); }); + it('should call onError with an error thrown by a non async onClose', async () => { + const error = new Error('Bla bla'); + const listener = (cb: () => void) => Promise.resolve(); + + expect.assertions(1); + const iter = asyncify(listener, { + onClose: () => { + throw error; + }, + onError: err => { + expect(err).toEqual(error); + }, + }); + await iter.return(); + }); + + it('should call onError with an error thrown by an async onClose', async () => { + const error = new Error('Bla bla'); + const listener = (cb: () => void) => Promise.resolve(); + + expect.assertions(1); + const iter = asyncify(listener, { + onClose: async () => { + throw error; + }, + onError: err => { + expect(err).toEqual(error); + }, + }); + await iter.return(); + }); + it('should call onClose with the return value from the listener', async () => { const returnValue = 'asdf'; const listener = (cb: () => void) => @@ -113,6 +145,24 @@ describe('options', () => { await iter.return(); }); + it('should call onClose with the return value from an listener only after the promise resolves', async () => { + const returnValue = 'asdf'; + const listener = (cb: () => void) => + new Promise(res => { + res(returnValue); + }); + + expect.hasAssertions(); + const iter = asyncify(listener, { + onClose: val => { + expect(val).toEqual(returnValue); + }, + }); + // Wait a tick so that the promise resolves with the return value + iter.return(); + await new Promise(res => setTimeout(res, 10)); + }); + describe('buffering', () => { it('should not buffer incoming values if disabled', async () => { const listener = (cb: (arg: number) => void) =>