diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index ca8908c77f..cf7c5808bc 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -4,6 +4,7 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core'; +import {range} from 'lodash'; import { BlockDispatcher, delay, @@ -80,7 +81,6 @@ const mockDs: BaseDataSource = { const projectService = { getStartBlockFromDataSources: jest.fn(() => mockDs.startBlock), - getDataSourcesMap: jest.fn(() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))), // TODO getAllDataSources: jest.fn(() => [mockDs]), } as any as IProjectService; @@ -147,6 +147,10 @@ describe('Fetch Service', () => { eventEmitter, schedulerRegistry ); + + (fetchService as any).projectService.getDataSourcesMap = jest.fn( + () => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]])) + ); }); const enableDictionary = () => { @@ -176,6 +180,58 @@ describe('Fetch Service', () => { expect(preHookLoopSpy).toHaveBeenCalled(); }); + it('adds bypassBlocks for empty datasources', async () => { + (fetchService as any).projectService.getDataSourcesMap = jest.fn().mockReturnValueOnce( + new BlockHeightMap( + new Map([ + [ + 1, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 10, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 10, endBlock: 20}, + ], + ], + [ + 21, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + ], + ], + [ + 50, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 1, endBlock: 100}, + {startBlock: 50, endBlock: 200}, + ], + ], + [ + 101, + [ + {startBlock: 1, endBlock: 300}, + {startBlock: 50, endBlock: 200}, + ], + ], + [201, [{startBlock: 1, endBlock: 300}]], + [301, []], + [500, [{startBlock: 500}]], + ]) + ) + ); + + await fetchService.init(1); + expect((fetchService as any).bypassBlocks).toEqual(range(301, 500)); + }); + it('checks chain heads at an interval', async () => { const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeight'); const bestSpy = jest.spyOn(fetchService, 'getBestHeight'); @@ -308,8 +364,7 @@ describe('Fetch Service', () => { }); it('skips bypassBlocks', async () => { - // This doesn't get set in init because networkConfig doesn't define it, so we can set it - (fetchService as any).bypassBlocks = [3]; + (fetchService as any).networkConfig.bypassBlocks = [3]; const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 98968feddc..fac0beb871 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -99,9 +99,27 @@ export abstract class BaseFetchService< } async init(startHeight: number): Promise { + this.bypassBlocks = []; + if (this.networkConfig?.bypassBlocks !== undefined) { this.bypassBlocks = transformBypassBlocks(this.networkConfig.bypassBlocks).filter((blk) => blk >= startHeight); } + + const datasources = this.projectService.getDataSourcesMap().getAll(); + + const sortedHeights = Array.from(datasources.keys()).sort((a, b) => a - b); + + for (let i = 0; i < sortedHeights.length - 1; i++) { + const currentHeight = sortedHeights[i]; + const nextHeight = sortedHeights[i + 1]; + + const currentDS = datasources.get(currentHeight); + // If the value for the current height is an empty array, then it's a gap + if (currentDS && currentDS.length === 0) { + this.bypassBlocks.push(...range(currentHeight, nextHeight)); + } + } + const interval = await this.getChainInterval(); await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index 99f43205e6..b544672deb 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -129,7 +129,12 @@ export abstract class BaseIndexerManager< private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] { let filteredDs: DS[]; - filteredDs = dataSources.filter((ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight); + filteredDs = dataSources.filter( + (ds) => + ds.startBlock !== undefined && + ds.startBlock <= nextProcessingHeight && + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight + ); // perform filter for custom ds filteredDs = filteredDs.filter((ds) => { diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index 8da27ddc95..541de99e1c 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -292,7 +292,19 @@ export abstract class BaseProjectService(); - for (const [height, project] of this.projectUpgradeService.projects) { + const projects = [...this.projectUpgradeService.projects]; + + for (let i = 0; i < projects.length; i++) { + const [height, project] = projects[i]; + let nextMinStartHeight: number; + + if (i + 1 < projects.length) { + const nextProject = projects[i + 1][1]; + nextMinStartHeight = nextProject.dataSources + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) + .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock; + } + const activeDataSources = new Set(); //events denote addition or deletion of datasources from height-datasource map entries at the specified block height const events: { @@ -302,7 +314,9 @@ export abstract class BaseProjectService !!ds.startBlock) + .filter((ds): ds is DS & {startBlock: number} => { + return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); + }) .forEach((ds) => { events.push({block: Math.max(height, ds.startBlock), start: true, ds}); if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); @@ -311,12 +325,6 @@ export abstract class BaseProjectService a.block - b.block || Number(b.start) - Number(a.start)); - // remove all dsMap entries after this height because it will be replaced by current project datasources - const minStartHeight = sortedEvents[0].block; - [...dsMap.entries()].forEach(([height, ds]) => { - if (height >= minStartHeight) dsMap.delete(height); - }); - sortedEvents.forEach((event) => { event.start ? activeDataSources.add(event.ds) : activeDataSources.delete(event.ds); dsMap.set(event.block, Array.from(activeDataSources));