diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 7392ea1269..45c2794357 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -3,21 +3,21 @@ import assert from 'assert'; -import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; -import { hexToU8a, u8aEq } from '@subql/utils'; -import { Transaction } from '@subql/x-sequelize'; -import { NodeConfig, IProjectUpgradeService } from '../../configure'; -import { AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload } from '../../events'; -import { getLogger } from '../../logger'; -import { monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite } from '../../process'; -import { IQueue, mainThreadOnly } from '../../utils'; -import { MonitorServiceInterface } from '../monitor.service'; -import { PoiBlock, PoiSyncService } from '../poi'; -import { SmartBatchService } from '../smartBatch.service'; -import { StoreService } from '../store.service'; -import { IStoreModelProvider } from '../storeModelProvider'; -import { IPoi } from '../storeModelProvider/poi'; -import { Header, IBlock, IProjectService, ISubqueryProject } from '../types'; +import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; +import {hexToU8a, u8aEq} from '@subql/utils'; +import {Transaction} from '@subql/x-sequelize'; +import {NodeConfig, IProjectUpgradeService} from '../../configure'; +import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events'; +import {getLogger} from '../../logger'; +import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process'; +import {IQueue, mainThreadOnly} from '../../utils'; +import {MonitorServiceInterface} from '../monitor.service'; +import {PoiBlock, PoiSyncService} from '../poi'; +import {SmartBatchService} from '../smartBatch.service'; +import {StoreService} from '../store.service'; +import {IStoreModelProvider} from '../storeModelProvider'; +import {IPoi} from '../storeModelProvider/poi'; +import {Header, IBlock, IProjectService, ISubqueryProject} from '../types'; const logger = getLogger('BaseBlockDispatcherService'); @@ -156,7 +156,7 @@ export abstract class BaseBlockDispatcher implements IB // Is called directly before a block is processed @mainThreadOnly() protected async preProcessBlock(header: Header): Promise { - const { blockHeight } = header; + const {blockHeight} = header; monitorCreateBlockStart(blockHeight); await this.storeService.setBlockHeader(header); @@ -172,8 +172,8 @@ export abstract class BaseBlockDispatcher implements IB // Is called directly after a block is processed @mainThreadOnly() protected async postProcessBlock(header: Header, processBlockResponse: ProcessBlockResponse): Promise { - const { blockHash, blockHeight: height } = header; - const { dynamicDsCreated, reindexBlockHeader: processReindexBlockHeader } = processBlockResponse; + const {blockHash, blockHeight: height} = header; + const {dynamicDsCreated, reindexBlockHeader: processReindexBlockHeader} = processBlockResponse; // Rewind height received from admin api have higher priority than processed reindexBlockHeight const reindexBlockHeader = this._pendingRewindHeader ?? processReindexBlockHeader; monitorWrite(`Finished block ${height}`); @@ -191,15 +191,15 @@ export abstract class BaseBlockDispatcher implements IB if (this.nodeConfig.proofOfIndex) { void this.poiSyncService.syncPoi(); } - this.eventEmitter.emit(IndexerEvent.RewindSuccess, { success: true, height: reindexBlockHeader.blockHeight }); + this.eventEmitter.emit(IndexerEvent.RewindSuccess, {success: true, height: reindexBlockHeader.blockHeight}); return; } catch (e: any) { - this.eventEmitter.emit(IndexerEvent.RewindFailure, { success: false, message: e.message }); + this.eventEmitter.emit(IndexerEvent.RewindFailure, {success: false, message: e.message}); monitorWrite(`***** Rewind failed: ${e.message}`); throw e; } } else { - this.updateStoreMetadata(height, header.timestamp, undefined, this.storeService.transaction); + await this.updateStoreMetadata(height, header.timestamp, undefined, this.storeService.transaction); const operationHash = this.storeService.getOperationMerkleRoot(); await this.createPOI(height, blockHash, operationHash, this.storeService.transaction); @@ -269,7 +269,7 @@ export abstract class BaseBlockDispatcher implements IB const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id); // This is the first creation of POI await this.poi.bulkUpsert([poiBlock], tx); - await this.storeModelProvider.metadata.setBulk([{ key: 'lastCreatedPoiHeight', value: height }], tx); + await this.storeModelProvider.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx); this.eventEmitter.emit(PoiEvent.PoiTarget, { height, timestamp: Date.now(), @@ -277,13 +277,18 @@ export abstract class BaseBlockDispatcher implements IB } @mainThreadOnly() - private async updateStoreMetadata(height: number, blockTimestamp?: Date, updateProcessed = true, tx?: Transaction): Promise { + private async updateStoreMetadata( + height: number, + blockTimestamp?: Date, + updateProcessed = true, + tx?: Transaction + ): Promise { const meta = this.storeModelProvider.metadata; // Update store metadata await meta.setBulk( [ - { key: 'lastProcessedHeight', value: height }, - { key: 'lastProcessedTimestamp', value: Date.now() }, + {key: 'lastProcessedHeight', value: height}, + {key: 'lastProcessedTimestamp', value: Date.now()}, ], tx ); @@ -292,7 +297,7 @@ export abstract class BaseBlockDispatcher implements IB await meta.setIncrement('processedBlockCount', undefined, tx); } if (blockTimestamp) { - meta.set('lastProcessedBlockTimestamp', blockTimestamp.getTime()); + await meta.set('lastProcessedBlockTimestamp', blockTimestamp.getTime(), tx); } }