Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework bypass blocks to not expand large block ranges #2566

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- Issues with setting a large block range for bypass blocks (#2566)

## [14.1.5] - 2024-09-25
### Changed
Expand Down
26 changes: 9 additions & 17 deletions packages/node-core/src/indexer/fetch.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {EventEmitter2} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core';
import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry} from '@subql/types-core';
import {range} from 'lodash';
import {
BaseUnfinalizedBlocksService,
Expand Down Expand Up @@ -83,11 +83,6 @@ const nodeConfig = new NodeConfig({
networkDictionary: [''],
});

const getNetworkConfig = () =>
({
dictionary: 'https://example.com',
}) as IProjectNetworkConfig;

const mockDs: BaseDataSource = {
kind: 'mock/DataSource',
startBlock: 1,
Expand Down Expand Up @@ -140,7 +135,7 @@ const getDictionaryService = () =>
initDictionaries: () => {
/* TODO */
},
}) as any as DictionaryService<any, any>;
} as any as DictionaryService<any, any>);

const getBlockDispatcher = () => {
const inst = {
Expand All @@ -164,9 +159,9 @@ describe('Fetch Service', () => {
let fetchService: TestFetchService;
let blockDispatcher: IBlockDispatcher<any>;
let dictionaryService: DictionaryService<any, any>;
let networkConfig: IProjectNetworkConfig;
let dataSources: BaseDataSource[];
let unfinalizedBlocksService: BaseUnfinalizedBlocksService<any>;
let projectService: IProjectService<any>;

let spyOnEnqueueSequential: jest.SpyInstance<
void | Promise<void>,
Expand All @@ -183,7 +178,7 @@ describe('Fetch Service', () => {
const eventEmitter = new EventEmitter2();
const schedulerRegistry = new SchedulerRegistry();

const projectService = {
projectService = {
getStartBlockFromDataSources: jest.fn(() => Math.min(...dataSources.map((ds) => ds.startBlock ?? 0))),
getAllDataSources: jest.fn(() => dataSources),
getDataSourcesMap: jest.fn(() => {
Expand All @@ -197,16 +192,15 @@ describe('Fetch Service', () => {
});
return new BlockHeightMap(x);
}),
bypassBlocks: [],
} as any as IProjectService<any>;

blockDispatcher = getBlockDispatcher();
dictionaryService = getDictionaryService();
networkConfig = getNetworkConfig();

fetchService = new TestFetchService(
nodeConfig,
projectService,
networkConfig,
blockDispatcher,
dictionaryService,
eventEmitter,
Expand Down Expand Up @@ -331,7 +325,8 @@ describe('Fetch Service', () => {
);

await fetchService.init(1);
expect((fetchService as any).bypassBlocks).toEqual(range(301, 500));

expect((fetchService as any).getDatasourceBypassBlocks()).toEqual([`301-500`]);
});

it('checks chain heads at an interval', async () => {
Expand Down Expand Up @@ -614,7 +609,7 @@ describe('Fetch Service', () => {
});

it('skips bypassBlocks', async () => {
(fetchService as any).networkConfig.bypassBlocks = [3];
projectService.bypassBlocks = [3];

await fetchService.init(1);

Expand All @@ -625,13 +620,10 @@ describe('Fetch Service', () => {

it('transforms bypassBlocks', async () => {
// Set a range so that on init it's transformed
(fetchService as any).networkConfig.bypassBlocks = ['2-5'];
projectService.bypassBlocks = ['2-5'];

await fetchService.init(1);

// This doesn't work as they get removed after that height is processed
// expect((fetchService as any).bypassBlocks).toEqual([2, 3, 4, 5]);

// Note the batch is smaller than the batch size because the bypassed blocks are excluded from the initial batch
expect(enqueueBlocksSpy).toHaveBeenCalledWith([1, 6, 7, 8, 9, 10], 10);
});
Expand Down
86 changes: 31 additions & 55 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import util from 'util';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {SchedulerRegistry} from '@nestjs/schedule';
import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {range, without} from 'lodash';
import {BaseDataSource} from '@subql/types-core';
import {range} from 'lodash';
import {NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {cleanedBatchBlocks, delay, transformBypassBlocks, waitForBatchSize} from '../utils';
import {delay, filterBypassBlocks, waitForBatchSize} from '../utils';
import {IBlockDispatcher} from './blockDispatcher';
import {mergeNumAndBlocksToNums} from './dictionary';
import {DictionaryService} from './dictionary/dictionary.service';
import {getBlockHeight, mergeNumAndBlocks} from './dictionary/utils';
import {mergeNumAndBlocks} from './dictionary/utils';
import {StoreCacheService} from './storeCache';
import {Header, IBlock, IProjectService} from './types';
import {BypassBlocks, Header, IBlock, IProjectService} from './types';
import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service';

const logger = getLogger('FetchService');
Expand All @@ -28,7 +27,6 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
private _latestBestHeight?: number;
private _latestFinalizedHeight?: number;
private isShutdown = false;
private bypassBlocks: number[] = [];

// If the chain doesn't have a distinction between the 2 it should return the same value for finalized and best
protected abstract getFinalizedHeader(): Promise<Header>;
Expand All @@ -48,7 +46,6 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
constructor(
private nodeConfig: NodeConfig,
protected projectService: IProjectService<DS>,
protected networkConfig: IProjectNetworkConfig,
protected blockDispatcher: B,
protected dictionaryService: DictionaryService<DS, FB>,
private eventEmitter: EventEmitter2,
Expand Down Expand Up @@ -77,31 +74,7 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
this.isShutdown = true;
}

private updateBypassBlocksFromDatasources(): void {
const datasources = this.projectService.getDataSourcesMap().getAll();

const heights = Array.from(datasources.keys());

for (let i = 0; i < heights.length - 1; i++) {
const currentHeight = heights[i];
const nextHeight = heights[i + 1];

const currentDS = datasources.get(currentHeight);
// If the value for the current height is an empty array, then it's a gap
if (currentDS && currentDS.length === 0) {
this.bypassBlocks.push(...range(currentHeight, nextHeight));
}
}
}

async init(startHeight: number): Promise<void> {
this.bypassBlocks = [];

if (this.networkConfig?.bypassBlocks !== undefined) {
this.bypassBlocks = transformBypassBlocks(this.networkConfig.bypassBlocks).filter((blk) => blk >= startHeight);
}

this.updateBypassBlocksFromDatasources();
const interval = await this.getChainInterval();

await Promise.all([this.getFinalizedBlockHead(), this.getBestBlockHead()]);
Expand Down Expand Up @@ -344,49 +317,52 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
}

private async enqueueBlocks(enqueuingBlocks: (IBlock<FB> | number)[], latestHeight: number): Promise<void> {
const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks);
const cleanedBatchBlocks = filterBypassBlocks<FB>(enqueuingBlocks, [
...this.projectService.bypassBlocks,
...this.getDatasourceBypassBlocks(),
]);
await this.blockDispatcher.enqueueBlocks(
cleanedBatchBlocks,
this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks, latestHeight)
this.getLatestBufferHeight(enqueuingBlocks, latestHeight)
);
}

/**
*
* @param cleanedBatchBlocks
* @param rawBatchBlocks
* @param latestHeight
* @private
*/
private getLatestBufferHeight(
cleanedBatchBlocks: (IBlock<FB> | number)[],
rawBatchBlocks: (IBlock<FB> | number)[],
latestHeight: number
): number {
private getLatestBufferHeight(rawBatchBlocks: (IBlock<FB> | number)[], latestHeight: number): number {
// When the batch is empty there are no blocks to enqueue and we are fully synced,
// so it is safe to update latestBufferHeight to this number
if (cleanedBatchBlocks.length === 0 && rawBatchBlocks.length === 0) {
if (rawBatchBlocks.length === 0) {
return latestHeight;
}
return Math.max(...mergeNumAndBlocksToNums(cleanedBatchBlocks, rawBatchBlocks));
return Math.max(...mergeNumAndBlocksToNums([], rawBatchBlocks));
}

private filteredBlockBatch(currentBatchBlocks: (number | IBlock<FB>)[]): (number | IBlock<FB>)[] {
if (!this.bypassBlocks.length || !currentBatchBlocks) {
return currentBatchBlocks;
}
/**
* If a project's datasources are not continuous, we can add the gaps between them to the bypass blocks
* */
private getDatasourceBypassBlocks(): BypassBlocks {
const datasources = this.projectService.getDataSourcesMap().getAll();

const heights = Array.from(datasources.keys());

const cleanedBatch = cleanedBatchBlocks(this.bypassBlocks, currentBatchBlocks);
const bypassBlocks: BypassBlocks = [];

const pollutedBlocks = this.bypassBlocks.filter(
(b) => b < Math.max(...currentBatchBlocks.map((b) => getBlockHeight(b)))
);
if (pollutedBlocks.length) {
// inspect limits the number of logged blocks to 100
logger.info(`Bypassing blocks: ${util.inspect(pollutedBlocks, {maxArrayLength: 100})}`);
for (let i = 0; i < heights.length - 1; i++) {
const currentHeight = heights[i];
const nextHeight = heights[i + 1];

const currentDS = datasources.get(currentHeight);
// If the value for the current height is an empty array, then it's a gap
if (currentDS?.length === 0) {
bypassBlocks.push(`${currentHeight}-${nextHeight - 1}`);
}
}
this.bypassBlocks = without(this.bypassBlocks, ...pollutedBlocks);
return cleanedBatch;
return bypassBlocks;
}

private nextEndBlockHeight(startBlockHeight: number, scaledBatchSize: number): number {
Expand Down
8 changes: 6 additions & 2 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {MetadataKeys} from './entities';
import {PoiSyncService} from './poi';
import {PoiService} from './poi/poi.service';
import {StoreService} from './store.service';
import {ISubqueryProject, IProjectService} from './types';
import {ISubqueryProject, IProjectService, BypassBlocks} from './types';
import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service';

const logger = getLogger('Project');
Expand All @@ -33,7 +33,7 @@ class NotInitError extends Error {
export abstract class BaseProjectService<
API extends IApi,
DS extends BaseDataSource,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>
> implements IProjectService<DS>
{
private _schema?: string;
Expand Down Expand Up @@ -83,6 +83,10 @@ export abstract class BaseProjectService<
return this._blockOffset;
}

/**
 * The bypass blocks declared on the project's network configuration.
 *
 * @returns the configured entries, or an empty list when the network config does not set any.
 */
get bypassBlocks(): BypassBlocks {
  const configured = this.project.network.bypassBlocks;
  return configured ?? [];
}

protected get isHistorical(): boolean {
return this.storeService.historical;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/node-core/src/indexer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface ISubqueryProject<
N extends IProjectNetworkConfig = IProjectNetworkConfig,
DS extends BaseDataSource = BaseDataSource,
T extends BaseTemplateDataSource<DS> = BaseTemplateDataSource<DS>,
C = unknown,
C = unknown
> extends Omit<CommonSubqueryProject<N, DS, T>, 'schema' | 'version' | 'name' | 'specVersion' | 'description'> {
readonly schema: GraphQLSchema;
applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise<Date | undefined>) => Promise<void>;
Expand All @@ -43,6 +43,7 @@ export interface IIndexerManager<B, DS> {
export interface IProjectService<DS> {
blockOffset: number | undefined;
startHeight: number;
bypassBlocks: BypassBlocks;
reindex(lastCorrectHeight: number): Promise<void>;
/**
* This is used everywhere but within indexing blocks, see comment on getDataSources for more info
Expand All @@ -68,3 +69,5 @@ export type Header = {
blockHash: string;
parentHash: string | undefined;
};

/**
 * A list of blocks to bypass. Each entry is either a single block height (`number`)
 * or a `start-end` range string such as `'5-10'`.
 */
export type BypassBlocks = Array<number | `${number}-${number}`>;
16 changes: 8 additions & 8 deletions packages/node-core/src/utils/blocks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,28 @@
// SPDX-License-Identifier: GPL-3.0

import {range} from 'lodash';
import {transformBypassBlocks, cleanedBatchBlocks} from './blocks';
import {BypassBlocks} from '../indexer';
import {filterBypassBlocks} from './blocks';

describe('bypass logic', () => {
it('process bypassBlocks with ranges', () => {
let bypassBlocks = transformBypassBlocks([20, 40, '5-10', 20, 140]);
expect(bypassBlocks).toEqual([20, 40, 5, 6, 7, 8, 9, 10, 140]);
let bypassBlocks: BypassBlocks = [20, 40, '5-10', 20, 140];
let currentBlockBatch = [1, 5, 7, 8, 20, 40, 100, 120];
const case_1 = cleanedBatchBlocks(bypassBlocks, currentBlockBatch);
const case_1 = filterBypassBlocks(currentBlockBatch, bypassBlocks);

expect(case_1).toEqual([1, 100, 120]);

bypassBlocks = transformBypassBlocks([' 5 - 10 ', 20, 140]);
bypassBlocks = [' 5 - 10 ', 20, 140];
currentBlockBatch = [1, 5, 7, 8, 20, 40, 100, 120];
const case_2 = cleanedBatchBlocks(bypassBlocks, currentBlockBatch);
const case_2 = filterBypassBlocks(currentBlockBatch, bypassBlocks);

expect(case_2).toEqual([1, 40, 100, 120]);
});

it('cleanedBatchBlocks with large amount blocks should not throw error Maximum call stack size exceeded', () => {
const bypassBlocks = transformBypassBlocks(['50051722-54939220']);
const bypassBlocks: BypassBlocks = ['50051722-54939220'];
const currentBlockBatch = range(34312396, 34312495);
const case_1 = cleanedBatchBlocks(bypassBlocks, currentBlockBatch);
const case_1 = filterBypassBlocks(currentBlockBatch, bypassBlocks);
expect(case_1).toEqual(currentBlockBatch);
});
});
Loading
Loading