Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Endblock feature #2064

Merged
merged 15 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions packages/common-substrate/src/project/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {RegisteredTypes, RegistryTypes, OverrideModuleType, OverrideBundleType} from '@polkadot/types/types';
import {BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {BaseDataSource, BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {
SubstrateBlockFilter,
SubstrateBlockHandler,
Expand All @@ -24,7 +24,6 @@ import {
IsArray,
IsBoolean,
IsEnum,
IsInt,
IsOptional,
IsString,
IsObject,
Expand Down Expand Up @@ -149,15 +148,12 @@ export class CustomMapping implements BaseMapping<SubstrateCustomHandler> {
file: string;
}

export class RuntimeDataSourceBase implements SubstrateRuntimeDatasource {
export class RuntimeDataSourceBase extends BaseDataSource implements SubstrateRuntimeDatasource {
@IsEnum(SubstrateDatasourceKind, {groups: [SubstrateDatasourceKind.Runtime]})
kind: SubstrateDatasourceKind.Runtime;
@Type(() => RuntimeMapping)
@ValidateNested()
mapping: RuntimeMapping;
@IsOptional()
@IsInt()
startBlock?: number;
}

export class FileReferenceImpl implements FileReference {
Expand All @@ -166,16 +162,14 @@ export class FileReferenceImpl implements FileReference {
}

export class CustomDataSourceBase<K extends string, M extends CustomMapping, O = any>
extends BaseDataSource
implements SubstrateCustomDatasource<K, M, O>
{
@IsString()
kind: K;
@Type(() => CustomMapping)
@ValidateNested()
mapping: M;
@IsOptional()
@IsInt()
startBlock?: number;
@Type(() => FileReferenceImpl)
@ValidateNested({each: true})
assets: Map<string, FileReference>;
Expand Down
16 changes: 15 additions & 1 deletion packages/common/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import {FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {BaseDataSource, FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {
registerDecorator,
validateSync,
Expand Down Expand Up @@ -294,3 +294,17 @@ export class FileReferenceImp<T> implements ValidatorConstraintInterface {
}

export const tsProjectYamlPath = (tsManifestEntry: string) => tsManifestEntry.replace('.ts', '.yaml');

@ValidatorConstraint({async: false})
export class IsEndBlockGreater implements ValidatorConstraintInterface {
validate(endBlock: number, args: ValidationArguments) {
const object = args.object as BaseDataSource;
return object.startBlock !== undefined && object.endBlock !== undefined
? object.endBlock >= object.startBlock
: true;
}

defaultMessage(args: ValidationArguments) {
return 'End block must be greater than or equal to start block';
}
}
24 changes: 22 additions & 2 deletions packages/common/src/project/versioned/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@

import {FileReference, ParentProject, Processor} from '@subql/types-core';
import {plainToInstance, Type} from 'class-transformer';
import {Allow, Equals, IsObject, IsOptional, IsString, ValidateNested, validateSync} from 'class-validator';
import {
Allow,
Equals,
IsInt,
IsObject,
IsOptional,
IsString,
Validate,
ValidateNested,
validateSync,
} from 'class-validator';
import yaml from 'js-yaml';
import {toJsonObject} from '../utils';
import {IsEndBlockGreater, toJsonObject} from '../utils';
import {ParentProjectModel} from './v1_0_0/models';

export abstract class ProjectManifestBaseImpl<D extends BaseDeploymentV1_0_0> {
Expand Down Expand Up @@ -78,3 +88,13 @@ export class BaseDeploymentV1_0_0 {
});
}
}

export class BaseDataSource {
@IsOptional()
@IsInt()
startBlock?: number;
@Validate(IsEndBlockGreater)
@IsOptional()
@IsInt()
endBlock?: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false, true);
}

if (!(await this.projectService.hasDataSourcesAfterHeight(height))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is right. What happens if we run into bypassblocks at the next height?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can do this check with datasources map to make sure we are not exiting prematurely

logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`);
await this.storeCacheService.flushCache(false, true);
process.exit(0);
}
}

// First creation of POI
Expand Down
21 changes: 16 additions & 5 deletions packages/node-core/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {getLogger} from '../logger';
import {profiler} from '../profiler';
import {timeout} from '../utils';
import {BlockHeightMap} from '../utils/blockHeightMap';
import {maxEndBlockHeight} from '../utils/endBlock';

export type SpecVersion = {
id: string;
Expand Down Expand Up @@ -341,7 +342,19 @@ export class DictionaryService {
dataSources: BlockHeightMap<DS[]>,
buildDictionaryQueryEntries: (dataSources: DS[]) => DictionaryQueryEntry[]
): void {
this.queriesMap = dataSources.map((dataSources) => buildDictionaryQueryEntries(dataSources));
this.queriesMap = dataSources.map(buildDictionaryQueryEntries);
const newQueriesMap = this.queriesMap?.getAll() || new Map();

dataSources.getAll().forEach((ds, height) => {
const endBlock = maxEndBlockHeight(ds);
const queryDetails = this.queriesMap?.getDetails(height);

if (!queryDetails?.endHeight || queryDetails.endHeight > endBlock) {
newQueriesMap.set(endBlock + 1, []);
}
});

this.queriesMap = new BlockHeightMap(newQueriesMap);
}

async scopedDictionaryEntries(
Expand All @@ -352,10 +365,8 @@ export class DictionaryService {
const queryDetails = this.queriesMap?.getDetails(startBlockHeight);
const queryEntry: DictionaryQueryEntry[] = queryDetails?.value ?? [];

// Update end block if query changes
if (queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock) {
queryEndBlock = queryDetails?.endHeight;
}
queryEndBlock =
queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock ? queryDetails.endHeight : queryEndBlock;

const dict = await this.getDictionary(startBlockHeight, queryEndBlock, scaledBatchSize, queryEntry);

Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export abstract class BaseFetchService<
startBlockHeight >= this.dictionaryService.startHeight &&
startBlockHeight < this.latestFinalizedHeight
) {
/* queryEndBlock needs to be limited by the latest height.
/* queryEndBlock needs to be limited by the latest height or the maximum value of endBlock in datasources.
* Dictionaries could be in the future depending on if they index unfinalized blocks or the node is using an RPC endpoint that is behind.
*/
const queryEndBlock = Math.min(startBlockHeight + DICTIONARY_MAX_QUERY_SIZE, this.latestFinalizedHeight);
Expand Down
9 changes: 7 additions & 2 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ export abstract class BaseIndexerManager<
private assertDataSources(ds: DS[], blockHeight: number) {
if (!ds.length) {
logger.error(
`Your start block of all the datasources is greater than the current indexed block height in your database. Either change your startBlock (project.yaml) to <= ${blockHeight}
or delete your database and start again from the currently specified startBlock`
`Issue detected with data sources: \n
Either all data sources have a 'startBlock' greater than the current indexed block height (${blockHeight}),
or they have an 'endBlock' less than the current block. \n
Solution options: \n
1. Adjust 'startBlock' in project.yaml to be less than or equal to ${blockHeight},
and 'endBlock' to be greater than or equal to ${blockHeight}. \n
2. Delete your database and start again with the currently specified 'startBlock' and 'endBlock'.`
);
process.exit(1);
}
Expand Down
70 changes: 70 additions & 0 deletions packages/node-core/src/indexer/project.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {NodeConfig} from '../configure';
import {BaseDsProcessorService} from './ds-processor.service';
import {DynamicDsService} from './dynamic-ds.service';
import {BaseProjectService} from './project.service';
import {ISubqueryProject} from './types';

class TestProjectService extends BaseProjectService<any, any> {
packageVersion = '1.0.0';

async getBlockTimestamp(height: number): Promise<Date> {
return Promise.resolve(new Date());
}

onProjectChange(project: any): void {
return;
}
}

describe('BaseProjectService', () => {
guplersaxanoid marked this conversation as resolved.
Show resolved Hide resolved
let service: TestProjectService;

beforeEach(() => {
service = new TestProjectService(
null as unknown as BaseDsProcessorService,
null as unknown as any,
null as unknown as any,
null as unknown as any,
{dataSources: []} as unknown as ISubqueryProject<any>,
null as unknown as any,
null as unknown as any,
{unsafe: false} as unknown as NodeConfig,
{getDynamicDatasources: jest.fn()} as unknown as DynamicDsService<any>,
null as unknown as any,
null as unknown as any
);
});

test('hasDataSourcesAfterHeight', async () => {
guplersaxanoid marked this conversation as resolved.
Show resolved Hide resolved
service.getDataSources = jest.fn().mockResolvedValue([{endBlock: 100}, {endBlock: 200}, {endBlock: 300}]);

const result = await service.hasDataSourcesAfterHeight(150);
expect(result).toBe(true);
});

test('hasDataSourcesAfterHeight - undefined endBlock', async () => {
service.getDataSources = jest.fn().mockResolvedValue([{endBlock: 100}, {endBlock: undefined}]);

const result = await service.hasDataSourcesAfterHeight(150);
expect(result).toBe(true);
});

test('getDataSources', async () => {
(service as any).project.dataSources = [
{startBlock: 100, endBlock: 200},
{startBlock: 1, endBlock: 100},
];
(service as any).dynamicDsService.getDynamicDatasources = jest
.fn()
.mockResolvedValue([{startBlock: 150, endBlock: 250}]);

const result = await service.getDataSources(175);
expect(result).toEqual([
{startBlock: 100, endBlock: 200},
{startBlock: 150, endBlock: 250},
]);
});
});
11 changes: 10 additions & 1 deletion packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,22 @@ export abstract class BaseProjectService<API extends IApi, DS extends BaseDataSo
return [...dataSources, ...dynamicDs];
}

async hasDataSourcesAfterHeight(height: number): Promise<boolean> {
const dataSources = await this.getDataSources();
return dataSources.some((ds) => ds.endBlock === undefined || ds.endBlock > height);
}

// This gets used when indexing blocks, it needs to be async to ensure dynamicDs is updated within workers
async getDataSources(blockHeight?: number): Promise<DS[]> {
const dataSources = this.project.dataSources;
const dynamicDs = await this.dynamicDsService.getDynamicDatasources();

return [...dataSources, ...dynamicDs].filter(
(ds) => blockHeight === undefined || (ds.startBlock !== undefined && ds.startBlock <= blockHeight)
(ds) =>
blockHeight === undefined ||
(ds.startBlock !== undefined &&
ds.startBlock <= blockHeight &&
(ds.endBlock === undefined || ds.endBlock >= blockHeight))
);
}

Expand Down
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ export interface IProjectService<DS> {
getDataSources(blockHeight?: number): Promise<DS[]>;
getStartBlockFromDataSources(): number;
getDataSourcesMap(): BlockHeightMap<DS[]>;
hasDataSourcesAfterHeight(height: number): Promise<boolean>;
}
9 changes: 9 additions & 0 deletions packages/node-core/src/utils/endBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {BaseDataSource} from '@subql/common';

export function maxEndBlockHeight<DS extends BaseDataSource>(dataSources: DS[]): number {
const endBlocks = dataSources.map((ds) => (ds.endBlock !== undefined ? ds.endBlock : Number.MAX_SAFE_INTEGER));
return Math.max(...endBlocks);
}
8 changes: 7 additions & 1 deletion packages/types-core/src/project/versioned/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ export interface BaseDataSource<H extends BaseHandler = BaseHandler, M extends B
* @default 1
*/
startBlock?: number;
/**
* The ending block number for the datasource (optional).
* @type {number}
*/
endBlock?: number;
/**
* The mapping associated with the datasource.
* This contains the handlers.
Expand Down Expand Up @@ -69,4 +74,5 @@ export interface TemplateBase {
name: string;
}

export type BaseTemplateDataSource<DS extends BaseDataSource = BaseDataSource> = Omit<DS, 'startBlock'> & TemplateBase;
export type BaseTemplateDataSource<DS extends BaseDataSource = BaseDataSource> = Omit<DS, 'startBlock' | 'endBlock'> &
TemplateBase;