From f4692d1367d8825e6a13499e9a5aee207237892d Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 10:25:43 +1300 Subject: [PATCH 1/7] Fix modulo block ahead of finalized --- .../node-core/src/indexer/fetch.service.ts | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index d83f259bb4..3bf140fed1 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -216,10 +216,15 @@ export abstract class BaseFetchService< return moduloBlocks; } - getEnqueuedModuloBlocks(startBlockHeight: number): number[] { + /** + * + * @param startBlockHeight + * @param endBlockHeight is either FinalizedHeight or BestHeight, ensure ModuloBlocks not greater than this number + */ + getEnqueuedModuloBlocks(startBlockHeight: number, endBlockHeight: number): number[] { return this.getModuloBlocks( startBlockHeight, - this.nodeConfig.batchSize * Math.max(...this.getModulos()) + startBlockHeight + Math.min(this.nodeConfig.batchSize * Math.max(...this.getModulos()) + startBlockHeight, endBlockHeight) ).slice(0, this.nodeConfig.batchSize); } @@ -313,7 +318,7 @@ export abstract class BaseFetchService< const enqueuingBlocks = handlers.length && this.getModulos().length === handlers.length - ? this.getEnqueuedModuloBlocks(startBlockHeight) + ? this.getEnqueuedModuloBlocks(startBlockHeight, latestHeight) : range(startBlockHeight, endHeight + 1); await this.enqueueBlocks(enqueuingBlocks); @@ -321,6 +326,14 @@ export abstract class BaseFetchService< } private async enqueueBlocks(enqueuingBlocks: number[]): Promise { + // We check enqueuingBlocks length rather than cleanedBatchBlocks + // Because when bypass blocks, cleanedBatchBlocks can be [], but latestBufferHeight could be valid to update metadata + // See comments in blockDispatcher.enqueueBlocks method + if (!enqueuingBlocks.length) { + logger.info(`No blocks to enqueue at the moment.`); + await delay(10); + return; + } const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); await this.blockDispatcher.enqueueBlocks( cleanedBatchBlocks, From 88ab401acdce5d24ef17c599b7d79379c737353a Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 11:47:12 +1300 Subject: [PATCH 2/7] Fix and add change to node to trigger prerelease --- .../blockDispatcher/block-dispatcher.ts | 1 + .../node-core/src/indexer/fetch.service.ts | 36 ++++++++++++------- packages/node/CHANGELOG.md | 1 + 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index d82a71fbc6..6f9f532a40 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -95,6 +95,7 @@ export abstract class BlockDispatcher } logger.info(`Enqueueing blocks ${heights[0]}...${last(heights)}, total ${heights.length} blocks`); + // Those blocks will still be filtered in the handler this.queue.putMany(heights); this.latestBufferedHeight = latestBufferHeight ?? last(heights) ?? this.latestBufferedHeight; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 3bf140fed1..ec9a579222 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -303,7 +303,7 @@ export abstract class BaseFetchService< } else { const maxBlockSize = Math.min(batchBlocks.length, this.blockDispatcher.freeSize); const enqueueBlocks = batchBlocks.slice(0, maxBlockSize); - await this.enqueueBlocks(enqueueBlocks); + await this.enqueueBlocks(enqueueBlocks, latestHeight); } continue; // skip nextBlockRange() way } @@ -321,27 +321,37 @@ export abstract class BaseFetchService< ? this.getEnqueuedModuloBlocks(startBlockHeight, latestHeight) : range(startBlockHeight, endHeight + 1); - await this.enqueueBlocks(enqueuingBlocks); + await this.enqueueBlocks(enqueuingBlocks, latestHeight); } } - private async enqueueBlocks(enqueuingBlocks: number[]): Promise { - // We check enqueuingBlocks length rather than cleanedBatchBlocks - // Because when bypass blocks, cleanedBatchBlocks can be [], but latestBufferHeight could be valid to update metadata - // See comments in blockDispatcher.enqueueBlocks method - if (!enqueuingBlocks.length) { - logger.info(`No blocks to enqueue at the moment.`); - await delay(10); - return; - } + /** + * + * @param enqueuingBlocks + * @param latestHeight ensure LatestBufferHeight get updated if enqueuingBlocks is empty + * @private + */ + private async enqueueBlocks(enqueuingBlocks: number[], latestHeight: number): Promise { const cleanedBatchBlocks = this.filteredBlockBatch(enqueuingBlocks); await this.blockDispatcher.enqueueBlocks( cleanedBatchBlocks, - this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks) + this.getLatestBufferHeight(cleanedBatchBlocks, enqueuingBlocks, latestHeight) ); } - private getLatestBufferHeight(cleanedBatchBlocks: number[], rawBatchBlocks: number[]): number { + /** + * + * @param cleanedBatchBlocks + * @param rawBatchBlocks + * @param latestHeight + * @private + */ + private getLatestBufferHeight(cleanedBatchBlocks: number[], rawBatchBlocks: number[], latestHeight: number): number { + // When both BatchBlocks are empty, mean no blocks to enqueue and full synced, + // we are safe to update latestBufferHeight to this number + if (cleanedBatchBlocks.length === 0 && rawBatchBlocks.length === 0) { + return latestHeight; + } return Math.max(...cleanedBatchBlocks, ...rawBatchBlocks); } private filteredBlockBatch(currentBatchBlocks: number[]): number[] { diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index 797c663e43..bf71b4c12c 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + ### Changed - Use WorkerInMemoryCacheService from node core (#2125) From a594b22e465a0aae3a4d9ba8b717d65ccc46e0cc Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 11:58:25 +1300 Subject: [PATCH 3/7] add test --- packages/node-core/src/indexer/fetch.service.spec.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index cf7c5808bc..7c4ed1a4fb 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -363,6 +363,18 @@ describe('Fetch Service', () => { expect(enqueueBlocksSpy).toHaveBeenCalledWith([2, 3, 4, 6, 8, 9, 10, 12, 15, 18], 18); }); + it('update the LatestBufferHeight when modulo blocks full synced', async () => { + fetchService.modulos = [20]; + fetchService.finalizedHeight = 60; + + const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); + + // simulate we have synced to block 50, ande modulos is 20, next block to handle suppose be 70 + // we will still enqueue 60 to update LatestBufferHeight + await fetchService.init(50); + expect(enqueueBlocksSpy).toHaveBeenCalledWith([], 60); + }); + it('skips bypassBlocks', async () => { (fetchService as any).networkConfig.bypassBlocks = [3]; From d2c75bb2b0009445f2b5c22ca50b6dc5a6d1c4b9 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 11:58:43 +1300 Subject: [PATCH 4/7] add test --- packages/node-core/src/indexer/fetch.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 7c4ed1a4fb..c50414960a 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -369,7 +369,7 @@ describe('Fetch Service', () => { const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); - // simulate we have synced to block 50, ande modulos is 20, next block to handle suppose be 70 + // simulate we have synced to block 50, and modulos is 20, next block to handle suppose be 70 // we will still enqueue 60 to update LatestBufferHeight await fetchService.init(50); expect(enqueueBlocksSpy).toHaveBeenCalledWith([], 60); From 7570bd553d72ef6c97a8c2bf8e2b78b405d0c779 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 11:59:00 +1300 Subject: [PATCH 5/7] add test --- packages/node-core/src/indexer/fetch.service.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index c50414960a..23280a9741 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -369,7 +369,7 @@ describe('Fetch Service', () => { const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); - // simulate we have synced to block 50, and modulos is 20, next block to handle suppose be 70 + // simulate we have synced to block 50, and modulo is 20, next block to handle suppose be 70 // we will still enqueue 60 to update LatestBufferHeight await fetchService.init(50); expect(enqueueBlocksSpy).toHaveBeenCalledWith([], 60); From e3570a778b92b3cb72d4c8449f15ccd4b1d89710 Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 12:05:06 +1300 Subject: [PATCH 6/7] fix test --- packages/node-core/src/indexer/fetch.service.spec.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 23280a9741..8a5f21bf6b 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -365,14 +365,14 @@ describe('Fetch Service', () => { it('update the LatestBufferHeight when modulo blocks full synced', async () => { fetchService.modulos = [20]; - fetchService.finalizedHeight = 60; + fetchService.finalizedHeight = 55; const enqueueBlocksSpy = jest.spyOn(blockDispatcher, 'enqueueBlocks'); - // simulate we have synced to block 50, and modulo is 20, next block to handle suppose be 70 - // we will still enqueue 60 to update LatestBufferHeight + // simulate we have synced to block 50, and modulo is 20, next block to handle suppose be 60,80,100... + // we will still enqueue 55 to update LatestBufferHeight await fetchService.init(50); - expect(enqueueBlocksSpy).toHaveBeenCalledWith([], 60); + expect(enqueueBlocksSpy).toHaveBeenLastCalledWith([], 55); }); it('skips bypassBlocks', async () => { From a64b4212adcad057a50db1366d9238a7f3415f9f Mon Sep 17 00:00:00 2001 From: JQQQ Date: Tue, 31 Oct 2023 13:12:37 +1300 Subject: [PATCH 7/7] add changelog --- packages/node-core/CHANGELOG.md | 4 ++++ packages/node/CHANGELOG.md | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/node-core/CHANGELOG.md b/packages/node-core/CHANGELOG.md index fa960f10cf..5de6d15d77 100644 --- a/packages/node-core/CHANGELOG.md +++ b/packages/node-core/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +### Fixed +- Fixed modulo block ahead of finalized block issue (#2132) + ### Added - WorkerInMemoryCacheService from node (#2125) - New `endBlock` option on datasources (#2064) diff --git a/packages/node/CHANGELOG.md b/packages/node/CHANGELOG.md index bf71b4c12c..0b31168c60 100644 --- a/packages/node/CHANGELOG.md +++ b/packages/node/CHANGELOG.md @@ -5,7 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] - +### Fixed +- Sync with node-core, fixed modulo block ahead of finalized block issue (#2132) ### Changed - Use WorkerInMemoryCacheService from node core (#2125)