From 3d51501473ad19c92b5c4b9fbafe72ea19d80fdc Mon Sep 17 00:00:00 2001 From: JQQQ Date: Thu, 11 Jul 2024 14:26:08 +1200 Subject: [PATCH] improve fetch handle rpc finalized height rollback --- packages/node-core/CHANGELOG.md | 3 +++ .../src/indexer/fetch.service.spec.ts | 19 ++++++++++++++++-- .../node-core/src/indexer/fetch.service.ts | 20 +++++++++++++------ .../src/indexer/unfinalizedBlocks.service.ts | 11 ++++++++-- .../worker.unfinalizedBlocks.service.ts | 4 ++++ packages/node/CHANGELOG.md | 3 +++ packages/node/src/indexer/fetch.service.ts | 18 +++++++++-------- .../src/indexer/unfinalizedBlocks.service.ts | 1 - 8 files changed, 60 insertions(+), 19 deletions(-) diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index b64d038a69..3ace4832b7 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Create interval for flushing the cache, this is to support chains that only produce blocks with new transactions (#2485) - Improved types for strict TS setting (#2484) +### Fixed +- Improve indexer could stall due to rpc finalized height could be smaller than previous result (#2487) + ## [10.10.2] - 2024-07-10 ### Fixed - Fix issue admin api can not get `dbSize` due to it not been set in \_metadata table diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 4247ed293d..927c37c10f 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -5,7 +5,16 @@ import {EventEmitter2} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry, IProjectNetworkConfig} from '@subql/types-core'; import {range} from 'lodash'; -import {BlockDispatcher, delay, IBlock, IBlockDispatcher, IProjectService, NodeConfig} from '../'; +import { + BaseUnfinalizedBlocksService, + BlockDispatcher, + delay, + Header, + IBlock, + IBlockDispatcher, + IProjectService, + NodeConfig, +} from '../'; import {BlockHeightMap} from '../utils/blockHeightMap'; import {DictionaryService} from './dictionary/dictionary.service'; import {BaseFetchService} from './fetch.service'; @@ -64,6 +73,10 @@ class TestFetchService extends BaseFetchService): void { this.projectService.getDataSourcesMap = jest.fn(() => blockHeightMap); } + + protected async getFinalizedHeader(): Promise
{ + return Promise.resolve({blockHeight: this.finalizedHeight, blockHash: '0xxx', parentHash: '0xxx'}); + } } const nodeConfig = new NodeConfig({ @@ -156,6 +169,7 @@ describe('Fetch Service', () => { let dictionaryService: DictionaryService; let networkConfig: IProjectNetworkConfig; let dataSources: BaseDataSource[]; + let unfinalizedBlocksService: BaseUnfinalizedBlocksService; let spyOnEnqueueSequential: jest.SpyInstance< void | Promise, @@ -199,7 +213,8 @@ describe('Fetch Service', () => { blockDispatcher, dictionaryService, eventEmitter, - schedulerRegistry + schedulerRegistry, + unfinalizedBlocksService ); spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 3d1c64f395..74995c9a24 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -16,7 +16,8 @@ import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {getBlockHeight, mergeNumAndBlocks} from './dictionary/utils'; -import {IBlock, IProjectService} from './types'; +import {Header, IBlock, IProjectService} from './types'; +import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; const logger = getLogger('FetchService'); @@ -29,7 +30,7 @@ export abstract class BaseFetchService; + protected abstract getFinalizedHeader(): Promise
; protected abstract getBestHeight(): Promise; // The rough interval at which new blocks are produced @@ -50,7 +51,8 @@ export abstract class BaseFetchService, private eventEmitter: EventEmitter2, - private schedulerRegistry: SchedulerRegistry + private schedulerRegistry: SchedulerRegistry, + private unfinalizedBlocksService: IUnfinalizedBlocksServiceUtil ) {} private get latestBestHeight(): number { @@ -144,9 +146,15 @@ export abstract class BaseFetchService { try { - const currentFinalizedHeight = await this.getFinalizedHeight(); - if (this._latestFinalizedHeight !== currentFinalizedHeight) { - this._latestFinalizedHeight = currentFinalizedHeight; + const currentFinalizedHeader = await this.getFinalizedHeader(); + // Rpc could return finalized height below last finalized height due to unmatched nodes, and this could lead indexing stall + // See how this could happen in https://gist.github.com/jiqiang90/ea640b07d298bca7cbeed4aee50776de + if ( + this._latestFinalizedHeight === undefined || + currentFinalizedHeader.blockHeight > this._latestFinalizedHeight + ) { + this._latestFinalizedHeight = currentFinalizedHeader.blockHeight; + this.unfinalizedBlocksService.registerFinalizedBlock(currentFinalizedHeader); if (!this.nodeConfig.unfinalizedBlocks) { this.eventEmitter.emit(IndexerEvent.BlockTarget, { height: this.latestFinalizedHeight, diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index 6e16d59d3a..e6dca92ccf 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -23,7 +23,7 @@ const UNFINALIZED_THRESHOLD = 200; type UnfinalizedBlocks = Header[]; -export interface IUnfinalizedBlocksService { +export interface IUnfinalizedBlocksService extends IUnfinalizedBlocksServiceUtil { init(reindex: (targetHeight: number) => Promise): Promise; processUnfinalizedBlocks(block: IBlock | undefined): Promise; processUnfinalizedBlockHeader(header: Header | undefined): Promise; @@ -32,6 +32,10 @@ export interface IUnfinalizedBlocksService { getMetadataUnfinalizedBlocks(): Promise; } +export interface IUnfinalizedBlocksServiceUtil { + registerFinalizedBlock(header: Header): void; +} + export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlocksService { private _unfinalizedBlocks?: UnfinalizedBlocks; private _finalizedHeader?: Header; @@ -65,7 +69,10 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo return this._finalizedHeader; } - constructor(protected readonly nodeConfig: NodeConfig, protected readonly storeCache: StoreCacheService) {} + constructor( + protected readonly nodeConfig: NodeConfig, + protected readonly storeCache: StoreCacheService + ) {} async init(reindex: (targetHeight: number) => Promise): Promise { logger.info(`Unfinalized blocks is ${this.nodeConfig.unfinalizedBlocks ? 'enabled' : 'disabled'}`); diff --git a/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts index 4474b3e4f7..59a0b46556 100644 --- a/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/worker/worker.unfinalizedBlocks.service.ts @@ -42,4 +42,8 @@ export class WorkerUnfinalizedBlocksService implements IUnfinalizedBlocksServ getMetadataUnfinalizedBlocks(): Promise { throw new Error('This method should not be called from a worker'); } + + registerFinalizedBlock(header: Header): void { + throw new Error('This method should not be called from a worker'); + } } diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 4ca0d6e642..f0df5c2702 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Removed - Unused type (#2484) +### Changed +- Make change with `node-core` fetch service, change `getFinalizedHeight` to `getFinalizedHeader` (#2487) + ## [4.8.0] - 2024-07-10 ### Changed - Bump with `@subql/node-core`, fix admin api `dbSize` issue diff --git a/packages/node/src/indexer/fetch.service.ts b/packages/node/src/indexer/fetch.service.ts index 1599bcec70..563c1beadb 100644 --- a/packages/node/src/indexer/fetch.service.ts +++ b/packages/node/src/indexer/fetch.service.ts @@ -7,7 +7,12 @@ import { SchedulerRegistry } from '@nestjs/schedule'; import { ApiPromise } from '@polkadot/api'; import { isCustomDs, SubstrateHandlerKind } from '@subql/common-substrate'; -import { NodeConfig, BaseFetchService, getModulos } from '@subql/node-core'; +import { + NodeConfig, + BaseFetchService, + getModulos, + Header, +} from '@subql/node-core'; import { SubstrateDatasource, SubstrateBlock } from '@subql/types'; import { SubqueryProject } from '../configure/SubqueryProject'; import { calcInterval, substrateHeaderToHeader } from '../utils/substrate'; @@ -35,7 +40,7 @@ export class FetchService extends BaseFetchService< @Inject('IBlockDispatcher') blockDispatcher: ISubstrateBlockDispatcher, dictionaryService: SubstrateDictionaryService, - private unfinalizedBlocksService: UnfinalizedBlocksService, + unfinalizedBlocksService: UnfinalizedBlocksService, eventEmitter: EventEmitter2, schedulerRegistry: SchedulerRegistry, private runtimeService: RuntimeService, @@ -48,6 +53,7 @@ export class FetchService extends BaseFetchService< dictionaryService, eventEmitter, schedulerRegistry, + unfinalizedBlocksService, ); } @@ -55,14 +61,10 @@ export class FetchService extends BaseFetchService< return this.apiService.unsafeApi; } - protected async getFinalizedHeight(): Promise { + protected async getFinalizedHeader(): Promise
{ const finalizedHash = await this.api.rpc.chain.getFinalizedHead(); const finalizedHeader = await this.api.rpc.chain.getHeader(finalizedHash); - - const header = substrateHeaderToHeader(finalizedHeader); - - this.unfinalizedBlocksService.registerFinalizedBlock(header); - return header.blockHeight; + return substrateHeaderToHeader(finalizedHeader); } protected async getBestHeight(): Promise { diff --git a/packages/node/src/indexer/unfinalizedBlocks.service.ts b/packages/node/src/indexer/unfinalizedBlocks.service.ts index 1c6b5603b7..196564212a 100644 --- a/packages/node/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node/src/indexer/unfinalizedBlocks.service.ts @@ -4,7 +4,6 @@ import { Injectable } from '@nestjs/common'; import { BaseUnfinalizedBlocksService, - getLogger, Header, mainThreadOnly, NodeConfig,