Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle failed api inits #1970

Merged
merged 14 commits into from
Sep 6, 2023
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Update apollo-links to 1.0.2, fix dictionary resolver failed to get token issue
- Use test runs as unit for tests instead of entity checks (#1957)
- handle APIs in connection pool whose initialization failed (#1970)
- Fix generated operation hash with single entity, buffer did not get hashed issue.
- Infinite recursion in setValueModel with arrays (#1993)

Expand Down
123 changes: 121 additions & 2 deletions packages/node-core/src/api.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {EventEmitter2} from '@nestjs/event-emitter';
import {ProjectNetworkConfig} from '@subql/common';
import {ApiConnectionError, ApiErrorType} from './api.connection.error';
import {NetworkMetadataPayload} from './events';
import {IndexerEvent, NetworkMetadataPayload} from './events';
import {ConnectionPoolService} from './indexer';
import {getLogger} from './logger';
import {raceFulfilled, retryWithBackoff} from './utils';

const logger = getLogger('api');

Expand All @@ -24,10 +27,15 @@ export interface IApiConnectionSpecific<A = any, SA = any, B extends Array<any>
}

export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]> implements IApi<A, SA, B> {
constructor(protected connectionPoolService: ConnectionPoolService<IApiConnectionSpecific<A, SA, B>>) {}
constructor(
protected connectionPoolService: ConnectionPoolService<IApiConnectionSpecific<A, SA, B>>,
protected eventEmitter: EventEmitter2
) {}

abstract networkMeta: NetworkMetadataPayload;

private timeouts: Record<string, NodeJS.Timeout | undefined> = {};

async fetchBlocks(heights: number[], numAttempts = MAX_RECONNECT_ATTEMPTS): Promise<B> {
let reconnectAttempts = 0;
while (reconnectAttempts < numAttempts) {
Expand Down Expand Up @@ -58,6 +66,117 @@ export abstract class ApiService<A = any, SA = any, B extends Array<any> = any[]
return apiInstance.unsafeApi;
}

async createConnections(
network: ProjectNetworkConfig & {chainId: string},
createConnection: (endpoint: string) => Promise<IApiConnectionSpecific<A, SA, B>>,
getChainId: (connection: IApiConnectionSpecific) => Promise<string>,
postConnectedHook?: (connection: IApiConnectionSpecific, endpoint: string, index: number) => void
): Promise<void> {
const endpointToApiIndex: Record<string, IApiConnectionSpecific<A, SA, B>> = {};

const failedConnections: Map<number, string> = new Map();

const connectionPromises = (network.endpoint as string[]).map(async (endpoint, i) => {
try {
const connection = await createConnection(endpoint);
this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});

if (postConnectedHook) {
postConnectedHook(connection, endpoint, i);
}

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;
}

const chainId = await getChainId(connection);

if (network.chainId !== chainId) {
throw this.metadataMismatchError('ChainId', network.chainId, chainId);
}

this.connectionPoolService.addToConnections(connection, endpoint);
} catch (e) {
logger.error(`Failed to init ${endpoint}: ${e}`);
endpointToApiIndex[endpoint] = null as unknown as IApiConnectionSpecific<A, SA, B>;
failedConnections.set(i, endpoint);
throw e;
}
});

try {
const {fulfilledIndex, result} = (await raceFulfilled(connectionPromises)) as {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there a cast needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to rule out undefined

result: void;
fulfilledIndex: number;
};
connectionPromises.splice(fulfilledIndex, 1);
} catch (e) {
throw new Error('All endpoints failed to initialize. Please add healthier endpoints');
}

Promise.allSettled(connectionPromises).then((res) => {
// Retry failed connections in the background
for (const [index, endpoint] of failedConnections) {
this.retryConnection(createConnection, getChainId, network, index, endpoint, postConnectedHook);
}
});
}

async performConnection(
createConnection: (endpoint: string) => Promise<IApiConnectionSpecific<A, SA, B>>,
getChainId: (connection: IApiConnectionSpecific) => Promise<string>,
network: ProjectNetworkConfig & {chainId: string},
index: number,
endpoint: string,
postConnectedHook?: (connection: IApiConnectionSpecific, endpoint: string, index: number) => void
) {
const connection = await createConnection(endpoint);
const chainId = await getChainId(connection);

if (postConnectedHook) {
postConnectedHook(connection, endpoint, index);
}

if (network.chainId === chainId) {
// Replace null connection with the new connection
await this.connectionPoolService.updateConnection(connection, endpoint);
logger.info(`Updated connection for ${endpoint}`);
} else {
throw this.metadataMismatchError('ChainId', network.chainId, chainId);
}
}

