diff --git a/packages/common-substrate/src/project/models.ts b/packages/common-substrate/src/project/models.ts index 216f3979fa..20c99cf880 100644 --- a/packages/common-substrate/src/project/models.ts +++ b/packages/common-substrate/src/project/models.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {RegisteredTypes, RegistryTypes, OverrideModuleType, OverrideBundleType} from '@polkadot/types/types'; -import {BlockFilterImpl, ProcessorImpl} from '@subql/common'; +import {BaseDataSource, BlockFilterImpl, ProcessorImpl} from '@subql/common'; import { SubstrateBlockFilter, SubstrateBlockHandler, @@ -24,7 +24,6 @@ import { IsArray, IsBoolean, IsEnum, - IsInt, IsOptional, IsString, IsObject, @@ -149,15 +148,12 @@ export class CustomMapping implements BaseMapping { file: string; } -export class RuntimeDataSourceBase implements SubstrateRuntimeDatasource { +export class RuntimeDataSourceBase extends BaseDataSource implements SubstrateRuntimeDatasource { @IsEnum(SubstrateDatasourceKind, {groups: [SubstrateDatasourceKind.Runtime]}) kind: SubstrateDatasourceKind.Runtime; @Type(() => RuntimeMapping) @ValidateNested() mapping: RuntimeMapping; - @IsOptional() - @IsInt() - startBlock?: number; } export class FileReferenceImpl implements FileReference { @@ -166,6 +162,7 @@ export class FileReferenceImpl implements FileReference { } export class CustomDataSourceBase + extends BaseDataSource implements SubstrateCustomDatasource { @IsString() @@ -173,9 +170,6 @@ export class CustomDataSourceBase CustomMapping) @ValidateNested() mapping: M; - @IsOptional() - @IsInt() - startBlock?: number; @Type(() => FileReferenceImpl) @ValidateNested({each: true}) assets: Map; diff --git a/packages/common/src/project/utils.ts b/packages/common/src/project/utils.ts index efc576db46..d444597973 100644 --- a/packages/common/src/project/utils.ts +++ b/packages/common/src/project/utils.ts @@ -4,7 +4,7 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; -import {FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core'; +import {BaseDataSource, FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core'; import { registerDecorator, validateSync, @@ -294,3 +294,17 @@ export class FileReferenceImp implements ValidatorConstraintInterface { } export const tsProjectYamlPath = (tsManifestEntry: string) => tsManifestEntry.replace('.ts', '.yaml'); + +@ValidatorConstraint({async: false}) +export class IsEndBlockGreater implements ValidatorConstraintInterface { + validate(endBlock: number, args: ValidationArguments) { + const object = args.object as BaseDataSource; + return object.startBlock !== undefined && object.endBlock !== undefined + ? object.endBlock >= object.startBlock + : true; + } + + defaultMessage(args: ValidationArguments) { + return 'End block must be greater than or equal to start block'; + } +} diff --git a/packages/common/src/project/versioned/base.ts b/packages/common/src/project/versioned/base.ts index 627d20094f..6a03639890 100644 --- a/packages/common/src/project/versioned/base.ts +++ b/packages/common/src/project/versioned/base.ts @@ -3,9 +3,19 @@ import {FileReference, ParentProject, Processor} from '@subql/types-core'; import {plainToInstance, Type} from 'class-transformer'; -import {Allow, Equals, IsObject, IsOptional, IsString, ValidateNested, validateSync} from 'class-validator'; +import { + Allow, + Equals, + IsInt, + IsObject, + IsOptional, + IsString, + Validate, + ValidateNested, + validateSync, +} from 'class-validator'; import yaml from 'js-yaml'; -import {toJsonObject} from '../utils'; +import {IsEndBlockGreater, toJsonObject} from '../utils'; import {ParentProjectModel} from './v1_0_0/models'; export abstract class ProjectManifestBaseImpl { @@ -78,3 +88,13 @@ export class BaseDeploymentV1_0_0 { }); } } + +export class BaseDataSource { + @IsOptional() + @IsInt() + startBlock?: number; + @Validate(IsEndBlockGreater) + @IsOptional() + @IsInt() + endBlock?: number; +} diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index dcc1e311dd..182d4d0113 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -208,6 +208,12 @@ export abstract class BaseBlockDispatcher implements IBloc // Flush all data from cache and wait await this.storeCacheService.flushCache(false, true); } + + if (!this.projectService.hasDataSourcesAfterHeight(height)) { + logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`); + await this.storeCacheService.flushCache(false, true); + process.exit(0); + } } /** diff --git a/packages/node-core/src/indexer/dictionary.service.ts b/packages/node-core/src/indexer/dictionary.service.ts index 68a8930adc..69c2d77481 100644 --- a/packages/node-core/src/indexer/dictionary.service.ts +++ b/packages/node-core/src/indexer/dictionary.service.ts @@ -349,7 +349,7 @@ export class DictionaryService { dataSources: BlockHeightMap, buildDictionaryQueryEntries: (dataSources: DS[]) => DictionaryQueryEntry[] ): void { - this.queriesMap = dataSources.map((dataSources) => buildDictionaryQueryEntries(dataSources)); + this.queriesMap = dataSources.map(buildDictionaryQueryEntries); } async scopedDictionaryEntries( @@ -360,10 +360,8 @@ export class DictionaryService { const queryDetails = this.queriesMap?.getDetails(startBlockHeight); const queryEntry: DictionaryQueryEntry[] = queryDetails?.value ?? []; - // Update end block if query changes - if (queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock) { - queryEndBlock = queryDetails?.endHeight; - } + queryEndBlock = + queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock ? queryDetails.endHeight : queryEndBlock; const dict = await this.getDictionary(startBlockHeight, queryEndBlock, scaledBatchSize, queryEntry); diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index ca8908c77f..cf7c5808bc 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -4,6 +4,7 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core'; +import {range} from 'lodash'; import { BlockDispatcher, delay, @@ -80,7 +81,6 @@ const mockDs: BaseDataSource = { const projectService = { getStartBlockFromDataSources: jest.fn(() => mockDs.startBlock), - getDataSourcesMap: jest.fn(() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))), // TODO getAllDataSources: jest.fn(() => [mockDs]), } as any as IProjectService; @@ -147,6 +147,10 @@ describe('Fetch Service', () => { eventEmitter, schedulerRegistry ); + + (fetchService as any).projectService.getDataSourcesMap = jest.fn( + () => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]])) + ); }); const enableDictionary = () => { @@ -176,6 +180,58 @@ describe('Fetch Service', () => { expect(preHookLoopSpy).toHaveBeenCalled(); }); + it('adds bypassBlocks for empty datasources', async () => { + (fetchService as any).projectService.getDataSourcesMap = jest.fn().mockReturnValueOnce( + new BlockHeightMap( + new Map([ + [ + 1, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 10, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 10, endBlock: 20}, + ], + ], + [ + 21, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 50, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + ], + ], + [ + 101, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 50, endBlock: 200}, + ], + ], + [201, [{startBlock: 1, endBlock: 300}]], + [301, []], + [500, [{startBlock: 500}]], + ]) + ) + ); + + await fetchService.init(1); + expect((fetchService as any).bypassBlocks).toEqual(range(301, 500)); + }); + it('checks chain heads at an interval', async () => { const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeight'); const bestSpy = jest.spyOn(fetchService, 'getBestHeight'); @@ -308,8 +364,7 @@ describe('Fetch Service', () => { }); it('skips bypassBlocks', async () => { - // This doesn't get set in init because networkConfig doesn't define it, so we can set it - (fetchService as any).bypassBlocks = [3]; + (fetchService as any).networkConfig.bypassBlocks = [3]; const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 5544b839b1..d83f259bb4 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -98,10 +98,31 @@ export abstract class BaseFetchService< ); } + private updateBypassBlocksFromDatasources(): void { + const datasources = this.projectService.getDataSourcesMap().getAll(); + + const heights = Array.from(datasources.keys()); + + for (let i = 0; i < heights.length - 1; i++) { + const currentHeight = heights[i]; + const nextHeight = heights[i + 1]; + + const currentDS = datasources.get(currentHeight); + // If the value for the current height is an empty array, then it's a gap + if (currentDS && currentDS.length === 0) { + this.bypassBlocks.push(...range(currentHeight, nextHeight)); + } + } + } + async init(startHeight: number): Promise { + this.bypassBlocks = []; + if (this.networkConfig?.bypassBlocks !== undefined) { this.bypassBlocks = transformBypassBlocks(this.networkConfig.bypassBlocks).filter((blk) => blk >= startHeight); } + + this.updateBypassBlocksFromDatasources(); const interval = await this.getChainInterval(); await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]); @@ -243,7 +264,7 @@ export abstract class BaseFetchService< startBlockHeight >= this.dictionaryService.startHeight && startBlockHeight < this.latestFinalizedHeight ) { - /* queryEndBlock needs to be limited by the latest height. + /* queryEndBlock needs to be limited by the latest height or the maximum value of endBlock in datasources. * Dictionaries could be in the future depending on if they index unfinalized blocks or the node is using an RPC endpoint that is behind. */ const queryEndBlock = Math.min(startBlockHeight + DICTIONARY_MAX_QUERY_SIZE, this.latestFinalizedHeight); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 72ac64becb..b544672deb 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -129,7 +129,12 @@ export abstract class BaseIndexerManager< private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] { let filteredDs: DS[]; - filteredDs = dataSources.filter((ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight); + filteredDs = dataSources.filter( + (ds) => + ds.startBlock !== undefined && + ds.startBlock <= nextProcessingHeight && + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight + ); // perform filter for custom ds filteredDs = filteredDs.filter((ds) => { @@ -146,8 +151,13 @@ export abstract class BaseIndexerManager< private assertDataSources(ds: DS[], blockHeight: number) { if (!ds.length) { logger.error( - `Your start block of all the datasources is greater than the current indexed block height in your database. Either change your startBlock (project.yaml) to <= ${blockHeight} - or delete your database and start again from the currently specified startBlock` + `Issue detected with data sources: \n + Either all data sources have a 'startBlock' greater than the current indexed block height (${blockHeight}), + or they have an 'endBlock' less than the current block. \n + Solution options: \n + 1. Adjust 'startBlock' in project.yaml to be less than or equal to ${blockHeight}, + and 'endBlock' to be greater than or equal to ${blockHeight}. \n + 2. Delete your database and start again with the currently specified 'startBlock' and 'endBlock'.` ); process.exit(1); } diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts new file mode 100644 index 0000000000..0b971918e1 --- /dev/null +++ b/packages/node-core/src/indexer/project.service.spec.ts @@ -0,0 +1,201 @@ +// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {NodeConfig} from '../configure'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {BaseDsProcessorService} from './ds-processor.service'; +import {DynamicDsService} from './dynamic-ds.service'; +import {BaseProjectService} from './project.service'; +import {ISubqueryProject} from './types'; + +class TestProjectService extends BaseProjectService { + packageVersion = '1.0.0'; + + async getBlockTimestamp(height: number): Promise { + return Promise.resolve(new Date()); + } + + onProjectChange(project: any): void { + return; + } +} + +describe('BaseProjectService', () => { + let service: TestProjectService; + + beforeEach(() => { + service = new TestProjectService( + null as unknown as BaseDsProcessorService, + null as unknown as any, + null as unknown as any, + null as unknown as any, + null as unknown as any, + {dataSources: []} as unknown as ISubqueryProject, + null as unknown as any, + null as unknown as any, + {unsafe: false} as unknown as NodeConfig, + {getDynamicDatasources: jest.fn()} as unknown as DynamicDsService, + null as unknown as any, + null as unknown as any + ); + }); + + it('hasDataSourcesAfterHeight', () => { + (service as any).dynamicDsService.dynamicDatasources = []; + (service as any).projectUpgradeService = { + projects: [ + [ + 1, + { + dataSources: [ + {startBlock: 1, endBlock: 300}, + {startBlock: 10, endBlock: 20}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + {startBlock: 500}, + ], + }, + ], + ], + } as any; + + const result1 = service.hasDataSourcesAfterHeight(301); + expect(result1).toBe(true); + const result2 = service.hasDataSourcesAfterHeight(502); + expect(result2).toBe(true); + }); + + it('hasDataSourcesAfterHeight - no available datasource', () => { + (service as any).dynamicDsService.dynamicDatasources = []; + (service as any).projectUpgradeService = { + projects: [ + [ + 1, + { + dataSources: [ + {startBlock: 1, endBlock: 300}, + {startBlock: 10, endBlock: 20}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + ], + }, + ], + ], + } as any; + + const result = service.hasDataSourcesAfterHeight(301); + expect(result).toBe(false); + }); + + it('getDataSources', async () => { + (service as any).project.dataSources = [ + {startBlock: 100, endBlock: 200}, + {startBlock: 1, endBlock: 100}, + ]; + (service as any).dynamicDsService.getDynamicDatasources = jest + .fn() + .mockResolvedValue([{startBlock: 150, endBlock: 250}]); + + const result = await service.getDataSources(175); + expect(result).toEqual([ + {startBlock: 100, endBlock: 200}, + {startBlock: 150, endBlock: 250}, + ]); + }); + + describe('getDatasourceMap', () => { + it('should add endBlock heights correctly', () => { + (service as any).dynamicDsService.dynamicDatasources = []; + (service as any).projectUpgradeService = { + projects: [ + [ + 1, + { + dataSources: [ + {startBlock: 1, endBlock: 300}, + {startBlock: 10, endBlock: 20}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + {startBlock: 500}, + ], + }, + ], + ], + } as any; + + const result = service.getDataSourcesMap(); + expect(result.getAll()).toEqual( + new Map([ + [ + 1, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 10, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 10, endBlock: 20}, + ], + ], + [ + 21, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 50, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + ], + ], + [ + 101, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 50, endBlock: 200}, + ], + ], + [201, [{startBlock: 1, endBlock: 300}]], + [301, []], + [500, [{startBlock: 500}]], + ]) + ); + }); + + it('should contain datasources from current project only', () => { + (service as any).dynamicDsService.dynamicDatasources = []; + (service as any).projectUpgradeService = { + projects: [ + [ + 1, + { + dataSources: [{startBlock: 1}, {startBlock: 200}], + }, + ], + [ + 100, + { + dataSources: [{startBlock: 100}], + }, + ], + ], + } as any; + + const result = service.getDataSourcesMap(); + expect(result.getAll()).toEqual( + new Map([ + [1, [{startBlock: 1}]], + [100, [{startBlock: 100}]], + ]) + ); + }); + }); +}); diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 7d59439364..49162ef2cd 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -10,7 +10,15 @@ import {IApi} from '../api.service'; import {IProjectUpgradeService, NodeConfig} from '../configure'; import {IndexerEvent} from '../events'; import {getLogger} from '../logger'; -import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, initHotSchemaReload, reindex} from '../utils'; +import { + getExistingProjectSchema, + getStartHeight, + hasValue, + initDbSchema, + initHotSchemaReload, + mainThreadOnly, + reindex, +} from '../utils'; import {BlockHeightMap} from '../utils/blockHeightMap'; import {BaseDsProcessorService} from './ds-processor.service'; import {DynamicDsService} from './dynamic-ds.service'; @@ -260,31 +268,74 @@ export abstract class BaseProjectService dsHeight > height && ds.length); + } + // This gets used when indexing blocks, it needs to be async to ensure dynamicDs is updated within workers async getDataSources(blockHeight?: number): Promise { const dataSources = this.project.dataSources; const dynamicDs = await this.dynamicDsService.getDynamicDatasources(); return [...dataSources, ...dynamicDs].filter( - (ds) => blockHeight === undefined || (ds.startBlock !== undefined && ds.startBlock <= blockHeight) + (ds) => + blockHeight === undefined || + (ds.startBlock !== undefined && + ds.startBlock <= blockHeight && + (ds.endBlock === undefined || ds.endBlock >= blockHeight)) ); } + @mainThreadOnly() getDataSourcesMap(): BlockHeightMap { - assert(isMainThread, 'This method is only avaiable on the main thread'); const dynamicDs = this.dynamicDsService.dynamicDatasources; - const dsMap = new Map(); - // Loop through all projects - for (const [height, project] of this.projectUpgradeService.projects) { - // Iterate all the DS at the project height + const projects = [...this.projectUpgradeService.projects]; + + for (let i = 0; i < projects.length; i++) { + const [height, project] = projects[i]; + let nextMinStartHeight: number; + + if (i + 1 < projects.length) { + const nextProject = projects[i + 1][1]; + nextMinStartHeight = nextProject.dataSources + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) + .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock; + } + + const activeDataSources = new Set(); + //events denote addition or deletion of datasources from height-datasource map entries at the specified block height + const events: { + block: number; //block height at which addition or deletion of datasource should take place + start: boolean; //if start=TRUE, add the datasource. otherwise remove the datasource. + ds: DS; + }[] = []; + [...project.dataSources, ...dynamicDs] - .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) - .sort((a, b) => a.startBlock - b.startBlock) - .forEach((ds, index, dataSources) => { - dsMap.set(Math.max(height, ds.startBlock), dataSources.slice(0, index + 1)); + .filter((ds): ds is DS & {startBlock: number} => { + return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); + }) + .forEach((ds) => { + events.push({block: Math.max(height, ds.startBlock), start: true, ds}); + if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); }); + + // sort events by block in ascending order, start events come before end events + const sortedEvents = events.sort((a, b) => a.block - b.block || Number(b.start) - Number(a.start)); + + sortedEvents.forEach((event) => { + event.start ? activeDataSources.add(event.ds) : activeDataSources.delete(event.ds); + dsMap.set(event.block, Array.from(activeDataSources)); + }); } return new BlockHeightMap(dsMap); diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index 54d3876792..f41b8a1f6c 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -48,4 +48,5 @@ export interface IProjectService { getDataSources(blockHeight?: number): Promise; getStartBlockFromDataSources(): number; getDataSourcesMap(): BlockHeightMap; + hasDataSourcesAfterHeight(height: number): boolean; } diff --git a/packages/types-core/src/project/versioned/base.ts b/packages/types-core/src/project/versioned/base.ts index 7060f37a49..db0f3e8287 100644 --- a/packages/types-core/src/project/versioned/base.ts +++ b/packages/types-core/src/project/versioned/base.ts @@ -15,6 +15,11 @@ export interface BaseDataSource = Omit & TemplateBase; +export type BaseTemplateDataSource = Omit & + TemplateBase;