Skip to content

Commit

Permalink
Sync with node-core 20230728 (#138)
Browse files Browse the repository at this point in the history
* Sync with node-core 20230728

* Update packages/node/src/ethereum/api.service.ethereum.ts

Co-authored-by: Naveen V <[email protected]>

* use prerelease

* fix version

* Update packages/node/CHANGELOG.md

Co-authored-by: Scott Twiname <[email protected]>

---------

Co-authored-by: Naveen V <[email protected]>
Co-authored-by: Scott Twiname <[email protected]>
  • Loading branch information
3 people authored Jul 31, 2023
1 parent 2a905ec commit de429be
Show file tree
Hide file tree
Showing 16 changed files with 195 additions and 134 deletions.
2 changes: 2 additions & 0 deletions packages/common-ethereum/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed
- Sync witb `@subql/common`, fix iPFS repeat cat same cid

## [2.2.0] - 2023-06-27
### Changed
Expand Down
2 changes: 1 addition & 1 deletion packages/common-ethereum/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"main": "dist/index.js",
"license": "GPL-3.0",
"dependencies": {
"@subql/common": "^2.3.0",
"@subql/common": "^2.4.0",
"@subql/types-ethereum": "workspace:*",
"js-yaml": "^4.1.0",
"pino": "^6.13.3",
Expand Down
5 changes: 5 additions & 0 deletions packages/node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `!null` filter for logs (#135)

### Updated
- Update license to GPL-3.0 (#137)
- Updated retry logic for eth requests (#134)
- Adjust batch size for `JsonRpcBatchProvider` dynamically (#121)
- Sync with node-core :
- init db schema manually during test run
- fix retry logic for workers in connection pool
- Performance scoring fix

### Fixed
- Fixed missing mmrQueryService in indexer module
Expand Down
4 changes: 2 additions & 2 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
"@nestjs/event-emitter": "^2.0.0",
"@nestjs/platform-express": "^9.4.0",
"@nestjs/schedule": "^3.0.1",
"@subql/common": "^2.3.0",
"@subql/common": "^2.4.0",
"@subql/common-ethereum": "workspace:*",
"@subql/node-core": "^3.1.2",
"@subql/node-core": "^4.0.1",
"@subql/testing": "^2.0.0",
"@subql/types": "^2.1.2",
"@subql/types-ethereum": "workspace:*",
Expand Down
9 changes: 8 additions & 1 deletion packages/node/src/configure/configure.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ function warnDeprecations() {
@Global()
@Module({})
export class ConfigureModule {
static async register(): Promise<DynamicModule> {
static async getInstance(): Promise<{
config: NodeConfig;
project: () => Promise<SubqueryProject>;
}> {
const { argv } = yargsOptions;
let config: NodeConfig;
let rawManifest: unknown;
Expand Down Expand Up @@ -130,6 +133,10 @@ export class ConfigureModule {
});
return p;
};
return { config, project };
}
static async register(): Promise<DynamicModule> {
const { config, project } = await ConfigureModule.getInstance();

return {
module: ConfigureModule,
Expand Down
48 changes: 9 additions & 39 deletions packages/node/src/ethereum/api.connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
import {
ApiConnectionError,
ApiErrorType,
IApiConnectionSpecific,
DisconnectionError,
LargeResponseError,
NetworkMetadataPayload,
RateLimitError,
TimeoutError,
IApiConnectionSpecific,
} from '@subql/node-core';
import { EthereumBlockWrapper } from '@subql/types-ethereum';
import { EthereumApi } from './api.ethereum';
Expand Down Expand Up @@ -68,13 +72,13 @@ export class EthereumApiConnection
static handleError(e: Error): ApiConnectionError {
let formatted_error: ApiConnectionError;
if (e.message.startsWith(`No response received from RPC endpoint in`)) {
formatted_error = EthereumApiConnection.handleTimeoutError(e);
formatted_error = new TimeoutError(e);
} else if (e.message.startsWith(`disconnected from `)) {
formatted_error = EthereumApiConnection.handleDisconnectionError(e);
formatted_error = new DisconnectionError(e);
} else if (e.message.startsWith(`Rate Limited at endpoint`)) {
formatted_error = EthereumApiConnection.handleRateLimitError(e);
formatted_error = new RateLimitError(e);
} else if (e.message.includes(`Exceeded max limit of`)) {
formatted_error = EthereumApiConnection.handleLargeResponseError(e);
formatted_error = new LargeResponseError(e);
} else {
formatted_error = new ApiConnectionError(
e.name,
Expand All @@ -84,38 +88,4 @@ export class EthereumApiConnection
}
return formatted_error;
}

static handleLargeResponseError(e: Error): ApiConnectionError {
const newMessage = `Oversized RPC node response. This issue is related to the network's RPC nodes configuration, not your application. You may report it to the network's maintainers or try a different RPC node.\n\n${e.message}`;

return new ApiConnectionError(
'RpcInternalError',
newMessage,
ApiErrorType.Default,
);
}

static handleRateLimitError(e: Error): ApiConnectionError {
return new ApiConnectionError(
'RateLimit',
e.message,
ApiErrorType.RateLimit,
);
}

static handleTimeoutError(e: Error): ApiConnectionError {
return new ApiConnectionError(
'TimeoutError',
e.message,
ApiErrorType.Timeout,
);
}

static handleDisconnectionError(e: Error): ApiConnectionError {
return new ApiConnectionError(
'ConnectionError',
e.message,
ApiErrorType.Connection,
);
}
}
58 changes: 29 additions & 29 deletions packages/node/src/ethereum/api.service.ethereum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,40 +53,40 @@ export class EthereumApiService extends ApiService<

const endpointToApiIndex: Record<string, EthereumApiConnection> = {};

await Promise.all(
endpoints.map(async (endpoint, i) => {
const connection = await EthereumApiConnection.create(
endpoint,
this.fetchBlockBatches,
this.eventEmitter,
);

const api = connection.unsafeApi;

this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});
for await (const [i, endpoint] of endpoints.entries()) {
const connection = await EthereumApiConnection.create(
endpoint,
this.fetchBlockBatches,
this.eventEmitter,
);

const api = connection.unsafeApi;

this.eventEmitter.emit(IndexerEvent.ApiConnected, {
value: 1,
apiIndex: i,
endpoint: endpoint,
});

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;
}

if (!this.networkMeta) {
this.networkMeta = connection.networkMeta;
}
if (network.chainId !== api.getChainId().toString()) {
throw this.metadataMismatchError(
'ChainId',
network.chainId,
api.getChainId().toString(),
);
}

if (network.chainId !== api.getChainId().toString()) {
throw this.metadataMismatchError(
'ChainId',
network.chainId,
api.getChainId().toString(),
);
}
endpointToApiIndex[endpoint] = connection;
}

endpointToApiIndex[endpoint] = connection;
}),
await this.connectionPoolService.addBatchToConnections(
endpointToApiIndex,
);

this.connectionPoolService.addBatchToConnections(endpointToApiIndex);

return this;
} catch (e) {
logger.error(e, 'Failed to init api service');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// / Copyright 2020-2021 OnFinality Limited authors & contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { IBlockDispatcher } from '@subql/node-core';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import {
HostDynamicDS,
WorkerBlockDispatcher,
IUnfinalizedBlocksService,
HostConnectionPoolState,
ConnectionPoolStateManager,
connectionPoolStateHostFunctions,
} from '@subql/node-core';
import { Store } from '@subql/types-ethereum';
import chalk from 'chalk';
import {
SubqlProjectDs,
SubqueryProject,
} from '../../configure/SubqueryProject';
import { EthereumApiConnection } from '../../ethereum/api.connection';

import { EthereumBlockWrapped } from '../../ethereum/block.ethereum';
import { DynamicDsService } from '../dynamic-ds.service';
import { UnfinalizedBlocksService } from '../unfinalizedBlocks.service';
Expand All @@ -41,6 +45,8 @@ async function createIndexerWorker(
store: Store,
dynamicDsService: IDynamicDsService<SubqlProjectDs>,
unfinalizedBlocksService: IUnfinalizedBlocksService<EthereumBlockWrapped>,
connectionPoolState: ConnectionPoolStateManager<EthereumApiConnection>,

root: string,
): Promise<IndexerWorker> {
const indexerWorker = Worker.create<
Expand Down Expand Up @@ -75,6 +81,7 @@ async function createIndexerWorker(
unfinalizedBlocksService.processUnfinalizedBlockHeader.bind(
unfinalizedBlocksService,
),
...connectionPoolStateHostFunctions(connectionPoolState),
},
root,
);
Expand All @@ -100,6 +107,7 @@ export class WorkerBlockDispatcherService
@Inject('ISubqueryProject') project: SubqueryProject,
dynamicDsService: DynamicDsService,
unfinalizedBlocksSevice: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<EthereumApiConnection>,
) {
super(
nodeConfig,
Expand All @@ -116,6 +124,7 @@ export class WorkerBlockDispatcherService
storeService.getStore(),
dynamicDsService,
unfinalizedBlocksSevice,
connectionPoolState,
project.root,
),
);
Expand Down
6 changes: 6 additions & 0 deletions packages/node/src/indexer/fetch.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
ApiService,
NodeConfig,
ConnectionPoolService,
ConnectionPoolStateManager,
SmartBatchService,
StoreCacheService,
PgMmrCacheService,
Expand Down Expand Up @@ -55,6 +56,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2],
},
IndexerManager,
ConnectionPoolStateManager,
ConnectionPoolService,
{
provide: SmartBatchService,
Expand Down Expand Up @@ -85,6 +87,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
project: SubqueryProject,
dynamicDsService: DynamicDsService,
unfinalizedBlocks: UnfinalizedBlocksService,
connectionPoolState: ConnectionPoolStateManager<EthereumApiConnection>,
) =>
nodeConfig.workers !== undefined
? new WorkerBlockDispatcherService(
Expand All @@ -98,6 +101,7 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
project,
dynamicDsService,
unfinalizedBlocks,
connectionPoolState,
)
: new BlockDispatcherService(
apiService,
Expand Down Expand Up @@ -125,9 +129,11 @@ import { UnfinalizedBlocksService } from './unfinalizedBlocks.service';
'ISubqueryProject',
DynamicDsService,
UnfinalizedBlocksService,
ConnectionPoolStateManager,
],
},
FetchService,
ConnectionPoolService,
IndexingBenchmarkService,
PoiBenchmarkService,
{
Expand Down
12 changes: 12 additions & 0 deletions packages/node/src/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import {
ConnectionPoolService,
StoreCacheService,
WorkerDynamicDsService,
WorkerConnectionPoolStateManager,
ConnectionPoolStateManager,
NodeConfig,
PgMmrCacheService,
MmrQueryService,
} from '@subql/node-core';
Expand All @@ -32,6 +35,15 @@ import { WorkerUnfinalizedBlocksService } from './worker/worker.unfinalizedBlock
IndexerManager,
StoreCacheService,
StoreService,
{
provide: ConnectionPoolStateManager,
useFactory: () => {
if (isMainThread) {
throw new Error('Expected to be worker thread');
}
return new WorkerConnectionPoolStateManager((global as any).host);
},
},
ConnectionPoolService,
{
provide: ApiService,
Expand Down
15 changes: 13 additions & 2 deletions packages/node/src/indexer/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import {
hostDynamicDsKeys,
HostDynamicDS,
ProcessBlockResponse,
HostConnectionPoolState,
hostConnectionPoolStateKeys,
} from '@subql/node-core';
import { SubqlProjectDs } from '../../configure/SubqueryProject';
import { EthereumApiConnection } from '../../ethereum/api.connection';
import { IndexerManager } from '../indexer.manager';
import { WorkerModule } from './worker.module';
import {
Expand Down Expand Up @@ -119,10 +122,18 @@ async function waitForWorkerBatchSize(heapSizeInBytes: number): Promise<void> {

// Register these functions to be exposed to worker host
(global as any).host = WorkerHost.create<
HostStore & HostDynamicDS<SubqlProjectDs> & HostUnfinalizedBlocks,
HostStore &
HostDynamicDS<SubqlProjectDs> &
HostUnfinalizedBlocks &
HostConnectionPoolState<EthereumApiConnection>,
IInitIndexerWorker
>(
[...hostStoreKeys, ...hostDynamicDsKeys, ...hostUnfinalizedBlocksKeys],
[
...hostStoreKeys,
...hostDynamicDsKeys,
...hostUnfinalizedBlocksKeys,
...hostConnectionPoolStateKeys,
],
{
initWorker,
fetchBlock,
Expand Down
Loading

0 comments on commit de429be

Please sign in to comment.