retryConnection(
createConnection: (endpoint: string) => Promise<IApiConnectionSpecific<A, SA, B>>,
getChainId: (connection: IApiConnectionSpecific) => Promise<string>,
network: ProjectNetworkConfig & {chainId: string},
index: number,
endpoint: string,
postConnectedHook?: (connection: IApiConnectionSpecific, endpoint: string, index: number) => void
): void {
this.timeouts[endpoint] = retryWithBackoff(
() => this.performConnection(createConnection, getChainId, network, index, endpoint, postConnectedHook),
(error) => {
logger.error(`Initialization retry failed for ${endpoint}: ${error}`);
},
() => {
logger.error(`Initialization retry attempts exhausted for ${endpoint}`);
}
);
}

protected metadataMismatchError(metadata: string, expected: string, actual: string): Error {
return Error(
`Value of ${metadata} does not match across all endpoints\n
Expected: ${expected}
Actual: ${actual}`
);
}

handleError(error: Error): ApiConnectionError {
return new ApiConnectionError(error.name, error.message, ApiErrorType.Default);
}
Expand Down
14 changes: 7 additions & 7 deletions packages/node-core/src/indexer/connectionPool.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {delay} from '@subql/common';
import {ApiErrorType, ConnectionPoolStateManager, IApiConnectionSpecific, NodeConfig} from '..';
import {ConnectionPoolService} from './connectionPool.service';

async function waitFor(conditionFn: () => boolean, timeout = 30000, interval = 100): Promise<void> {
async function waitFor(conditionFn: () => boolean, timeout = 50000, interval = 100): Promise<void> {
return new Promise<void>((resolve, reject) => {
const startTime = Date.now();
const checkCondition = () => {
Expand Down Expand Up @@ -58,7 +58,7 @@ describe('ConnectionPoolService', () => {
it('should handle successful reconnection on the first attempt', async () => {
(mockApiConnection.apiConnect as any).mockImplementation(() => Promise.resolve());

(connectionPoolService as any).handleApiDisconnects(0);
(connectionPoolService as any).handleApiDisconnects('https://example.com/api');

await waitFor(() => (mockApiConnection.apiConnect as any).mock.calls.length === 1);
expect(mockApiConnection.apiConnect).toHaveBeenCalledTimes(1);
Expand All @@ -71,7 +71,7 @@ describe('ConnectionPoolService', () => {
.mockImplementationOnce(() => Promise.reject(new Error('Reconnection failed')))
.mockImplementation(() => Promise.resolve());

(connectionPoolService as any).handleApiDisconnects(0);
(connectionPoolService as any).handleApiDisconnects('https://example.com/api');

await waitFor(() => (mockApiConnection.apiConnect as any).mock.calls.length === 3);
expect(mockApiConnection.apiConnect).toHaveBeenCalledTimes(3);
Expand All @@ -81,12 +81,12 @@ describe('ConnectionPoolService', () => {
it('should handle failed reconnection after max attempts', async () => {
(mockApiConnection.apiConnect as any).mockImplementation(() => Promise.reject(new Error('Reconnection failed')));

(connectionPoolService as any).handleApiDisconnects(0);
(connectionPoolService as any).handleApiDisconnects('https://example.com/api');

await waitFor(() => (mockApiConnection.apiConnect as any).mock.calls.length === 5);
expect(mockApiConnection.apiConnect).toHaveBeenCalledTimes(5);
expect(connectionPoolService.numConnections).toBe(0);
}, 31000);
}, 50000);
});

describe('handleApiDisconnects', () => {
Expand All @@ -99,12 +99,12 @@ describe('ConnectionPoolService', () => {
const handleApiDisconnectsSpy = jest.spyOn(connectionPoolService as any, 'handleApiDisconnects');

// Trigger handleApiError with connection issue twice
connectionPoolService.handleApiError(0, {
connectionPoolService.handleApiError('https://example.com/api', {
name: 'ConnectionError',
errorType: ApiErrorType.Connection,
message: 'Connection error',
});
connectionPoolService.handleApiError(0, {
connectionPoolService.handleApiError('https://example.com/api', {
name: 'ConnectionError',
errorType: ApiErrorType.Connection,
message: 'Connection error',
Expand Down
Loading
Loading