Skip to content

Commit

Permalink
fix: internal listeners infinite retry loop (#284)
Browse files Browse the repository at this point in the history
* fix: internal listeners infinite retry loop

* edit `CHANGELOG`

* refactor: rename to `#internalEventListeners`

* refactor: simplify listener equality check

Co-authored-by: Mark Stacey <[email protected]>

* Revert "refactor: simplify listener equality check"

This reverts commit eb5514d.

* apply fix to `SubscribeBlockTracker`

* fix: reject `getLatestBlock` promise when block tracker stops

* fix: remove on error internal listener

* Suggestion to simplify prevention of dangling Promise on destroy (#286)

* Suggestion to simplify prevention of dangling Promise on destroy

The `fix/internal-listeners` branch has a number of changes intended to
ensure we don't have a dangling unresolved Promise when the block
tracker is destroyed. This solution involved adding an additional
listener to capture errors, and it involved not removing internal
listeners when `destroy` is called. This required changes to some
logging in `_updateAndQueue` as well.

This commit is an alternative solution that avoids the use of internal
listeners, thus avoiding much of the complexity in the previous
solution. Instead an internal deferred Promise is used. This also
might be slightly more efficient when `getLatestBlock` is called
repeatedly, as we can reuse the same listener rather than creating a
new one each time.

* Unset pending latest block after it has resolved

* fix: operand of a delete operator cannot be a private identifier

* test: change error message

* refactor: add helper methods

* refactor: simplify `SubscribeBlockTracker`

* Update CHANGELOG.md

Co-authored-by: Mark Stacey <[email protected]>

* fix: `SubscribeBlockTracker` throws when subscription fails

* test: remove broken case

* test: add case for returning the same promise

* test: add coverage for `PollingBlockTracker`

* test: add coverage for `SubscribeBlockTracker`

* test: remove redundant tests

* test: check promises return for `SubscribeBlockTracker`

* rename test case

Co-authored-by: Elliot Winkler <[email protected]>

* test: spy on `provider.request`

* edit changelog entry

* test: set automatic timer calls to 2

---------

Co-authored-by: Mark Stacey <[email protected]>
Co-authored-by: Elliot Winkler <[email protected]>
  • Loading branch information
3 people authored Dec 3, 2024
1 parent 985b19c commit b41e033
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 64 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- Avoid risk of infinite retry loops when fetching new blocks ([#284](https://github.com/MetaMask/eth-block-tracker/pull/284))
- When the provider returns an error and `PollingBlockTracker` or `SubscribeBlockTracker` is destroyed, the promise returned by the `getLatestBlock` method will be rejected.

## [11.0.2]
### Fixed
Expand Down
133 changes: 133 additions & 0 deletions src/PollingBlockTracker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,112 @@ describe('PollingBlockTracker', () => {
);
});

it('should return a promise that rejects if the request for the block number fails and the block tracker is then stopped', async () => {
recordCallsToSetTimeout({ numAutomaticCalls: 1 });

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
error: 'boom',
},
],
},
},
async ({ blockTracker }) => {
const latestBlockPromise = blockTracker.getLatestBlock();

expect(blockTracker.isRunning()).toBe(true);
await blockTracker.destroy();
await expect(latestBlockPromise).rejects.toThrow(
'Block tracker destroyed',
);
expect(blockTracker.isRunning()).toBe(false);
},
);
});

it('should not retry failed requests after the block tracker is stopped', async () => {
recordCallsToSetTimeout({ numAutomaticCalls: 2 });

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
error: 'boom',
},
],
},
},
async ({ blockTracker, provider }) => {
const requestSpy = jest.spyOn(provider, 'request');

const latestBlockPromise = blockTracker.getLatestBlock();
await blockTracker.destroy();

await expect(latestBlockPromise).rejects.toThrow(
'Block tracker destroyed',
);
expect(requestSpy).toHaveBeenCalledTimes(1);
expect(requestSpy).toHaveBeenCalledWith({
jsonrpc: '2.0',
id: expect.any(Number),
method: 'eth_blockNumber',
params: [],
});
},
);
});

it('should return a promise that resolves when a new block is available', async () => {
recordCallsToSetTimeout();

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
result: '0x1',
},
],
},
},
async ({ blockTracker }) => {
expect(await blockTracker.getLatestBlock()).toBe('0x1');
},
);
});

it('should resolve all returned promises when a new block is available', async () => {
recordCallsToSetTimeout();

await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
result: '0x1',
},
],
},
},
async ({ blockTracker }) => {
const promises = [
blockTracker.getLatestBlock(),
blockTracker.getLatestBlock(),
];

expect(await Promise.all(promises)).toStrictEqual(['0x1', '0x1']);
},
);
});

it('request the latest block number with `skipCache: true` if the block tracker was initialized with `setSkipCacheFlag: true`', async () => {
recordCallsToSetTimeout();

Expand Down Expand Up @@ -575,6 +681,33 @@ describe('PollingBlockTracker', () => {
});
});

