Skip to content

Commit

Permalink
Merge pull request #227 from utxostack/fix/assets-n-health
Browse files Browse the repository at this point in the history
feat: add latency to healthcheck and update assets indexer
  • Loading branch information
ahonn authored Sep 30, 2024
2 parents 83f7d5b + a05f8b5 commit 0f769c7
Show file tree
Hide file tree
Showing 17 changed files with 132 additions and 152 deletions.
3 changes: 2 additions & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@utxo-stack-explorer/backend",
"version": "0.2.0",
"version": "0.2.1",
"description": "",
"author": "",
"private": true,
Expand Down Expand Up @@ -44,6 +44,7 @@
"@nestjs/common": "^10.0.0",
"@nestjs/config": "^3.2.3",
"@nestjs/core": "^10.0.0",
"@nestjs/event-emitter": "^2.0.4",
"@nestjs/graphql": "^12.2.0",
"@nestjs/platform-fastify": "^10.4.3",
"@nestjs/schedule": "^4.1.0",
Expand Down
2 changes: 2 additions & 0 deletions backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { BullModule } from '@nestjs/bullmq';
import configModule from './config';
import { AppController } from './app.controller';
import { BootstrapService } from './bootstrap.service';
import { EventEmitterModule } from '@nestjs/event-emitter';

@Module({
imports: [
Expand Down Expand Up @@ -46,6 +47,7 @@ import { BootstrapService } from './bootstrap.service';
},
inject: [ConfigService],
}),
EventEmitterModule.forRoot(),
ScheduleModule.forRoot(),
CoreModule,
ApiModule,
Expand Down
11 changes: 6 additions & 5 deletions backend/src/core/bitcoin-api/bitcoin-api.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import * as Sentry from '@sentry/nestjs';

