From b41e033f6ba4ba745bf274de34ae4ee7d92cfa33 Mon Sep 17 00:00:00 2001 From: Michele Esposito <34438276+mikesposito@users.noreply.github.com> Date: Tue, 3 Dec 2024 19:26:53 +0100 Subject: [PATCH] fix: internal listeners infinite retry loop (#284) * fix: internal listeners infinite retry loop * edit `CHANGELOG` * refactor: rename to `#internalEventListeners` * refactor: simplify listener equality check Co-authored-by: Mark Stacey * Revert "refactor: simplify listener equality check" This reverts commit eb5514d510af7086873adf951962f4882d763747. * apply fix to `SubscribeBlockTracker` * fix: reject `getLatestBlock` promise when block tracker stops * fix: remove on error internal listener * Suggestion to simplify prevention of dangling Promise on destroy (#286) * Suggestion to simplify prevention of dangling Promise on destroy The `fix/internal-listeners` branch has a number of changes intended to ensure we don't have a dangling unresolved Promise when the block tracker is destroyed. This solution involved adding an additional listener to capture errors, and it involved not removing internal listeners when `destroy` is called. This required changes to some logging in `_updateAndQueue` as well. This commit is an alternative solution that avoids the use of internal listeners, thus avoiding much of the complexity in the previous solution. Instead an internal deferred Promise is used. This also might be slightly more efficient when `getLatestBlock` is called repeatedly, as we can reuse the same listener rather than creating a new one each time. * Unset pending latest block after it has resolved * fix: operand of a delete operator cannot be a private identifier * test: change error message * refactor: add helper methods * refactor: simplify `SubscribeBlockTracker` * Update CHANGELOG.md Co-authored-by: Mark Stacey * fix: `SubscribeBlockTracker` throws when subscription fails * test: remove broken case * test: add case for returning the same promise * test: add coverage for `PollingBlockTracker` * test: add coverage for `SubscribeBlockTracker` * test: remove redundant tests * test: check promises return for `SubscribeBlockTracker` * rename test case Co-authored-by: Elliot Winkler * test: spy on `provider.request` * edit changelog entry * test: set automatic timer calls to 2 --------- Co-authored-by: Mark Stacey Co-authored-by: Elliot Winkler --- CHANGELOG.md | 3 + src/PollingBlockTracker.test.ts | 133 ++++++++++++++++++++++++++++++ src/PollingBlockTracker.ts | 67 ++++++++++++--- src/SubscribeBlockTracker.test.ts | 110 ++++++++++++++---------- src/SubscribeBlockTracker.ts | 69 +++++++++++++--- 5 files changed, 318 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 366dbe4c..7b899c8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- Avoid risk of infinite retry loops when fetching new blocks ([#284](https://github.com/MetaMask/eth-block-tracker/pull/284)) + - When the provider returns an error and `PollingBlockTracker` or `SubscribeBlockTracker` is destroyed, the promise returned by the `getLatestBlock` method will be rejected. ## [11.0.2] ### Fixed diff --git a/src/PollingBlockTracker.test.ts b/src/PollingBlockTracker.test.ts index 7a598db2..d826bc60 100644 --- a/src/PollingBlockTracker.test.ts +++ b/src/PollingBlockTracker.test.ts @@ -185,6 +185,112 @@ describe('PollingBlockTracker', () => { ); }); + it('should return a promise that rejects if the request for the block number fails and the block tracker is then stopped', async () => { + recordCallsToSetTimeout({ numAutomaticCalls: 1 }); + + await withPollingBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + error: 'boom', + }, + ], + }, + }, + async ({ blockTracker }) => { + const latestBlockPromise = blockTracker.getLatestBlock(); + + expect(blockTracker.isRunning()).toBe(true); + await blockTracker.destroy(); + await expect(latestBlockPromise).rejects.toThrow( + 'Block tracker destroyed', + ); + expect(blockTracker.isRunning()).toBe(false); + }, + ); + }); + + it('should not retry failed requests after the block tracker is stopped', async () => { + recordCallsToSetTimeout({ numAutomaticCalls: 2 }); + + await withPollingBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + error: 'boom', + }, + ], + }, + }, + async ({ blockTracker, provider }) => { + const requestSpy = jest.spyOn(provider, 'request'); + + const latestBlockPromise = blockTracker.getLatestBlock(); + await blockTracker.destroy(); + + await expect(latestBlockPromise).rejects.toThrow( + 'Block tracker destroyed', + ); + expect(requestSpy).toHaveBeenCalledTimes(1); + expect(requestSpy).toHaveBeenCalledWith({ + jsonrpc: '2.0', + id: expect.any(Number), + method: 'eth_blockNumber', + params: [], + }); + }, + ); + }); + + it('should return a promise that resolves when a new block is available', async () => { + recordCallsToSetTimeout(); + + await withPollingBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + result: '0x1', + }, + ], + }, + }, + async ({ blockTracker }) => { + expect(await blockTracker.getLatestBlock()).toBe('0x1'); + }, + ); + }); + + it('should resolve all returned promises when a new block is available', async () => { + recordCallsToSetTimeout(); + + await withPollingBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + result: '0x1', + }, + ], + }, + }, + async ({ blockTracker }) => { + const promises = [ + blockTracker.getLatestBlock(), + blockTracker.getLatestBlock(), + ]; + + expect(await Promise.all(promises)).toStrictEqual(['0x1', '0x1']); + }, + ); + }); + it('request the latest block number with `skipCache: true` if the block tracker was initialized with `setSkipCacheFlag: true`', async () => { recordCallsToSetTimeout(); @@ -575,6 +681,33 @@ describe('PollingBlockTracker', () => { }); }); + it('should return the same promise if called multiple times', async () => { + await withPollingBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + result: '0x0', + }, + { + methodName: 'eth_blockNumber', + result: '0x1', + }, + ], + }, + }, + async ({ blockTracker }) => { + const promiseToCheckLatestBlock1 = blockTracker.checkForLatestBlock(); + const promiseToCheckLatestBlock2 = blockTracker.checkForLatestBlock(); + + expect(promiseToCheckLatestBlock1).toStrictEqual( + promiseToCheckLatestBlock2, + ); + }, + ); + }); + it('should fetch the latest block number', async () => { recordCallsToSetTimeout(); diff --git a/src/PollingBlockTracker.ts b/src/PollingBlockTracker.ts index 8ab89b99..b0ad70d2 100644 --- a/src/PollingBlockTracker.ts +++ b/src/PollingBlockTracker.ts @@ -1,6 +1,11 @@ import type { SafeEventEmitterProvider } from '@metamask/eth-json-rpc-provider'; import SafeEventEmitter from '@metamask/safe-event-emitter'; -import { getErrorMessage, type JsonRpcRequest } from '@metamask/utils'; +import { + createDeferredPromise, + type DeferredPromise, + getErrorMessage, + type JsonRpcRequest, +} from '@metamask/utils'; import getCreateRandomId from 'json-rpc-random-id'; import type { BlockTracker } from './BlockTracker'; @@ -10,8 +15,6 @@ const log = createModuleLogger(projectLogger, 'polling-block-tracker'); const createRandomId = getCreateRandomId(); const sec = 1000; -const calculateSum = (accumulator: number, currentValue: number) => - accumulator + currentValue; const blockTrackerEvents: (string | symbol)[] = ['sync', 'latest']; export interface PollingBlockTrackerOptions { @@ -28,6 +31,8 @@ interface ExtendedJsonRpcRequest extends JsonRpcRequest<[]> { skipCache?: boolean; } +type InternalListener = (value: string) => void; + export class PollingBlockTracker extends SafeEventEmitter implements BlockTracker @@ -54,6 +59,10 @@ export class PollingBlockTracker private readonly _setSkipCacheFlag: boolean; + readonly #internalEventListeners: InternalListener[] = []; + + #pendingLatestBlock?: Omit, 'resolve'>; + constructor(opts: PollingBlockTrackerOptions = {}) { // parse + validate args if (!opts.provider) { @@ -90,6 +99,7 @@ export class PollingBlockTracker this._cancelBlockResetTimeout(); this._maybeEnd(); super.removeAllListeners(); + this.#rejectPendingLatestBlock(new Error('Block tracker destroyed')); } isRunning(): boolean { @@ -104,13 +114,24 @@ export class PollingBlockTracker // return if available if (this._currentBlock) { return this._currentBlock; + } else if (this.#pendingLatestBlock) { + return await this.#pendingLatestBlock.promise; } + + const { promise, resolve, reject } = createDeferredPromise({ + suppressUnhandledRejection: true, + }); + this.#pendingLatestBlock = { reject, promise }; + // wait for a new latest block - const latestBlock: string = await new Promise((resolve) => - this.once('latest', resolve), - ); - // return newly set current block - return latestBlock; + const onLatestBlock = (value: string) => { + this.#removeInternalListener(onLatestBlock); + resolve(value); + this.#pendingLatestBlock = undefined; + }; + this.#addInternalListener(onLatestBlock); + this.once('latest', onLatestBlock); + return await promise; } // dont allow module consumer to remove our internal event listeners @@ -179,9 +200,17 @@ export class PollingBlockTracker } private _getBlockTrackerEventCount(): number { - return blockTrackerEvents - .map((eventName) => this.listenerCount(eventName)) - .reduce(calculateSum); + return ( + blockTrackerEvents + .map((eventName) => this.listeners(eventName)) + .flat() + // internal listeners are not included in the count + .filter((listener) => + this.#internalEventListeners.every( + (internalListener) => !Object.is(internalListener, listener), + ), + ).length + ); } private _shouldUseNewBlock(newBlock: string) { @@ -333,6 +362,22 @@ export class PollingBlockTracker this._pollingTimeout = undefined; } } + + #addInternalListener(listener: InternalListener) { + this.#internalEventListeners.push(listener); + } + + #removeInternalListener(listener: InternalListener) { + this.#internalEventListeners.splice( + this.#internalEventListeners.indexOf(listener), + 1, + ); + } + + #rejectPendingLatestBlock(error: unknown) { + this.#pendingLatestBlock?.reject(error); + this.#pendingLatestBlock = undefined; + } } /** diff --git a/src/SubscribeBlockTracker.test.ts b/src/SubscribeBlockTracker.test.ts index 6d78f230..0fa51e40 100644 --- a/src/SubscribeBlockTracker.test.ts +++ b/src/SubscribeBlockTracker.test.ts @@ -151,6 +151,44 @@ describe('SubscribeBlockTracker', () => { }); }); + it('should resolve all returned promises when a new block is available', async () => { + recordCallsToSetTimeout(); + + await withSubscribeBlockTracker( + { + provider: { + stubs: [ + { + methodName: 'eth_blockNumber', + result: '0x1', + }, + ], + }, + }, + async ({ blockTracker }) => { + const promises = [ + blockTracker.getLatestBlock(), + blockTracker.getLatestBlock(), + ]; + + expect(await Promise.all(promises)).toStrictEqual(['0x1', '0x1']); + }, + ); + }); + + it('should reject the returned promise if the block tracker is destroyed in the meantime', async () => { + await withSubscribeBlockTracker(async ({ blockTracker }) => { + const promiseToGetLatestBlock = + blockTracker[methodToGetLatestBlock](); + await blockTracker.destroy(); + + await expect(promiseToGetLatestBlock).rejects.toThrow( + 'Block tracker destroyed', + ); + expect(blockTracker.isRunning()).toBe(false); + }); + }); + it('should fetch the latest block number', async () => { recordCallsToSetTimeout(); @@ -251,7 +289,7 @@ describe('SubscribeBlockTracker', () => { }); METHODS_TO_ADD_LISTENER.forEach((methodToAddListener) => { - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request for the latest block number, the provider throws an Error`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request for the latest block number, the provider throws an Error`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); const thrownError = new Error('boom'); @@ -269,21 +307,19 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError).toBe(thrownError); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toThrow(thrownError); + expect(listener).toHaveBeenCalledWith(thrownError); }, ); }); - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request for the latest block number, the provider throws a string`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request for the latest block number, the provider throws a string`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); const thrownString = 'boom'; @@ -301,21 +337,19 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError).toBe(thrownString); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toBe(thrownString); + expect(listener).toHaveBeenCalledWith(thrownString); }, ); }); - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request for the latest block number, the provider rejects with an error`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request for the latest block number, the provider rejects with an error`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); await withSubscribeBlockTracker( @@ -330,21 +364,19 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError.message).toBe('boom'); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toThrow('boom'); + expect(listener).toHaveBeenCalledWith(new Error('boom')); }, ); }); - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request to subscribe, the provider throws an Error`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request to subscribe, the provider throws an Error`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); const thrownError = new Error('boom'); @@ -362,21 +394,19 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError).toBe(thrownError); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toThrow(thrownError); + expect(listener).toHaveBeenCalledWith(thrownError); }, ); }); - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request to subscribe, the provider throws a string`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request to subscribe, the provider throws a string`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); const thrownString = 'boom'; @@ -394,21 +424,19 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError).toBe(thrownString); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toBe(thrownString); + expect(listener).toHaveBeenCalledWith(thrownString); }, ); }); - it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should never resolve if, while making the request to subscribe, the provider rejects with an error`, async () => { + it(`should emit the "error" event (added via \`${methodToAddListener}\`) and should reject if, while making the request to subscribe, the provider rejects with an error`, async () => { recordCallsToSetTimeout({ numAutomaticCalls: 1 }); await withSubscribeBlockTracker( @@ -423,16 +451,14 @@ describe('SubscribeBlockTracker', () => { }, }, async ({ blockTracker }) => { - const promiseForCaughtError = new Promise((resolve) => { - blockTracker[methodToAddListener]('error', resolve); - }); + const listener = jest.fn(); + blockTracker[methodToAddListener]('error', listener); const promiseForLatestBlock = blockTracker[methodToGetLatestBlock](); - const caughtError = await promiseForCaughtError; - expect(caughtError.message).toBe('boom'); - await expect(promiseForLatestBlock).toNeverResolve(); + await expect(promiseForLatestBlock).rejects.toThrow('boom'); + expect(listener).toHaveBeenCalledWith(new Error('boom')); }, ); }); diff --git a/src/SubscribeBlockTracker.ts b/src/SubscribeBlockTracker.ts index d81a8ea9..f3cc2241 100644 --- a/src/SubscribeBlockTracker.ts +++ b/src/SubscribeBlockTracker.ts @@ -1,6 +1,11 @@ import type { SafeEventEmitterProvider } from '@metamask/eth-json-rpc-provider'; import SafeEventEmitter from '@metamask/safe-event-emitter'; -import type { Json, JsonRpcNotification } from '@metamask/utils'; +import { + createDeferredPromise, + type DeferredPromise, + type Json, + type JsonRpcNotification, +} from '@metamask/utils'; import getCreateRandomId from 'json-rpc-random-id'; import type { BlockTracker } from './BlockTracker'; @@ -9,8 +14,6 @@ const createRandomId = getCreateRandomId(); const sec = 1000; -const calculateSum = (accumulator: number, currentValue: number) => - accumulator + currentValue; const blockTrackerEvents: (string | symbol)[] = ['sync', 'latest']; export interface SubscribeBlockTrackerOptions { @@ -25,6 +28,8 @@ interface SubscriptionNotificationParams { result: { number: string }; } +type InternalListener = (value: string) => void; + export class SubscribeBlockTracker extends SafeEventEmitter implements BlockTracker @@ -43,6 +48,10 @@ export class SubscribeBlockTracker private _subscriptionId: string | null; + readonly #internalEventListeners: InternalListener[] = []; + + #pendingLatestBlock?: Omit, 'resolve'>; + constructor(opts: SubscribeBlockTrackerOptions = {}) { // parse + validate args if (!opts.provider) { @@ -75,6 +84,7 @@ export class SubscribeBlockTracker this._cancelBlockResetTimeout(); await this._maybeEnd(); super.removeAllListeners(); + this.#rejectPendingLatestBlock(new Error('Block tracker destroyed')); } isRunning(): boolean { @@ -89,13 +99,24 @@ export class SubscribeBlockTracker // return if available if (this._currentBlock) { return this._currentBlock; + } else if (this.#pendingLatestBlock) { + return await this.#pendingLatestBlock.promise; } + + const { resolve, reject, promise } = createDeferredPromise({ + suppressUnhandledRejection: true, + }); + this.#pendingLatestBlock = { reject, promise }; + // wait for a new latest block - const latestBlock: string = await new Promise((resolve) => - this.once('latest', resolve), - ); - // return newly set current block - return latestBlock; + const onLatestBlock = (value: string) => { + this.#removeInternalListener(onLatestBlock); + resolve(value); + this.#pendingLatestBlock = undefined; + }; + this.#addInternalListener(onLatestBlock); + this.once('latest', onLatestBlock); + return await promise; } // dont allow module consumer to remove our internal event listeners @@ -162,9 +183,17 @@ export class SubscribeBlockTracker } private _getBlockTrackerEventCount(): number { - return blockTrackerEvents - .map((eventName) => this.listenerCount(eventName)) - .reduce(calculateSum); + return ( + blockTrackerEvents + .map((eventName) => this.listeners(eventName)) + .flat() + // internal listeners are not included in the count + .filter((listener) => + this.#internalEventListeners.every( + (internalListener) => !Object.is(internalListener, listener), + ), + ).length + ); } private _shouldUseNewBlock(newBlock: string) { @@ -236,6 +265,7 @@ export class SubscribeBlockTracker this._newPotentialLatest(blockNumber); } catch (e) { this.emit('error', e); + this.#rejectPendingLatestBlock(e); } } } @@ -247,6 +277,7 @@ export class SubscribeBlockTracker this._subscriptionId = null; } catch (e) { this.emit('error', e); + this.#rejectPendingLatestBlock(e); } } } @@ -271,6 +302,22 @@ export class SubscribeBlockTracker this._newPotentialLatest(response.params.result.number); } } + + #addInternalListener(listener: InternalListener) { + this.#internalEventListeners.push(listener); + } + + #removeInternalListener(listener: InternalListener) { + this.#internalEventListeners.splice( + this.#internalEventListeners.indexOf(listener), + 1, + ); + } + + #rejectPendingLatestBlock(error: unknown) { + this.#pendingLatestBlock?.reject(error); + this.#pendingLatestBlock = undefined; + } } /**