Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix targetHeight sync, move code to node core #2491

Merged
merged 4 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- Provide a better error message when user increases project start height beyond indexed height (#2492)
- Define new core modules to reduce duplicate code in nodes (#2491)

### Fixed
- "targetHeight" being updated out of sync with indexing, leading to it possibly being behind "lastProcessedHeight" (#2491)

## [11.0.0] - 2024-07-11
### Changed
Expand Down
8 changes: 4 additions & 4 deletions packages/node-core/src/admin/admin.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '@nestjs/common';
import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {TargetBlockPayload, RewindPayload, AdminEvent, IndexerEvent} from '../events';
import {MonitorService, PoiService, StoreService} from '../indexer';
import {MonitorService, PoiService, ProofOfIndexHuman, StoreService} from '../indexer';
import {getLogger} from '../logger';
import {timeout} from '../utils';
import {BlockRangeDto, BlockRangeDtoInterface} from './blockRange';
Expand Down Expand Up @@ -68,7 +68,7 @@ export class AdminController {
}

@Get('poi/')
async getPoisByRange(@Query(ValidationPipe) blockRange: BlockRangeDto) {
async getPoisByRange(@Query(ValidationPipe) blockRange: BlockRangeDto): Promise<ProofOfIndexHuman[]> {
const {endBlock, startBlock} = blockRange;
// TODO, class validator seems not work properly, need to complete in future
if (endBlock && Number(startBlock) > Number(endBlock)) {
Expand Down Expand Up @@ -131,15 +131,15 @@ export class AdminListener {
constructor(private eventEmitter: EventEmitter2) {}

@OnEvent(IndexerEvent.RewindSuccess)
handleRewindSuccess(payload: RewindPayload) {
handleRewindSuccess(payload: RewindPayload): void {
this.eventEmitter.emit(AdminEvent.RewindTargetResponse, {
...payload,
message: `Rewind to block ${payload.height} successful`,
});
}

@OnEvent(IndexerEvent.RewindFailure)
handleRewindFailure(payload: RewindPayload) {
handleRewindFailure(payload: RewindPayload): void {
this.eventEmitter.emit(AdminEvent.RewindTargetResponse, {...payload});
}
}
5 changes: 0 additions & 5 deletions packages/node-core/src/admin/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {AdminController, AdminListener} from './admin.controller';

export const adminControllers = [AdminController];
// include for other service
export const adminServices = [AdminListener];
export * from './blockRange';
44 changes: 44 additions & 0 deletions packages/node-core/src/indexer/core.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {Module} from '@nestjs/common';
import {AdminController, AdminListener} from '../admin/admin.controller';
import {IndexingBenchmarkService, PoiBenchmarkService} from './benchmark.service';
import {ConnectionPoolService} from './connectionPool.service';
import {ConnectionPoolStateManager} from './connectionPoolState.manager';
import {InMemoryCacheService} from './inMemoryCache.service';
import {MonitorService} from './monitor.service';
import {PoiService, PoiSyncService} from './poi';
import {SandboxService} from './sandbox.service';
import {StoreService} from './store.service';
import {StoreCacheService} from './storeCache';

@Module({
providers: [
InMemoryCacheService,
SandboxService,
ConnectionPoolStateManager,
ConnectionPoolService,
IndexingBenchmarkService,
PoiBenchmarkService,
MonitorService,
PoiService,
PoiSyncService,
StoreService,
StoreCacheService,
AdminListener,
],
controllers: [AdminController],
exports: [
ConnectionPoolService,
ConnectionPoolStateManager,
SandboxService,
MonitorService,
PoiService,
PoiSyncService,
StoreService,
StoreCacheService,
InMemoryCacheService,
],
})
export class CoreModule {}
20 changes: 13 additions & 7 deletions packages/node-core/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ class TestFetchService extends BaseFetchService<BaseDataSource, IBlockDispatcher
getGenesisHash(): string {
return genesisHash;
}
async getFinalizedHeight(): Promise<number> {
return Promise.resolve(this.finalizedHeight);
}
async getBestHeight(): Promise<number> {
return Promise.resolve(this.bestHeight);
}
Expand Down Expand Up @@ -74,7 +71,7 @@ class TestFetchService extends BaseFetchService<BaseDataSource, IBlockDispatcher
this.projectService.getDataSourcesMap = jest.fn(() => blockHeightMap);
}

protected async getFinalizedHeader(): Promise<Header> {
async getFinalizedHeader(): Promise<Header> {
return Promise.resolve({blockHeight: this.finalizedHeight, blockHash: '0xxx', parentHash: '0xxx'});
}
}
Expand Down Expand Up @@ -214,7 +211,12 @@ describe('Fetch Service', () => {
dictionaryService,
eventEmitter,
schedulerRegistry,
unfinalizedBlocksService
unfinalizedBlocksService,
{
metadata: {
set: jest.fn(),
},
} as any
);

spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any;
Expand Down Expand Up @@ -333,7 +335,7 @@ describe('Fetch Service', () => {
});

it('checks chain heads at an interval', async () => {
const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeight');
const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeader');
const bestSpy = jest.spyOn(fetchService, 'getBestHeight');

await fetchService.init(1);
Expand All @@ -347,7 +349,11 @@ describe('Fetch Service', () => {
expect(finalizedSpy).toHaveBeenCalledTimes(2);
expect(bestSpy).toHaveBeenCalledTimes(2);

await expect(fetchService.getFinalizedHeight()).resolves.toBe(fetchService.finalizedHeight);
await expect(fetchService.getFinalizedHeader()).resolves.toEqual({
blockHeight: fetchService.finalizedHeight,
blockHash: '0xxx',
parentHash: '0xxx',
});
});

it('enqueues blocks WITHOUT dictionary', async () => {
Expand Down
7 changes: 6 additions & 1 deletion packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {IBlockDispatcher} from './blockDispatcher';
import {mergeNumAndBlocksToNums} from './dictionary';
import {DictionaryService} from './dictionary/dictionary.service';
import {getBlockHeight, mergeNumAndBlocks} from './dictionary/utils';
import {StoreCacheService} from './storeCache';
import {Header, IBlock, IProjectService} from './types';
import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';

Expand Down Expand Up @@ -52,7 +53,8 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
protected dictionaryService: DictionaryService<DS, FB>,
private eventEmitter: EventEmitter2,
private schedulerRegistry: SchedulerRegistry,
private unfinalizedBlocksService: IUnfinalizedBlocksServiceUtil
private unfinalizedBlocksService: IUnfinalizedBlocksServiceUtil,
private storeCacheService: StoreCacheService
) {}

private get latestBestHeight(): number {
Expand Down Expand Up @@ -232,6 +234,9 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
continue;
}

// Update the target height, this happens here to stay in sync with the rest of indexing
this.storeCacheService.metadata.set('targetHeight', latestHeight);

// This could be latestBestHeight, dictionary should never include finalized blocks
// TODO add buffer so dictionary not used when project synced
if (startBlockHeight < this.latestBestHeight - scaledBatchSize) {
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

export * from './benchmark.service';
export * from './connectionPool.service';
export * from './connectionPoolState.manager';
export * from './entities';
Expand All @@ -25,3 +24,4 @@ export * from './indexer.manager';
export * from './ds-processor.service';
export * from './unfinalizedBlocks.service';
export * from './monitor.service';
export * from './core.module';
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ export * from './worker.monitor.service';
export * from './worker.service';
export * from './worker';
export * from './utils';
export * from './worker.core.module';
33 changes: 33 additions & 0 deletions packages/node-core/src/indexer/worker/worker.core.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {Module} from '@nestjs/common';
import {ConnectionPoolService} from '../connectionPool.service';
import {ConnectionPoolStateManager} from '../connectionPoolState.manager';
import {InMemoryCacheService} from '../inMemoryCache.service';
import {MonitorService} from '../monitor.service';
import {SandboxService} from '../sandbox.service';
import {WorkerInMemoryCacheService} from './worker.cache.service';
import {WorkerConnectionPoolStateManager} from './worker.connectionPoolState.manager';
import {WorkerMonitorService} from './worker.monitor.service';

@Module({
providers: [
ConnectionPoolService,
SandboxService,
{
provide: ConnectionPoolStateManager,
useFactory: () => new WorkerConnectionPoolStateManager((global as any).host),
},
{
provide: MonitorService,
useFactory: () => new WorkerMonitorService((global as any).host),
},
{
provide: InMemoryCacheService,
useFactory: () => new WorkerInMemoryCacheService((global as any).host),
},
],
exports: [ConnectionPoolService, SandboxService, MonitorService, InMemoryCacheService],
})
export class WorkerCoreModule {}
13 changes: 1 addition & 12 deletions packages/node-core/src/meta/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,4 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {MetricEventListener} from './event.listener';
import {HealthController} from './health.controller';
import {HealthService} from './health.service';
import {gaugeProviders} from './meta';
import {ReadyController} from './ready.controller';
import {ReadyService} from './ready.service';

export * from './meta.service';

export const metaControllers = [HealthController, ReadyController];

export const metaServices = [MetricEventListener, HealthService, ReadyService, ...gaugeProviders];
export * from './meta.module';
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { Controller, Get } from '@nestjs/common';
import { MetaService } from './meta.service';
import {Controller, Get} from '@nestjs/common';
import {MetaService} from './meta.service';

@Controller('meta')
export class MetaController {
Expand Down
40 changes: 40 additions & 0 deletions packages/node-core/src/meta/meta.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {DynamicModule, Module} from '@nestjs/common';
import {PrometheusModule} from '@willsoto/nestjs-prometheus';
// import { FetchModule } from '../indexer/fetch.module';
import {CoreModule} from '../indexer';
import {MetricEventListener} from './event.listener';
import {HealthController} from './health.controller';
import {HealthService} from './health.service';
import {gaugeProviders} from './meta';
import {MetaController} from './meta.controller';
import {MetaService, MetaServiceOptions} from './meta.service';
import {ReadyController} from './ready.controller';
import {ReadyService} from './ready.service';

@Module({
// imports: [PrometheusModule.register(), FetchModule],
// controllers: [...metaControllers, MetaController],
// providers: [...metaServices, MetaService],
})
export class MetaModule {
static forRoot(options: MetaServiceOptions): DynamicModule {
return {
module: MetaModule,
imports: [PrometheusModule.register(), CoreModule],
controllers: [HealthController, ReadyController, MetaController],
providers: [
MetricEventListener,
HealthService,
ReadyService,
...gaugeProviders,
{
provide: MetaService,
useFactory: () => new MetaService(options),
},
],
};
}
}
49 changes: 26 additions & 23 deletions packages/node-core/src/meta/meta.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
// SPDX-License-Identifier: GPL-3.0

import {OnEvent} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {NodeConfig} from '../configure';
import {
BestBlockPayload,
EventPayload,
Expand All @@ -13,11 +11,31 @@ import {
ProcessedBlockCountPayload,
TargetBlockPayload,
} from '../events';
import {StoreCacheService} from '../indexer';

const UPDATE_HEIGHT_INTERVAL = 5000;
export type MetaServiceOptions = {
/**
* The version of the node from package.json
* @example
* "1.0.0
* */
version: string;
sdkVersion: {
/**
* The name of the sdk
* @example
* "@polkadot/api"
* */
name: string;
/**
* The semver of the skd package from package.json
* @example
* "1.0.0"
* */
version: string;
};
};

export abstract class BaseMetaService {
export class MetaService {
private currentProcessingHeight?: number;
private currentProcessingTimestamp?: number;
private bestHeight?: number;
Expand All @@ -29,26 +47,17 @@ export abstract class BaseMetaService {
private lastProcessedTimestamp?: number;
private processedBlockCount?: number;

constructor(private storeCacheService: StoreCacheService, config: NodeConfig) {
// TODO update UPDATE_HEIGHT_INTERVAL should be configureable based on config.storeFlushInterval * 1000 but need to use SchedulerRegistry for that
}

protected abstract packageVersion: string;

protected abstract sdkVersion(): {
name: string;
version: string;
};
constructor(private opts: MetaServiceOptions) {}

getMeta() {
const {name: sdkName, version} = this.sdkVersion();
const {name: sdkName, version} = this.opts.sdkVersion;

return {
currentProcessingHeight: this.currentProcessingHeight,
currentProcessingTimestamp: this.currentProcessingTimestamp,
targetHeight: this.targetHeight,
bestHeight: this.bestHeight,
indexerNodeVersion: this.packageVersion,
indexerNodeVersion: this.opts.version,
lastProcessedHeight: this.lastProcessedHeight,
lastProcessedTimestamp: this.lastProcessedTimestamp,
uptime: process.uptime(),
Expand All @@ -60,12 +69,6 @@ export abstract class BaseMetaService {
};
}

@Interval(UPDATE_HEIGHT_INTERVAL)
getTargetHeight(): void {
if (this.targetHeight === undefined) return;
this.storeCacheService.metadata.set('targetHeight', this.targetHeight);
}

@OnEvent(IndexerEvent.BlockProcessing)
handleProcessingBlock(blockPayload: ProcessBlockPayload): void {
this.currentProcessingHeight = blockPayload.height;
Expand Down
Loading
Loading