Skip to content

Commit

Permalink
fix db store issues
Browse files Browse the repository at this point in the history
  • Loading branch information
yoozo committed Nov 22, 2024
1 parent a19ae83 commit 5a42faa
Showing 1 changed file with 31 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@

import assert from 'assert';

import { EventEmitter2, OnEvent } from '@nestjs/event-emitter';
import { hexToU8a, u8aEq } from '@subql/utils';
import { Transaction } from '@subql/x-sequelize';
import { NodeConfig, IProjectUpgradeService } from '../../configure';
import { AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload } from '../../events';
import { getLogger } from '../../logger';
import { monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite } from '../../process';
import { IQueue, mainThreadOnly } from '../../utils';
import { MonitorServiceInterface } from '../monitor.service';
import { PoiBlock, PoiSyncService } from '../poi';
import { SmartBatchService } from '../smartBatch.service';
import { StoreService } from '../store.service';
import { IStoreModelProvider } from '../storeModelProvider';
import { IPoi } from '../storeModelProvider/poi';
import { Header, IBlock, IProjectService, ISubqueryProject } from '../types';
import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {hexToU8a, u8aEq} from '@subql/utils';
import {Transaction} from '@subql/x-sequelize';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../events';
import {getLogger} from '../../logger';
import {monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process';
import {IQueue, mainThreadOnly} from '../../utils';
import {MonitorServiceInterface} from '../monitor.service';
import {PoiBlock, PoiSyncService} from '../poi';
import {SmartBatchService} from '../smartBatch.service';
import {StoreService} from '../store.service';
import {IStoreModelProvider} from '../storeModelProvider';
import {IPoi} from '../storeModelProvider/poi';
import {Header, IBlock, IProjectService, ISubqueryProject} from '../types';

const logger = getLogger('BaseBlockDispatcherService');

Expand Down Expand Up @@ -156,7 +156,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
// Is called directly before a block is processed
@mainThreadOnly()
protected async preProcessBlock(header: Header): Promise<void> {
const { blockHeight } = header;
const {blockHeight} = header;
monitorCreateBlockStart(blockHeight);
await this.storeService.setBlockHeader(header);

Expand All @@ -172,8 +172,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
// Is called directly after a block is processed
@mainThreadOnly()
protected async postProcessBlock(header: Header, processBlockResponse: ProcessBlockResponse): Promise<void> {
const { blockHash, blockHeight: height } = header;
const { dynamicDsCreated, reindexBlockHeader: processReindexBlockHeader } = processBlockResponse;
const {blockHash, blockHeight: height} = header;
const {dynamicDsCreated, reindexBlockHeader: processReindexBlockHeader} = processBlockResponse;
// Rewind height received from admin api have higher priority than processed reindexBlockHeight
const reindexBlockHeader = this._pendingRewindHeader ?? processReindexBlockHeader;
monitorWrite(`Finished block ${height}`);
Expand All @@ -191,15 +191,15 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
if (this.nodeConfig.proofOfIndex) {
void this.poiSyncService.syncPoi();
}
this.eventEmitter.emit(IndexerEvent.RewindSuccess, { success: true, height: reindexBlockHeader.blockHeight });
this.eventEmitter.emit(IndexerEvent.RewindSuccess, {success: true, height: reindexBlockHeader.blockHeight});
return;
} catch (e: any) {
this.eventEmitter.emit(IndexerEvent.RewindFailure, { success: false, message: e.message });
this.eventEmitter.emit(IndexerEvent.RewindFailure, {success: false, message: e.message});
monitorWrite(`***** Rewind failed: ${e.message}`);
throw e;
}
} else {
this.updateStoreMetadata(height, header.timestamp, undefined, this.storeService.transaction);
await this.updateStoreMetadata(height, header.timestamp, undefined, this.storeService.transaction);

const operationHash = this.storeService.getOperationMerkleRoot();
await this.createPOI(height, blockHash, operationHash, this.storeService.transaction);
Expand Down Expand Up @@ -269,21 +269,26 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
const poiBlock = PoiBlock.create(height, blockHash, operationHash, this.project.id);
// This is the first creation of POI
await this.poi.bulkUpsert([poiBlock], tx);
await this.storeModelProvider.metadata.setBulk([{ key: 'lastCreatedPoiHeight', value: height }], tx);
await this.storeModelProvider.metadata.setBulk([{key: 'lastCreatedPoiHeight', value: height}], tx);
this.eventEmitter.emit(PoiEvent.PoiTarget, {
height,
timestamp: Date.now(),
});
}

@mainThreadOnly()
private async updateStoreMetadata(height: number, blockTimestamp?: Date, updateProcessed = true, tx?: Transaction): Promise<void> {
private async updateStoreMetadata(
height: number,
blockTimestamp?: Date,
updateProcessed = true,
tx?: Transaction
): Promise<void> {
const meta = this.storeModelProvider.metadata;
// Update store metadata
await meta.setBulk(
[
{ key: 'lastProcessedHeight', value: height },
{ key: 'lastProcessedTimestamp', value: Date.now() },
{key: 'lastProcessedHeight', value: height},
{key: 'lastProcessedTimestamp', value: Date.now()},
],
tx
);
Expand All @@ -292,7 +297,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
await meta.setIncrement('processedBlockCount', undefined, tx);
}
if (blockTimestamp) {
meta.set('lastProcessedBlockTimestamp', blockTimestamp.getTime());
await meta.set('lastProcessedBlockTimestamp', blockTimestamp.getTime(), tx);
}
}

Expand Down

0 comments on commit 5a42faa

Please sign in to comment.