diff --git a/backend/src/core/blockchain/blockchain.service.ts b/backend/src/core/blockchain/blockchain.service.ts index 86bc882d..6ff3ac60 100644 --- a/backend/src/core/blockchain/blockchain.service.ts +++ b/backend/src/core/blockchain/blockchain.service.ts @@ -37,6 +37,10 @@ export class BlockchainService { private chainPromise: Promise, ) { this.createConnection(); + + process.on('exit', () => { + this.close(); + }); } private createConnection() { @@ -211,9 +215,7 @@ export class BlockchainService { ): Promise { await this.websocketReady; this.logger.debug(`get_block_by_number - blockNumber: ${blockNumber}`); - const response = await this.call('get_block_by_number', [ - BI.from(blockNumber).toHexString(), - ]); + const response = await this.call('get_block_by_number', [BI.from(blockNumber).toHexString()]); const block = response as Block; if (!withTxData) { block.transactions = block.transactions.map((tx) => { diff --git a/backend/src/core/indexer/flow/assets.flow.ts b/backend/src/core/indexer/flow/assets.flow.ts index 8dae2c7f..b2ce52da 100644 --- a/backend/src/core/indexer/flow/assets.flow.ts +++ b/backend/src/core/indexer/flow/assets.flow.ts @@ -36,8 +36,8 @@ export class IndexerAssetsFlow extends EventEmitter { CKB_ONE_DAY_BLOCKS ) { this.logger.log(`Latest asset is near tip block number, skip indexing assets...`); - this.startBlockAssetsIndexing(); this.setupBlockAssetsIndexedListener(); + this.startBlockAssetsIndexing(); return; } } @@ -79,8 +79,8 @@ export class IndexerAssetsFlow extends EventEmitter { this.logger.log(`Asset type ${assetType.codeHash} indexed`); if (completed === totalAssetTypes) { this.off(IndexerAssetsEvent.AssetIndexed, onAssetIndexed); - this.startBlockAssetsIndexing(); this.setupBlockAssetsIndexedListener(); + this.startBlockAssetsIndexing(); } }; this.on(IndexerAssetsEvent.AssetIndexed, onAssetIndexed); @@ -102,6 +102,7 @@ export class IndexerAssetsFlow extends EventEmitter { } const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS; if (targetBlockNumber <= latestIndexedBlockNumber) { + this.logger.log(`Block assets are up to date, latest indexed block number: ${latestIndexedBlockNumber}`); this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber); return; } @@ -114,8 +115,9 @@ export class IndexerAssetsFlow extends EventEmitter { } private setupBlockAssetsIndexedListener() { + const cronJobName = `indexer-block-assets-${this.chain.id}-${process.pid}`; this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => { - if (this.schedulerRegistry.doesExist('cron', 'indexer-block-assets')) { + if (this.schedulerRegistry.doesExist('cron', cronJobName)) { return; } @@ -123,7 +125,7 @@ export class IndexerAssetsFlow extends EventEmitter { const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => { this.startBlockAssetsIndexing(); }); - this.schedulerRegistry.addCronJob('indexer-block-assets', job); + this.schedulerRegistry.addCronJob(cronJobName, job); job.start(); }); } diff --git a/backend/src/core/indexer/flow/transactions.flow.ts b/backend/src/core/indexer/flow/transactions.flow.ts index d2698b6c..59dda498 100644 --- a/backend/src/core/indexer/flow/transactions.flow.ts +++ b/backend/src/core/indexer/flow/transactions.flow.ts @@ -29,8 +29,8 @@ export class IndexerTransactionsFlow extends EventEmitter { } public async start() { - this.startBlockIndexing(); this.setupBlockIndexedListener(); + this.startBlockIndexing(); } public async startBlockIndexing() { @@ -59,8 +59,9 @@ export class IndexerTransactionsFlow extends EventEmitter { } private setupBlockIndexedListener() { + const cronJobName = `indexer-transactions-${this.chain.id}-${process.pid}`; this.on(IndexerTransactionsEvent.BlockIndexed, () => { - if (this.schedulerRegistry.doesExist('cron', 'indexer-transactions')) { + if (this.schedulerRegistry.doesExist('cron', cronJobName)) { return; } @@ -68,7 +69,7 @@ export class IndexerTransactionsFlow extends EventEmitter { const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => { this.startBlockIndexing(); }); - this.schedulerRegistry.addCronJob('indexer-transactions', job); + this.schedulerRegistry.addCronJob(cronJobName, job); job.start(); }); } diff --git a/backend/src/core/indexer/indexer.health.ts b/backend/src/core/indexer/indexer.health.ts index ac27f98a..923a9db0 100644 --- a/backend/src/core/indexer/indexer.health.ts +++ b/backend/src/core/indexer/indexer.health.ts @@ -61,15 +61,17 @@ export class IndexerHealthIndicator extends HealthIndicator { const tipBlockNumber = await blockchainService.getTipBlockNumber(); const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS; - let currentBlockNumber = await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID); - if (!currentBlockNumber) { - const latestAsset = await this.prismaService.asset.findFirst({ - select: { blockNumber: true }, - where: { chainId: CKB_CHAIN_ID }, - orderBy: { blockNumber: 'desc' }, - }); - currentBlockNumber = latestAsset?.blockNumber; - } + const latestIndexedAssetsBlock = + await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID); + const latestAsset = await this.prismaService.asset.findFirst({ + select: { blockNumber: true }, + where: { chainId: CKB_CHAIN_ID }, + orderBy: { blockNumber: 'desc' }, + }); + const currentBlockNumber = Math.max( + latestIndexedAssetsBlock ?? -1, + latestAsset?.blockNumber ?? -1, + ); const isHealthy = !!currentBlockNumber && currentBlockNumber >= targetBlockNumber - INDEEXR_HEALTH_THRESHOLD; diff --git a/backend/src/filters/all-exceptions.filter.ts b/backend/src/filters/all-exceptions.filter.ts index 309a7b08..8456679d 100644 --- a/backend/src/filters/all-exceptions.filter.ts +++ b/backend/src/filters/all-exceptions.filter.ts @@ -16,8 +16,12 @@ export class AllExceptionsFilter extends SentryGlobalGraphQLFilter { const ctx = host.switchToHttp(); const request = ctx.getRequest(); if (SKIP_REQUEST_URLS.includes(request.url)) { - const response = (exception as HttpException).getResponse(); - this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); + const response = (exception as HttpException)?.getResponse(); + if (response) { + this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); + return; + } + this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), exception, 500); return; } }