Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle failed api inits #1970

Merged
merged 14 commits into from
Sep 6, 2023
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Fixed
- Use test runs as unit for tests instead of entity checks (#1957)
- handle APIs in connection pool whose initialization failed (#1970)

## [4.2.3] - 2023-08-17
### Fixed
Expand Down
7 changes: 6 additions & 1 deletion packages/node-core/src/indexer/connectionPool.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
async addToConnections(api: T, endpoint: string): Promise<void> {
const index = this.allApi.length;
this.allApi.push(api);
await this.poolStateManager.addToConnections(endpoint, index, endpoint === this.nodeConfig.primaryNetworkEndpoint);
await this.poolStateManager.addToConnections(
endpoint,
index,
endpoint === this.nodeConfig.primaryNetworkEndpoint,
api === null
);
this.apiToIndexMap.set(api, index);
await this.updateNextConnectedApiIndex();
}
Expand Down
31 changes: 25 additions & 6 deletions packages/node-core/src/indexer/connectionPoolState.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ export interface ConnectionPoolItem<T> {
failed: boolean;
lastRequestTime: number;
connected: boolean;
initFailed: boolean;
timeoutId?: NodeJS.Timeout;
}

const logger = getLogger('connection-pool-state');

export interface IConnectionPoolStateManager<T extends IApiConnectionSpecific<any, any, any>> {
addToConnections(endpoint: string, index: number, primary: boolean): Promise<void>;
addToConnections(endpoint: string, index: number, primary: boolean, initFailed: boolean): Promise<void>;
getNextConnectedApiIndex(): Promise<number | undefined>;
// Async to be compatible with workers
getFieldValue<K extends keyof ConnectionPoolItem<T>>(apiIndex: number, field: K): Promise<ConnectionPoolItem<T>[K]>;
Expand All @@ -53,17 +54,23 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
private pool: Record<number, ConnectionPoolItem<T>> = {};

//eslint-disable-next-line @typescript-eslint/require-await
async addToConnections(endpoint: string, index: number, primary: boolean): Promise<void> {
async addToConnections(endpoint: string, index: number, primary: boolean, initFailed: boolean): Promise<void> {
//avoid overwriting state if init failed in one of the workers
if (this.pool[index] && this.pool[index].initFailed) {
return;
}

const poolItem: ConnectionPoolItem<T> = {
primary: primary,
performanceScore: 100,
failureCount: 0,
endpoint: endpoint,
backoffDelay: 0,
rateLimited: false,
failed: false,
failed: initFailed,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add initFailed can't we use this?

connected: true,
lastRequestTime: 0,
initFailed: initFailed,
};
this.pool[index] = poolItem;

Expand All @@ -79,9 +86,15 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
return primaryIndex;
}

const indices = Object.keys(this.pool)
const initedIndices = Object.keys(this.pool)
.map(Number)
.filter((index) => !this.pool[index].backoffDelay && this.pool[index].connected);
.filter((index) => !this.pool[index].initFailed);

if (initedIndices.length === 0) {
throw new Error(`Initialization failed for all endpoints. Please add healthier endpoints.`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem like the right place to throw this error. This isn't an init function

}

const indices = initedIndices.filter((index) => !this.pool[index].backoffDelay && this.pool[index].connected);

if (indices.length === 0) {
// If all endpoints are suspended, try to find a rate-limited one
Expand Down Expand Up @@ -124,7 +137,13 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
private getPrimaryEndpointIndex(): number | undefined {
return Object.keys(this.pool)
.map(Number)
.find((index) => this.pool[index].primary && !this.pool[index].backoffDelay && this.pool[index].connected);
.find(
(index) =>
this.pool[index].primary &&
!this.pool[index].backoffDelay &&
this.pool[index].connected &&
!this.pool[index].initFailed
);
}

get numConnections(): number {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {ConnectionPoolStateManager} from '../connectionPoolState.manager';

export type HostConnectionPoolState<T> = {
hostGetNextConnectedApiIndex: () => Promise<number | undefined>;
hostAddToConnections: (endpoint: string, index: number, primary: boolean) => Promise<void>;
hostAddToConnections: (endpoint: string, index: number, primary: boolean, initFailed: boolean) => Promise<void>;
hostGetFieldFromConnectionPoolItem: <K extends keyof ConnectionPoolItem<T>>(
index: number,
field: K
Expand Down Expand Up @@ -63,8 +63,8 @@ export class WorkerConnectionPoolStateManager<T extends IApiConnectionSpecific>
return this.host.hostGetNextConnectedApiIndex();
}

async addToConnections(endpoint: string, index: number, primary = false): Promise<void> {
return this.host.hostAddToConnections(endpoint, index, primary);
async addToConnections(endpoint: string, index: number, primary = false, initFailed = false): Promise<void> {
return this.host.hostAddToConnections(endpoint, index, primary, initFailed);
}

async getFieldValue<K extends keyof ConnectionPoolItem<T>>(
Expand Down
2 changes: 2 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- fix crashes when intialization fails for one of the endpoint (#1970)

## [2.12.2] - 2023-08-17
### Fixed
Expand Down
105 changes: 55 additions & 50 deletions packages/node/src/indexer/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,66 +96,71 @@ export class ApiService
}

for await (const [i, endpoint] of network.endpoint.entries()) {
const connection = await ApiPromiseConnection.create(
endpoint,
this.fetchBlocksBatches,
{
chainTypes,
},
);

const api = connection.unsafeApi;
try {
const connection = await ApiPromiseConnection.create(
endpoint,
this.fetchBlocksBatches,
{
chainTypes,
},
);

this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});
const api = connection.unsafeApi;

api.on('connected', () => {
this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});
});
api.on('disconnected', () => {
this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 0,
apiIndex: i,
endpoint: endpoint,

api.on('connected', () => {
this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});
});
});

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;

if (
network.chainId &&
network.chainId !== this.networkMeta.genesisHash
) {
const err = new Error(
`Network chainId doesn't match expected genesisHash. Your SubQuery project is expecting to index data from "${
network.chainId ?? network.genesisHash
}", however the endpoint that you are connecting to is different("${
this.networkMeta.genesisHash
}). Please check that the RPC endpoint is actually for your desired network or update the genesisHash.`,
);
logger.error(err, err.message);
throw err;
}
} else {
const genesisHash = api.genesisHash.toString();
if (this.networkMeta.genesisHash !== genesisHash) {
throw this.metadataMismatchError(
'Genesis Hash',
this.networkMeta.genesisHash,
genesisHash,
);
api.on('disconnected', () => {
this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 0,
apiIndex: i,
endpoint: endpoint,
});
});

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;

if (
network.chainId &&
network.chainId !== this.networkMeta.genesisHash
) {
const err = new Error(
`Network chainId doesn't match expected genesisHash. Your SubQuery project is expecting to index data from "${
network.chainId ?? network.genesisHash
}", however the endpoint that you are connecting to is different("${
this.networkMeta.genesisHash
}). Please check that the RPC endpoint is actually for your desired network or update the genesisHash.`,
);
logger.error(err, err.message);
throw err;
}
} else {
const genesisHash = api.genesisHash.toString();
if (this.networkMeta.genesisHash !== genesisHash) {
throw this.metadataMismatchError(
'Genesis Hash',
this.networkMeta.genesisHash,
genesisHash,
);
}
}
}

endpointToApiIndex[endpoint] = connection;
endpointToApiIndex[endpoint] = connection;
} catch (e) {
logger.error(`failed to init ${endpoint}: ${e}`);
endpointToApiIndex[endpoint] = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move some of this to node core? Its pretty much the same on all chains

}
}

await this.connectionPoolService.addBatchToConnections(endpointToApiIndex);
Expand Down