Skip to content

Commit

Permalink
fix: update assets indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
ahonn committed Sep 30, 2024
1 parent 44ecc65 commit d8849cc
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 137 deletions.
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.2.3",
"@nestjs/core": "^10.0.0",
"@nestjs/event-emitter": "^2.0.4",
"@nestjs/graphql": "^12.2.0",
"@nestjs/platform-fastify": "^10.4.3",
"@nestjs/schedule": "^4.1.0",
Expand Down
2 changes: 2 additions & 0 deletions backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { BullModule } from '@nestjs/bullmq';
import configModule from './config';
import { AppController } from './app.controller';
import { BootstrapService } from './bootstrap.service';
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
imports: [
Expand Down Expand Up @@ -46,6 +47,7 @@ import { BootstrapService } from './bootstrap.service';
},
inject: [ConfigService],
}),
EventEmitterModule.forRoot(),
ScheduleModule.forRoot(),
CoreModule,
ApiModule,
Expand Down
114 changes: 44 additions & 70 deletions backend/src/core/indexer/flow/assets.flow.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,51 @@
import { Logger } from '@nestjs/common';
import { AssetType, Chain } from '@prisma/client';
import { EventEmitter } from 'node:events';
import { CKB_MIN_SAFE_CONFIRMATIONS, CKB_ONE_DAY_BLOCKS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';

export enum IndexerAssetsEvent {
AssetIndexed = 'asset-indexed',
BlockAssetsIndexed = 'block-assets-indexed',
}

export class IndexerAssetsFlow extends EventEmitter {
export class IndexerAssetsFlow {
private readonly logger = new Logger(IndexerAssetsFlow.name);

public static readonly Event = {
AssetIndexed: 'asset-indexed',
};

constructor(
private chain: Chain,
private indexerQueueService: IndexerQueueService,
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
) {
super();
public eventEmitter: EventEmitter2,
) { }

private async getLatestAssetBlockNumber() {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
return latestAsset?.blockNumber ?? -1;
}

public async start() {
const latestAsset = await this.getLatestAsset();
if (latestAsset) {
this.logger.log(`Latest asset block number: ${latestAsset.blockNumber}`);
const tipBlockNumber = await this.blockchainService.getTipBlockNumber();
if (
tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS - latestAsset.blockNumber <
CKB_ONE_DAY_BLOCKS
) {
this.logger.log(`Latest asset is near tip block number, skip indexing assets...`);
this.setupBlockAssetsIndexedListener();
this.startBlockAssetsIndexing();
return;
}
}

const assetTypeScripts = await this.prismaService.assetType.findMany({
where: { chainId: this.chain.id },
});
this.logger.log(`Indexing ${assetTypeScripts.length} asset type scripts`);
assetTypeScripts.map((assetType) => this.indexAssets(assetType));
this.setupAssetIndexedListener(assetTypeScripts.length);
assetTypeScripts.map((assetType) => this.startAssetsIndexing(assetType));
}

private async getLatestAsset() {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
return latestAsset;
}

private async indexAssets(assetType: AssetType) {
private async startAssetsIndexing(assetType: AssetType) {
const cursor = await this.indexerQueueService.getLatestAssetJobCursor(assetType);
if (cursor === '0x') {
this.emit(IndexerAssetsEvent.AssetIndexed, assetType);
this.eventEmitter.emit(IndexerAssetsFlow.Event.AssetIndexed, assetType);
return;
}
await this.indexerQueueService.addAssetJob({
Expand All @@ -78,12 +61,11 @@ export class IndexerAssetsFlow extends EventEmitter {
completed += 1;
this.logger.log(`Asset type ${assetType.codeHash} indexed`);
if (completed === totalAssetTypes) {
this.off(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
this.setupBlockAssetsIndexedListener();
this.eventEmitter.off(IndexerAssetsFlow.Event.AssetIndexed, onAssetIndexed);
this.startBlockAssetsIndexing();
}
};
this.on(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
this.eventEmitter.on(IndexerAssetsFlow.Event.AssetIndexed, onAssetIndexed);
}

private async startBlockAssetsIndexing() {
Expand All @@ -93,40 +75,32 @@ export class IndexerAssetsFlow extends EventEmitter {
this.chain.id,
);
if (!latestIndexedBlockNumber) {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
latestIndexedBlockNumber = latestAsset!.blockNumber;
latestIndexedBlockNumber = await this.getLatestAssetBlockNumber();
}
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
if (targetBlockNumber <= latestIndexedBlockNumber) {
this.logger.log(`Block assets are up to date, latest indexed block number: ${latestIndexedBlockNumber}`);
this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber);
return;
this.logger.log(`Block assets are up to date: ${latestIndexedBlockNumber}`);
} else {
await this.indexerQueueService.addBlockAssetsJob({
chainId: this.chain.id,
blockNumber: latestIndexedBlockNumber + 1,
targetBlockNumber,
});
}

await this.indexerQueueService.addBlockAssetsJob({
chainId: this.chain.id,
blockNumber: latestIndexedBlockNumber + 1,
targetBlockNumber,
});
this.setupBlockAssetsCronJob();
}

private setupBlockAssetsIndexedListener() {
private setupBlockAssetsCronJob() {
const cronJobName = `indexer-block-assets-${this.chain.id}-${process.pid}`;
this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
}
}
48 changes: 21 additions & 27 deletions backend/src/core/indexer/flow/transactions.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,31 @@ import { Logger } from '@nestjs/common';
import { Chain } from '@prisma/client';
import { EventEmitter } from 'node:events';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { ONE_DAY_MS } from 'src/common/date';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';

const CKB_24_HOURS_BLOCK_NUMBER = ONE_DAY_MS / 10000;

export enum IndexerTransactionsEvent {
BlockIndexed = 'block-indexed',
}

export class IndexerTransactionsFlow extends EventEmitter {
private readonly logger = new Logger(IndexerTransactionsFlow.name);

constructor(
private chain: Chain,
private indexerQueueService: IndexerQueueService,
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
public eventEmitter: EventEmitter2,
) {
super();
}

public async start() {
this.setupBlockIndexedListener();
this.startBlockIndexing();
}

Expand All @@ -46,31 +43,28 @@ export class IndexerTransactionsFlow extends EventEmitter {
startBlockNumber = Math.max(startBlockNumber, block.number + 1);
}

if (startBlockNumber >= targetBlockNumber) {
this.emit(IndexerTransactionsEvent.BlockIndexed);
return;
if (startBlockNumber < targetBlockNumber) {
this.logger.log(`Indexing blocks from ${startBlockNumber} to ${targetBlockNumber}`);
this.indexerQueueService.addBlockJob({
chainId: this.chain.id,
blockNumber: startBlockNumber,
targetBlockNumber,
});
}
this.logger.log(`Indexing blocks from ${startBlockNumber} to ${targetBlockNumber}`);
this.indexerQueueService.addBlockJob({
chainId: this.chain.id,
blockNumber: startBlockNumber,
targetBlockNumber,
});
this.setupBlockIndexCronJob();
}

private setupBlockIndexedListener() {
private setupBlockIndexCronJob() {
const cronJobName = `indexer-transactions-${this.chain.id}-${process.pid}`;
this.on(IndexerTransactionsEvent.BlockIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
}
}
5 changes: 4 additions & 1 deletion backend/src/core/indexer/indexer.factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { BlockchainServiceFactory } from '../blockchain/blockchain.factory';
import { IndexerQueueService } from './indexer.queue';
import { ModuleRef } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';
import { EventEmitter2 } from '@nestjs/event-emitter';

export class IndexerServiceFactoryError extends Error {
constructor(message: string) {
Expand All @@ -21,6 +22,7 @@ export class IndexerServiceFactory implements OnModuleDestroy {
private blockchainServiceFactory: BlockchainServiceFactory,
private prismaService: PrismaService,
private schedulerRegistry: SchedulerRegistry,
private eventEmitter: EventEmitter2,
private moduleRef: ModuleRef,
) {}

Expand All @@ -42,10 +44,11 @@ export class IndexerServiceFactory implements OnModuleDestroy {
const blockchainService = this.blockchainServiceFactory.getService(chain.id);
const service = new IndexerService(
chain,
indexerQueueService,
blockchainService,
this.prismaService,
indexerQueueService,
this.schedulerRegistry,
this.eventEmitter,
);
this.services.set(chain.id, service);
}
Expand Down
15 changes: 8 additions & 7 deletions backend/src/core/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,41 @@ import { PrismaService } from '../database/prisma/prisma.service';
import { IndexerQueueService } from './indexer.queue';
import { IndexerTransactionsFlow } from './flow/transactions.flow';
import { SchedulerRegistry } from '@nestjs/schedule';
import { EventEmitter2 } from '@nestjs/event-emitter';

export class IndexerService {
public assetsFlow: IndexerAssetsFlow;
public transactionsFlow: IndexerTransactionsFlow;

constructor(
private chain: Chain,
private indexerQueueService: IndexerQueueService,
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
private eventEmitter: EventEmitter2,
) {
this.assetsFlow = new IndexerAssetsFlow(
this.chain,
this.indexerQueueService,
this.blockchainService,
this.prismaService,
this.indexerQueueService,
this.schedulerRegistry,
this.eventEmitter,
);
this.transactionsFlow = new IndexerTransactionsFlow(
this.chain,
this.indexerQueueService,
this.blockchainService,
this.prismaService,
this.indexerQueueService,
this.schedulerRegistry,
this.eventEmitter,
);
}

public async start() {
await this.indexerQueueService.moveActiveJobToDelay();
await Promise.all([
this.assetsFlow.start(),
this.transactionsFlow.start(),
]);
await Promise.all([this.assetsFlow.start(), this.transactionsFlow.start()]);
}

public async close() {
Expand Down
5 changes: 2 additions & 3 deletions backend/src/core/indexer/processor/assets.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ModuleRef } from '@nestjs/core';
import { IndexerServiceFactory } from '../indexer.factory';
import { IndexerAssetsService } from '../service/assets.service';
import * as Sentry from '@sentry/node';
import { IndexerAssetsEvent } from '../flow/assets.flow';
import { IndexerAssetsFlow } from '../flow/assets.flow';

export const INDEXER_ASSETS_QUEUE = 'indexer-assets-queue';

Expand All @@ -24,7 +24,6 @@ const BATCH_SIZE = BI.from(400).toHexString();

@Processor(INDEXER_ASSETS_QUEUE, {
stalledInterval: 60_000,
useWorkerThreads: true,
})
export class IndexerAssetsProcessor extends WorkerHost {
private logger = new Logger(IndexerAssetsProcessor.name);
Expand Down Expand Up @@ -67,7 +66,7 @@ export class IndexerAssetsProcessor extends WorkerHost {
if (cursor === '0x') {
const indexerServiceFactory = this.moduleRef.get(IndexerServiceFactory);
const indexerService = await indexerServiceFactory.getService(chainId);
indexerService.assetsFlow.emit(IndexerAssetsEvent.AssetIndexed, assetType);
indexerService.assetsFlow.eventEmitter.emit(IndexerAssetsFlow.Event.AssetIndexed, assetType);
return;
}

Expand Down
Loading

0 comments on commit d8849cc

Please sign in to comment.