it('should return the same promise if called multiple times', async () => {
await withPollingBlockTracker(
{
provider: {
stubs: [
{
methodName: 'eth_blockNumber',
result: '0x0',
},
{
methodName: 'eth_blockNumber',
result: '0x1',
},
],
},
},
async ({ blockTracker }) => {
const promiseToCheckLatestBlock1 = blockTracker.checkForLatestBlock();
const promiseToCheckLatestBlock2 = blockTracker.checkForLatestBlock();

expect(promiseToCheckLatestBlock1).toStrictEqual(
promiseToCheckLatestBlock2,
);
},
);
});

it('should fetch the latest block number', async () => {
recordCallsToSetTimeout();

Expand Down
67 changes: 56 additions & 11 deletions src/PollingBlockTracker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import type { SafeEventEmitterProvider } from '@metamask/eth-json-rpc-provider';
import SafeEventEmitter from '@metamask/safe-event-emitter';
import { getErrorMessage, type JsonRpcRequest } from '@metamask/utils';
import {
createDeferredPromise,
type DeferredPromise,
getErrorMessage,
type JsonRpcRequest,
} from '@metamask/utils';
import getCreateRandomId from 'json-rpc-random-id';

import type { BlockTracker } from './BlockTracker';
Expand All @@ -10,8 +15,6 @@ const log = createModuleLogger(projectLogger, 'polling-block-tracker');
const createRandomId = getCreateRandomId();
const sec = 1000;

const calculateSum = (accumulator: number, currentValue: number) =>
accumulator + currentValue;
const blockTrackerEvents: (string | symbol)[] = ['sync', 'latest'];

export interface PollingBlockTrackerOptions {
Expand All @@ -28,6 +31,8 @@ interface ExtendedJsonRpcRequest extends JsonRpcRequest<[]> {
skipCache?: boolean;
}

type InternalListener = (value: string) => void;

export class PollingBlockTracker
extends SafeEventEmitter
implements BlockTracker
Expand All @@ -54,6 +59,10 @@ export class PollingBlockTracker

private readonly _setSkipCacheFlag: boolean;

readonly #internalEventListeners: InternalListener[] = [];

#pendingLatestBlock?: Omit<DeferredPromise<string>, 'resolve'>;

constructor(opts: PollingBlockTrackerOptions = {}) {
// parse + validate args
if (!opts.provider) {
Expand Down Expand Up @@ -90,6 +99,7 @@ export class PollingBlockTracker
this._cancelBlockResetTimeout();
this._maybeEnd();
super.removeAllListeners();
this.#rejectPendingLatestBlock(new Error('Block tracker destroyed'));
}

isRunning(): boolean {
Expand All @@ -104,13 +114,24 @@ export class PollingBlockTracker
// return if available
if (this._currentBlock) {
return this._currentBlock;
} else if (this.#pendingLatestBlock) {
return await this.#pendingLatestBlock.promise;
}

const { promise, resolve, reject } = createDeferredPromise<string>({
suppressUnhandledRejection: true,
});
this.#pendingLatestBlock = { reject, promise };

// wait for a new latest block
const latestBlock: string = await new Promise((resolve) =>
this.once('latest', resolve),
);
// return newly set current block
return latestBlock;
const onLatestBlock = (value: string) => {
this.#removeInternalListener(onLatestBlock);
resolve(value);
this.#pendingLatestBlock = undefined;
};
this.#addInternalListener(onLatestBlock);
this.once('latest', onLatestBlock);
return await promise;
}

// dont allow module consumer to remove our internal event listeners
Expand Down Expand Up @@ -179,9 +200,17 @@ export class PollingBlockTracker
}

private _getBlockTrackerEventCount(): number {
return blockTrackerEvents
.map((eventName) => this.listenerCount(eventName))
.reduce(calculateSum);
return (
blockTrackerEvents
.map((eventName) => this.listeners(eventName))
.flat()
// internal listeners are not included in the count
.filter((listener) =>
this.#internalEventListeners.every(
(internalListener) => !Object.is(internalListener, listener),
),
).length
);
}

private _shouldUseNewBlock(newBlock: string) {
Expand Down Expand Up @@ -333,6 +362,22 @@ export class PollingBlockTracker
this._pollingTimeout = undefined;
}
}

#addInternalListener(listener: InternalListener) {
this.#internalEventListeners.push(listener);
}

#removeInternalListener(listener: InternalListener) {
this.#internalEventListeners.splice(
this.#internalEventListeners.indexOf(listener),
1,
);
}

#rejectPendingLatestBlock(error: unknown) {
this.#pendingLatestBlock?.reject(error);
this.#pendingLatestBlock = undefined;
}
}

/**
Expand Down
Loading

0 comments on commit b41e033

Please sign in to comment.