Skip to content

Commit

Permalink
add empty datasource heights to bypassBlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
guplersaxanoid committed Oct 23, 2023
1 parent 36d630c commit c9967bc
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 12 deletions.
61 changes: 58 additions & 3 deletions packages/node-core/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import {EventEmitter2} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core';
import {range} from 'lodash';
import {
BlockDispatcher,
delay,
Expand Down Expand Up @@ -80,7 +81,6 @@ const mockDs: BaseDataSource = {

const projectService = {
getStartBlockFromDataSources: jest.fn(() => mockDs.startBlock),
getDataSourcesMap: jest.fn(() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))), // TODO
getAllDataSources: jest.fn(() => [mockDs]),
} as any as IProjectService<any>;

Expand Down Expand Up @@ -147,6 +147,10 @@ describe('Fetch Service', () => {
eventEmitter,
schedulerRegistry
);

(fetchService as any).projectService.getDataSourcesMap = jest.fn(
() => new BlockHeightMap(new Map([[1, [mockDs, mockDs]]]))
);
});

const enableDictionary = () => {
Expand Down Expand Up @@ -176,6 +180,58 @@ describe('Fetch Service', () => {
expect(preHookLoopSpy).toHaveBeenCalled();
});

it('adds bypassBlocks for empty datasources', async () => {
(fetchService as any).projectService.getDataSourcesMap = jest.fn().mockReturnValueOnce(
new BlockHeightMap(
new Map([
[
1,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
10,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 10, endBlock: 20},
],
],
[
21,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
50,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 50, endBlock: 200},
],
],
[
101,
[
{startBlock: 1, endBlock: 300},
{startBlock: 50, endBlock: 200},
],
],
[201, [{startBlock: 1, endBlock: 300}]],
[301, []],
[500, [{startBlock: 500}]],
])
)
);

await fetchService.init(1);
expect((fetchService as any).bypassBlocks).toEqual(range(301, 500));
});

it('checks chain heads at an interval', async () => {
const finalizedSpy = jest.spyOn(fetchService, 'getFinalizedHeight');
const bestSpy = jest.spyOn(fetchService, 'getBestHeight');
Expand Down Expand Up @@ -308,8 +364,7 @@ describe('Fetch Service', () => {
});

it('skips bypassBlocks', async () => {
// This doesn't get set in init because networkConfig doesn't define it, so we can set it
(fetchService as any).bypassBlocks = [3];
(fetchService as any).networkConfig.bypassBlocks = [3];

const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks');

Expand Down
18 changes: 18 additions & 0 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,27 @@ export abstract class BaseFetchService<
}

async init(startHeight: number): Promise<void> {
this.bypassBlocks = [];

if (this.networkConfig?.bypassBlocks !== undefined) {
this.bypassBlocks = transformBypassBlocks(this.networkConfig.bypassBlocks).filter((blk) => blk >= startHeight);
}

const datasources = this.projectService.getDataSourcesMap().getAll();

const sortedHeights = Array.from(datasources.keys()).sort((a, b) => a - b);

for (let i = 0; i < sortedHeights.length - 1; i++) {
const currentHeight = sortedHeights[i];
const nextHeight = sortedHeights[i + 1];

const currentDS = datasources.get(currentHeight);
// If the value for the current height is an empty array, then it's a gap
if (currentDS && currentDS.length === 0) {
this.bypassBlocks.push(...range(currentHeight, nextHeight));
}
}

const interval = await this.getChainInterval();

await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]);
Expand Down
7 changes: 6 additions & 1 deletion packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ export abstract class BaseIndexerManager<
private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] {
let filteredDs: DS[];

filteredDs = dataSources.filter((ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight);
filteredDs = dataSources.filter(
(ds) =>
ds.startBlock !== undefined &&
ds.startBlock <= nextProcessingHeight &&
(ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight
);

// perform filter for custom ds
filteredDs = filteredDs.filter((ds) => {
Expand Down
24 changes: 16 additions & 8 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,19 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
const dynamicDs = this.dynamicDsService.dynamicDatasources;
const dsMap = new Map<number, DS[]>();

for (const [height, project] of this.projectUpgradeService.projects) {
const projects = [...this.projectUpgradeService.projects];

for (let i = 0; i < projects.length; i++) {
const [height, project] = projects[i];
let nextMinStartHeight: number;

if (i + 1 < projects.length) {
const nextProject = projects[i + 1][1];
nextMinStartHeight = nextProject.dataSources
.filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock)
.sort((a, b) => a.startBlock - b.startBlock)[0].startBlock;
}

const activeDataSources = new Set<DS>();
//events denote addition or deletion of datasources from height-datasource map entries at the specified block height
const events: {
Expand All @@ -302,7 +314,9 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
}[] = [];

[...project.dataSources, ...dynamicDs]
.filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock)
.filter((ds): ds is DS & {startBlock: number} => {
return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock);
})
.forEach((ds) => {
events.push({block: Math.max(height, ds.startBlock), start: true, ds});
if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds});
Expand All @@ -311,12 +325,6 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
// sort events by block in ascending order, start events come before end events
const sortedEvents = events.sort((a, b) => a.block - b.block || Number(b.start) - Number(a.start));

// remove all dsMap entries after this height because it will be replaced by current project datasources
const minStartHeight = sortedEvents[0].block;
[...dsMap.entries()].forEach(([height, ds]) => {
if (height >= minStartHeight) dsMap.delete(height);
});

sortedEvents.forEach((event) => {
event.start ? activeDataSources.add(event.ds) : activeDataSources.delete(event.ds);
dsMap.set(event.block, Array.from(activeDataSources));
Expand Down

0 comments on commit c9967bc

Please sign in to comment.