@Injectable()
export class BitcoinApiHealthIndicator extends HealthIndicator {
constructor(
private bitcoinApiService: BitcoinApiService,

) {
constructor(private bitcoinApiService: BitcoinApiService) {
super();
}

public async isHealthy(): Promise<HealthIndicatorResult> {
try {
const now = performance.now();
const info = await this.bitcoinApiService.getBlockchainInfo();
const isHealthy = !!info.blocks;
const result = this.getStatus('bitcoin-api', isHealthy, { info });
const result = this.getStatus('bitcoin-api', isHealthy, {
info,
latency: performance.now() - now,
});
if (isHealthy) {
return result;
}
Expand Down
7 changes: 3 additions & 4 deletions backend/src/core/ckb-explorer/ckb-explorer.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@ import * as Sentry from '@sentry/nestjs';

@Injectable()
export class CkbExplorerHealthIndicator extends HealthIndicator {
constructor(
private ckbExplorerService: CkbExplorerService,

) {
constructor(private ckbExplorerService: CkbExplorerService) {
super();
}

public async isHealthy(): Promise<HealthIndicatorResult> {
try {
const now = performance.now();
const stats = await this.ckbExplorerService.getStatistics();
const isHealthy = !!stats.data.attributes.tip_block_number;
const result = this.getStatus('ckb-explorer', isHealthy, {
stats: stats.data.attributes,
latency: performance.now() - now,
});
if (isHealthy) {
return result;
Expand Down
6 changes: 5 additions & 1 deletion backend/src/core/ckb-rpc/ckb-rpc.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ export class CkbRpcHealthIndicator extends HealthIndicator {
isHealthy: boolean;
result: HealthIndicatorResult;
}> {
const now = performance.now();
const tipBlockNumber = await this.ckbRpcWebsocketService.getTipBlockNumber();
const isHealthy = !!tipBlockNumber;
const result = this.getStatus('ckb-rpc.websocket', isHealthy, { tipBlockNumber });
const result = this.getStatus('ckb-rpc.websocket', isHealthy, {
tipBlockNumber,
latency: performance.now() - now,
});
return {
isHealthy,
result,
Expand Down
9 changes: 5 additions & 4 deletions backend/src/core/database/database.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import { PrismaService } from './prisma/prisma.service';

@Injectable()
export class DatabaseHealthIndicator extends HealthIndicator {
constructor(
private prismaService: PrismaService,
) {
constructor(private prismaService: PrismaService) {
super();
}

public async isHealthy(): Promise<HealthIndicatorResult> {
try {
const now = performance.now();
await this.prismaService.$queryRaw`SELECT 1`;
return this.getStatus('database.prisma', true);
return this.getStatus('database.prisma', true, {
latency: performance.now() - now,
});
} catch (e) {
Sentry.captureException(e);
throw new HealthCheckError('BitcoinApiService failed', e);
Expand Down
114 changes: 44 additions & 70 deletions backend/src/core/indexer/flow/assets.flow.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,51 @@
import { Logger } from '@nestjs/common';
import { AssetType, Chain } from '@prisma/client';
import { EventEmitter } from 'node:events';
import { CKB_MIN_SAFE_CONFIRMATIONS, CKB_ONE_DAY_BLOCKS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';

export enum IndexerAssetsEvent {
AssetIndexed = 'asset-indexed',
BlockAssetsIndexed = 'block-assets-indexed',
}

export class IndexerAssetsFlow extends EventEmitter {
export class IndexerAssetsFlow {
private readonly logger = new Logger(IndexerAssetsFlow.name);

public static readonly Event = {
AssetIndexed: 'asset-indexed',
};

constructor(
private chain: Chain,
private indexerQueueService: IndexerQueueService,
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
) {
super();
public eventEmitter: EventEmitter2,
) { }

private async getLatestAssetBlockNumber() {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
return latestAsset?.blockNumber ?? -1;
}

public async start() {
const latestAsset = await this.getLatestAsset();
if (latestAsset) {
this.logger.log(`Latest asset block number: ${latestAsset.blockNumber}`);
const tipBlockNumber = await this.blockchainService.getTipBlockNumber();
if (
tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS - latestAsset.blockNumber <
CKB_ONE_DAY_BLOCKS
) {
this.logger.log(`Latest asset is near tip block number, skip indexing assets...`);
this.setupBlockAssetsIndexedListener();
this.startBlockAssetsIndexing();
return;
}
}

const assetTypeScripts = await this.prismaService.assetType.findMany({
where: { chainId: this.chain.id },
});
this.logger.log(`Indexing ${assetTypeScripts.length} asset type scripts`);
assetTypeScripts.map((assetType) => this.indexAssets(assetType));
this.setupAssetIndexedListener(assetTypeScripts.length);
assetTypeScripts.map((assetType) => this.startAssetsIndexing(assetType));
}

private async getLatestAsset() {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
return latestAsset;
}

private async indexAssets(assetType: AssetType) {
private async startAssetsIndexing(assetType: AssetType) {
const cursor = await this.indexerQueueService.getLatestAssetJobCursor(assetType);
if (cursor === '0x') {
this.emit(IndexerAssetsEvent.AssetIndexed, assetType);
this.eventEmitter.emit(IndexerAssetsFlow.Event.AssetIndexed, assetType);
return;
}
await this.indexerQueueService.addAssetJob({
Expand All @@ -78,12 +61,11 @@ export class IndexerAssetsFlow extends EventEmitter {
completed += 1;
this.logger.log(`Asset type ${assetType.codeHash} indexed`);
if (completed === totalAssetTypes) {
this.off(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
this.setupBlockAssetsIndexedListener();
this.eventEmitter.off(IndexerAssetsFlow.Event.AssetIndexed, onAssetIndexed);
this.startBlockAssetsIndexing();
}
};
this.on(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
this.eventEmitter.on(IndexerAssetsFlow.Event.AssetIndexed, onAssetIndexed);
}

private async startBlockAssetsIndexing() {
Expand All @@ -93,40 +75,32 @@ export class IndexerAssetsFlow extends EventEmitter {
this.chain.id,
);
if (!latestIndexedBlockNumber) {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
latestIndexedBlockNumber = latestAsset!.blockNumber;
latestIndexedBlockNumber = await this.getLatestAssetBlockNumber();
}
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
if (targetBlockNumber <= latestIndexedBlockNumber) {
this.logger.log(`Block assets are up to date, latest indexed block number: ${latestIndexedBlockNumber}`);
this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber);
return;
this.logger.log(`Block assets are up to date: ${latestIndexedBlockNumber}`);
} else {
await this.indexerQueueService.addBlockAssetsJob({
chainId: this.chain.id,
blockNumber: latestIndexedBlockNumber + 1,
targetBlockNumber,
});
}

await this.indexerQueueService.addBlockAssetsJob({
chainId: this.chain.id,
blockNumber: latestIndexedBlockNumber + 1,
targetBlockNumber,
});
this.setupBlockAssetsCronJob();
}

private setupBlockAssetsIndexedListener() {
private setupBlockAssetsCronJob() {
const cronJobName = `indexer-block-assets-${this.chain.id}-${process.pid}`;
this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
}
}
48 changes: 21 additions & 27 deletions backend/src/core/indexer/flow/transactions.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,31 @@ import { Logger } from '@nestjs/common';
import { Chain } from '@prisma/client';
import { EventEmitter } from 'node:events';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { ONE_DAY_MS } from 'src/common/date';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';

const CKB_24_HOURS_BLOCK_NUMBER = ONE_DAY_MS / 10000;

export enum IndexerTransactionsEvent {
BlockIndexed = 'block-indexed',
}

export class IndexerTransactionsFlow extends EventEmitter {
private readonly logger = new Logger(IndexerTransactionsFlow.name);

constructor(
private chain: Chain,
private indexerQueueService: IndexerQueueService,
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
public eventEmitter: EventEmitter2,
) {
super();
}

public async start() {
this.setupBlockIndexedListener();
this.startBlockIndexing();
}

Expand All @@ -46,31 +43,28 @@ export class IndexerTransactionsFlow extends EventEmitter {
startBlockNumber = Math.max(startBlockNumber, block.number + 1);
}

if (startBlockNumber >= targetBlockNumber) {
this.emit(IndexerTransactionsEvent.BlockIndexed);
return;
if (startBlockNumber < targetBlockNumber) {
this.logger.log(`Indexing blocks from ${startBlockNumber} to ${targetBlockNumber}`);
this.indexerQueueService.addBlockJob({
chainId: this.chain.id,
blockNumber: startBlockNumber,
targetBlockNumber,
});
}
this.logger.log(`Indexing blocks from ${startBlockNumber} to ${targetBlockNumber}`);
this.indexerQueueService.addBlockJob({
chainId: this.chain.id,
blockNumber: startBlockNumber,
targetBlockNumber,
});
this.setupBlockIndexCronJob();
}

private setupBlockIndexedListener() {
private setupBlockIndexCronJob() {
const cronJobName = `indexer-transactions-${this.chain.id}-${process.pid}`;
this.on(IndexerTransactionsEvent.BlockIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
}
}
Loading

0 comments on commit 0f769c7

Please sign in to comment.