Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Endblock feature #2064

Merged
merged 15 commits into from
Oct 29, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions packages/common-substrate/src/project/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {RegisteredTypes, RegistryTypes, OverrideModuleType, OverrideBundleType} from '@polkadot/types/types';
import {BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {BaseDataSource, BlockFilterImpl, ProcessorImpl} from '@subql/common';
import {
SubstrateBlockFilter,
SubstrateBlockHandler,
Expand All @@ -24,7 +24,6 @@ import {
IsArray,
IsBoolean,
IsEnum,
IsInt,
IsOptional,
IsString,
IsObject,
Expand Down Expand Up @@ -149,15 +148,12 @@ export class CustomMapping implements BaseMapping<SubstrateCustomHandler> {
file: string;
}

export class RuntimeDataSourceBase implements SubstrateRuntimeDatasource {
export class RuntimeDataSourceBase extends BaseDataSource implements SubstrateRuntimeDatasource {
@IsEnum(SubstrateDatasourceKind, {groups: [SubstrateDatasourceKind.Runtime]})
kind: SubstrateDatasourceKind.Runtime;
@Type(() => RuntimeMapping)
@ValidateNested()
mapping: RuntimeMapping;
@IsOptional()
@IsInt()
startBlock?: number;
}

export class FileReferenceImpl implements FileReference {
Expand All @@ -166,16 +162,14 @@ export class FileReferenceImpl implements FileReference {
}

export class CustomDataSourceBase<K extends string, M extends CustomMapping, O = any>
extends BaseDataSource
implements SubstrateCustomDatasource<K, M, O>
{
@IsString()
kind: K;
@Type(() => CustomMapping)
@ValidateNested()
mapping: M;
@IsOptional()
@IsInt()
startBlock?: number;
@Type(() => FileReferenceImpl)
@ValidateNested({each: true})
assets: Map<string, FileReference>;
Expand Down
16 changes: 15 additions & 1 deletion packages/common/src/project/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import fs from 'fs';
import os from 'os';
import path from 'path';
import {FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {BaseDataSource, FileReference, MultichainProjectManifest, ProjectRootAndManifest} from '@subql/types-core';
import {
registerDecorator,
validateSync,
Expand Down Expand Up @@ -294,3 +294,17 @@ export class FileReferenceImp<T> implements ValidatorConstraintInterface {
}

export const tsProjectYamlPath = (tsManifestEntry: string) => tsManifestEntry.replace('.ts', '.yaml');

@ValidatorConstraint({async: false})
export class IsEndBlockGreater implements ValidatorConstraintInterface {
validate(endBlock: number, args: ValidationArguments) {
const object = args.object as BaseDataSource;
return object.startBlock !== undefined && object.endBlock !== undefined
? object.endBlock >= object.startBlock
: true;
}

defaultMessage(args: ValidationArguments) {
return 'End block must be greater than or equal to start block';
}
}
24 changes: 22 additions & 2 deletions packages/common/src/project/versioned/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@

import {FileReference, ParentProject, Processor} from '@subql/types-core';
import {plainToInstance, Type} from 'class-transformer';
import {Allow, Equals, IsObject, IsOptional, IsString, ValidateNested, validateSync} from 'class-validator';
import {
Allow,
Equals,
IsInt,
IsObject,
IsOptional,
IsString,
Validate,
ValidateNested,
validateSync,
} from 'class-validator';
import yaml from 'js-yaml';
import {toJsonObject} from '../utils';
import {IsEndBlockGreater, toJsonObject} from '../utils';
import {ParentProjectModel} from './v1_0_0/models';

export abstract class ProjectManifestBaseImpl<D extends BaseDeploymentV1_0_0> {
Expand Down Expand Up @@ -78,3 +88,13 @@ export class BaseDeploymentV1_0_0 {
});
}
}

export class BaseDataSource {
@IsOptional()
@IsInt()
startBlock?: number;
@Validate(IsEndBlockGreater)
@IsOptional()
@IsInt()
endBlock?: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS> implements IBloc
// Flush all data from cache and wait
await this.storeCacheService.flushCache(false, true);
}

if (!(await this.projectService.hasDataSourcesAfterHeight(height))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is right. What happens if we run into bypassblocks at the next height?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can do this check with datasources map to make sure we are not exiting prematurely

logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`);
await this.storeCacheService.flushCache(false, true);
process.exit(0);
}
}

/**
Expand Down
8 changes: 3 additions & 5 deletions packages/node-core/src/indexer/dictionary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ export class DictionaryService {
dataSources: BlockHeightMap<DS[]>,
buildDictionaryQueryEntries: (dataSources: DS[]) => DictionaryQueryEntry[]
): void {
this.queriesMap = dataSources.map((dataSources) => buildDictionaryQueryEntries(dataSources));
this.queriesMap = dataSources.map(buildDictionaryQueryEntries);
}

async scopedDictionaryEntries(
Expand All @@ -360,10 +360,8 @@ export class DictionaryService {
const queryDetails = this.queriesMap?.getDetails(startBlockHeight);
const queryEntry: DictionaryQueryEntry[] = queryDetails?.value ?? [];

// Update end block if query changes
if (queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock) {
queryEndBlock = queryDetails?.endHeight;
}
queryEndBlock =
queryDetails?.endHeight && queryDetails?.endHeight < queryEndBlock ? queryDetails.endHeight : queryEndBlock;

const dict = await this.getDictionary(startBlockHeight, queryEndBlock, scaledBatchSize, queryEntry);

Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ export abstract class BaseFetchService<
startBlockHeight >= this.dictionaryService.startHeight &&
startBlockHeight < this.latestFinalizedHeight
) {
/* queryEndBlock needs to be limited by the latest height.
/* queryEndBlock needs to be limited by the latest height or the maximum value of endBlock in datasources.
* Dictionaries could be in the future depending on if they index unfinalized blocks or the node is using an RPC endpoint that is behind.
*/
const queryEndBlock = Math.min(startBlockHeight + DICTIONARY_MAX_QUERY_SIZE, this.latestFinalizedHeight);
Expand Down
9 changes: 7 additions & 2 deletions packages/node-core/src/indexer/indexer.manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,13 @@ export abstract class BaseIndexerManager<
private assertDataSources(ds: DS[], blockHeight: number) {
if (!ds.length) {
logger.error(
`Your start block of all the datasources is greater than the current indexed block height in your database. Either change your startBlock (project.yaml) to <= ${blockHeight}
or delete your database and start again from the currently specified startBlock`
`Issue detected with data sources: \n
Either all data sources have a 'startBlock' greater than the current indexed block height (${blockHeight}),
or they have an 'endBlock' less than the current block. \n
Solution options: \n
1. Adjust 'startBlock' in project.yaml to be less than or equal to ${blockHeight},
and 'endBlock' to be greater than or equal to ${blockHeight}. \n
2. Delete your database and start again with the currently specified 'startBlock' and 'endBlock'.`
);
process.exit(1);
}
Expand Down
167 changes: 167 additions & 0 deletions packages/node-core/src/indexer/project.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2020-2023 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {NodeConfig} from '../configure';
import {BaseDsProcessorService} from './ds-processor.service';
import {DynamicDsService} from './dynamic-ds.service';
import {BaseProjectService} from './project.service';
import {ISubqueryProject} from './types';

class TestProjectService extends BaseProjectService<any, any> {
packageVersion = '1.0.0';

async getBlockTimestamp(height: number): Promise<Date> {
return Promise.resolve(new Date());
}

onProjectChange(project: any): void {
return;
}
}

describe('BaseProjectService', () => {
guplersaxanoid marked this conversation as resolved.
Show resolved Hide resolved
let service: TestProjectService;

beforeEach(() => {
service = new TestProjectService(
null as unknown as BaseDsProcessorService,
null as unknown as any,
null as unknown as any,
null as unknown as any,
null as unknown as any,
{dataSources: []} as unknown as ISubqueryProject<any>,
null as unknown as any,
null as unknown as any,
{unsafe: false} as unknown as NodeConfig,
{getDynamicDatasources: jest.fn()} as unknown as DynamicDsService<any>,
null as unknown as any,
null as unknown as any
);
});

it('hasDataSourcesAfterHeight', async () => {
service.getDataSources = jest.fn().mockResolvedValue([{endBlock: 100}, {endBlock: 200}, {endBlock: 300}]);

const result = await service.hasDataSourcesAfterHeight(150);
expect(result).toBe(true);
});

it('hasDataSourcesAfterHeight - undefined endBlock', async () => {
service.getDataSources = jest.fn().mockResolvedValue([{endBlock: 100}, {endBlock: undefined}]);

const result = await service.hasDataSourcesAfterHeight(150);
expect(result).toBe(true);
});

it('getDataSources', async () => {
(service as any).project.dataSources = [
{startBlock: 100, endBlock: 200},
{startBlock: 1, endBlock: 100},
];
(service as any).dynamicDsService.getDynamicDatasources = jest
.fn()
.mockResolvedValue([{startBlock: 150, endBlock: 250}]);

const result = await service.getDataSources(175);
expect(result).toEqual([
{startBlock: 100, endBlock: 200},
{startBlock: 150, endBlock: 250},
]);
});

describe('getDatasourceMap', () => {
it('should add endBlock heights correctly', () => {
(service as any).dynamicDsService.dynamicDatasources = [];
(service as any).projectUpgradeService = {
projects: [
[
1,
{
dataSources: [
{startBlock: 1, endBlock: 300},
{startBlock: 10, endBlock: 20},
{startBlock: 1, endBlock: 100},
{startBlock: 50, endBlock: 200},
{startBlock: 500},
],
},
],
],
} as any;

const result = service.getDataSourcesMap();
expect(result.getAll()).toEqual(
new Map([
[
1,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
10,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 10, endBlock: 20},
],
],
[
21,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
],
],
[
50,
[
{startBlock: 1, endBlock: 300},
{startBlock: 1, endBlock: 100},
{startBlock: 50, endBlock: 200},
],
],
[
101,
[
{startBlock: 1, endBlock: 300},
{startBlock: 50, endBlock: 200},
],
],
[201, [{startBlock: 1, endBlock: 300}]],
[301, []],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this can happen then the indexer will exit because of

if (!(await this.projectService.hasDataSourcesAfterHeight(height))) {
logger.info(`All data sources have been processed up to block number ${height}. Exiting gracefully...`);
await this.storeCacheService.flushCache(false, true);
process.exit(0);

We either need to support this correctly in the fetch service when enqueuing blocks or throw an error if this happens

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no datasources are available to index a specific blockheight we throw this error:

https://github.com/subquery/subql/blob/5c3142bb41f9196e4fbe433ede8f66021dfbf3cc/packages/node-core/src/indexer/indexer.manager.ts#L145C1-L159C4

should we change this behaviour to somehow skip blocks that has no datasources?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that happens too late on having to index to that height to see the error.

I think we can skip blocks, its should be relatively easy to do

[500, [{startBlock: 500}]],
])
);
});

it('should contain datasources from current project only', () => {
(service as any).dynamicDsService.dynamicDatasources = [];
(service as any).projectUpgradeService = {
projects: [
[
1,
{
dataSources: [{startBlock: 1}, {startBlock: 200}],
},
],
[
100,
{
dataSources: [{startBlock: 100}],
},
],
],
} as any;

const result = service.getDataSourcesMap();
expect(result.getAll()).toEqual(
new Map([
[1, [{startBlock: 1}]],
[100, [{startBlock: 100}]],
])
);
});
});
});
Loading