diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index fa960f10cf..5de6d15d77 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +### Fixed +- Fixed modulo block ahead of finalized block issue (#2132) + ### Added - WorkerInMemoryCacheService from node (#2125) - New `endBlock` option on datasources (#2064) diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index d82a71fbc6..6f9f532a40 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -95,6 +95,7 @@ export abstract class BlockDispatcher } logger.info(`Enqueueing blocks ${heights[0]}...${last(heights)}, total ${heights.length} blocks`); + // Those blocks will still be filtered in the handler this.queue.putMany(heights); this.latestBufferedHeight = latestBufferHeight ?? last(heights) ?? this.latestBufferedHeight; diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index cf7c5808bc..8a5f21bf6b 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -363,6 +363,18 @@ describe('Fetch Service', () => { expect(enqueueBlocksSpy).toHaveBeenCalledWith([2, 3, 4, 6, 8, 9, 10, 12, 15, 18], 18); }); + it('update the LatestBufferHeight when modulo blocks full synced', async () => { + fetchService.modulos = [20]; + fetchService.finalizedHeight = 55; + + const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); + + // simulate we have synced to block 50, and modulo is 20, next block to handle suppose be 60,80,100... + // we will still enqueue 55 to update LatestBufferHeight + await fetchService.init(50); + expect(enqueueBlocksSpy).toHaveBeenLastCalledWith([], 55); + }); + it('skips bypassBlocks', async () => { (fetchService as any).networkConfig.bypassBlocks = [3]; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index d83f259bb4..ec9a579222 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -216,10 +216,15 @@ export abstract class BaseFetchService< return moduloBlocks; } - getEnqueuedModuloBlocks(startBlockHeight: number): number[] { + /** + * + * @param startBlockHeight + * @param endBlockHeight is either FinalizedHeight or BestHeight, ensure ModuloBlocks not greater than this number + */ + getEnqueuedModuloBlocks(startBlockHeight: number, endBlockHeight: number): number[] { return this.getModuloBlocks( startBlockHeight, - this.nodeConfig.batchSize * Math.max(...this.getModulos()) + startBlockHeight + Math.min(this.nodeConfig.batchSize * Math.max(...this.getModulos()) + startBlockHeight, endBlockHeight) ).slice(0, this.nodeConfig.batchSize); } @@ -298,7 +303,7 @@ export abstract class BaseFetchService< } else { const maxBlockSize = Math.min(batchBlocks.length, this.blockDispatcher.freeSize); const enqueueBlocks = batchBlocks.slice(0, maxBlockSize); - await this.enqueueBlocks(enqueueBlocks); + await this.enqueueBlocks(enqueueBlocks, latestHeight); } continue; // skip nextBlockRange() way } @@ -313,22 +318,40 @@ export abstract class BaseFetchService< const enqueuingBlocks = handlers.length && this.getModulos().length === handlers.length - ? this.getEnqueuedModuloBlocks(startBlockHeight) + ? this.getEnqueuedModuloBlocks(startBlockHeight, latestHeight) : range(startBlockHeight, endHeight + 1); - await this.enqueueBlocks(enqueuingBlocks); + await this.enqueueBlocks(enqueuingBlocks, latestHeight); } } - private async enqueueBlocks(enqueuingBlocks: number[]): Promise { + /** + * + * @param enqueuingBlocks + * @param latestHeight ensure LatestBufferHeight get updated if enqueuingBlocks is empty + * @private + */ + private async enqueueBlocks(enqueuingBlocks: number[], latestHeight: number): Promise { const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); await this.blockDispatcher.enqueueBlocks( cleanedBatchBlocks, - this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks) + this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks, latestHeight) ); } - private getLatestBufferHeight(cleanedBatchBlocks: number[], rawBatchBlocks: number[]): number { + /** + * + * @param cleanedBatchBlocks + * @param rawBatchBlocks + * @param latestHeight + * @private + */ + private getLatestBufferHeight(cleanedBatchBlocks: number[], rawBatchBlocks: number[], latestHeight: number): number { + // When both BatchBlocks are empty, mean no blocks to enqueue and full synced, + // we are safe to update latestBufferHeight to this number + if (cleanedBatchBlocks.length === 0 && rawBatchBlocks.length === 0) { + return latestHeight; + } return Math.max(...cleanedBatchBlocks, ...rawBatchBlocks); } private filteredBlockBatch(currentBatchBlocks: number[]): number[] { diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 797c663e43..0b31168c60 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- Sync with node-core, fixed modulo block ahead of finalized block issue (#2132) ### Changed - Use WorkerInMemoryCacheService from node core (#2125)