From e7b8bdf2ed0d334a0e42db1a2842cb9ab32c718d Mon Sep 17 00:00:00 2001 From: ahonn Date: Sun, 29 Sep 2024 15:01:30 +1000 Subject: [PATCH 1/3] refactor(indexer): optimize initialization order and error handling --- .../src/core/blockchain/blockchain.service.ts | 8 +++++--- backend/src/core/indexer/flow/assets.flow.ts | 4 ++-- .../core/indexer/flow/transactions.flow.ts | 2 +- backend/src/core/indexer/indexer.health.ts | 20 ++++++++++--------- backend/src/filters/all-exceptions.filter.ts | 8 ++++++-- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/backend/src/core/blockchain/blockchain.service.ts b/backend/src/core/blockchain/blockchain.service.ts index 86bc882d..6ff3ac60 100644 --- a/backend/src/core/blockchain/blockchain.service.ts +++ b/backend/src/core/blockchain/blockchain.service.ts @@ -37,6 +37,10 @@ export class BlockchainService { private chainPromise: Promise, ) { this.createConnection(); + + process.on('exit', () => { + this.close(); + }); } private createConnection() { @@ -211,9 +215,7 @@ export class BlockchainService { ): Promise { await this.websocketReady; this.logger.debug(`get_block_by_number - blockNumber: ${blockNumber}`); - const response = await this.call('get_block_by_number', [ - BI.from(blockNumber).toHexString(), - ]); + const response = await this.call('get_block_by_number', [BI.from(blockNumber).toHexString()]); const block = response as Block; if (!withTxData) { block.transactions = block.transactions.map((tx) => { diff --git a/backend/src/core/indexer/flow/assets.flow.ts b/backend/src/core/indexer/flow/assets.flow.ts index 8dae2c7f..3adea6fc 100644 --- a/backend/src/core/indexer/flow/assets.flow.ts +++ b/backend/src/core/indexer/flow/assets.flow.ts @@ -36,8 +36,8 @@ export class IndexerAssetsFlow extends EventEmitter { CKB_ONE_DAY_BLOCKS ) { this.logger.log(`Latest asset is near tip block number, skip indexing assets...`); - this.startBlockAssetsIndexing(); this.setupBlockAssetsIndexedListener(); + this.startBlockAssetsIndexing(); return; } } @@ -79,8 +79,8 @@ export class IndexerAssetsFlow extends EventEmitter { this.logger.log(`Asset type ${assetType.codeHash} indexed`); if (completed === totalAssetTypes) { this.off(IndexerAssetsEvent.AssetIndexed, onAssetIndexed); - this.startBlockAssetsIndexing(); this.setupBlockAssetsIndexedListener(); + this.startBlockAssetsIndexing(); } }; this.on(IndexerAssetsEvent.AssetIndexed, onAssetIndexed); diff --git a/backend/src/core/indexer/flow/transactions.flow.ts b/backend/src/core/indexer/flow/transactions.flow.ts index d2698b6c..55d4df0a 100644 --- a/backend/src/core/indexer/flow/transactions.flow.ts +++ b/backend/src/core/indexer/flow/transactions.flow.ts @@ -29,8 +29,8 @@ export class IndexerTransactionsFlow extends EventEmitter { } public async start() { - this.startBlockIndexing(); this.setupBlockIndexedListener(); + this.startBlockIndexing(); } public async startBlockIndexing() { diff --git a/backend/src/core/indexer/indexer.health.ts b/backend/src/core/indexer/indexer.health.ts index ac27f98a..923a9db0 100644 --- a/backend/src/core/indexer/indexer.health.ts +++ b/backend/src/core/indexer/indexer.health.ts @@ -61,15 +61,17 @@ export class IndexerHealthIndicator extends HealthIndicator { const tipBlockNumber = await blockchainService.getTipBlockNumber(); const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS; - let currentBlockNumber = await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID); - if (!currentBlockNumber) { - const latestAsset = await this.prismaService.asset.findFirst({ - select: { blockNumber: true }, - where: { chainId: CKB_CHAIN_ID }, - orderBy: { blockNumber: 'desc' }, - }); - currentBlockNumber = latestAsset?.blockNumber; - } + const latestIndexedAssetsBlock = + await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID); + const latestAsset = await this.prismaService.asset.findFirst({ + select: { blockNumber: true }, + where: { chainId: CKB_CHAIN_ID }, + orderBy: { blockNumber: 'desc' }, + }); + const currentBlockNumber = Math.max( + latestIndexedAssetsBlock ?? -1, + latestAsset?.blockNumber ?? -1, + ); const isHealthy = !!currentBlockNumber && currentBlockNumber >= targetBlockNumber - INDEEXR_HEALTH_THRESHOLD; diff --git a/backend/src/filters/all-exceptions.filter.ts b/backend/src/filters/all-exceptions.filter.ts index 309a7b08..8456679d 100644 --- a/backend/src/filters/all-exceptions.filter.ts +++ b/backend/src/filters/all-exceptions.filter.ts @@ -16,8 +16,12 @@ export class AllExceptionsFilter extends SentryGlobalGraphQLFilter { const ctx = host.switchToHttp(); const request = ctx.getRequest(); if (SKIP_REQUEST_URLS.includes(request.url)) { - const response = (exception as HttpException).getResponse(); - this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); + const response = (exception as HttpException)?.getResponse(); + if (response) { + this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); + return; + } + this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), exception, 500); return; } } From 31ecc6bc971a6421352578a3d53c25e22c6c739d Mon Sep 17 00:00:00 2001 From: ahonn Date: Sun, 29 Sep 2024 15:15:58 +1000 Subject: [PATCH 2/3] feat: add process pid to cron job name --- backend/src/core/indexer/flow/assets.flow.ts | 5 +++-- backend/src/core/indexer/flow/transactions.flow.ts | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/src/core/indexer/flow/assets.flow.ts b/backend/src/core/indexer/flow/assets.flow.ts index 3adea6fc..89cc780a 100644 --- a/backend/src/core/indexer/flow/assets.flow.ts +++ b/backend/src/core/indexer/flow/assets.flow.ts @@ -114,8 +114,9 @@ export class IndexerAssetsFlow extends EventEmitter { } private setupBlockAssetsIndexedListener() { + const cronJobName = `indexer-block-assets-${this.chain.id}-${process.pid}`; this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => { - if (this.schedulerRegistry.doesExist('cron', 'indexer-block-assets')) { + if (this.schedulerRegistry.doesExist('cron', cronJobName)) { return; } @@ -123,7 +124,7 @@ export class IndexerAssetsFlow extends EventEmitter { const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => { this.startBlockAssetsIndexing(); }); - this.schedulerRegistry.addCronJob('indexer-block-assets', job); + this.schedulerRegistry.addCronJob(cronJobName, job); job.start(); }); } diff --git a/backend/src/core/indexer/flow/transactions.flow.ts b/backend/src/core/indexer/flow/transactions.flow.ts index 55d4df0a..59dda498 100644 --- a/backend/src/core/indexer/flow/transactions.flow.ts +++ b/backend/src/core/indexer/flow/transactions.flow.ts @@ -59,8 +59,9 @@ export class IndexerTransactionsFlow extends EventEmitter { } private setupBlockIndexedListener() { + const cronJobName = `indexer-transactions-${this.chain.id}-${process.pid}`; this.on(IndexerTransactionsEvent.BlockIndexed, () => { - if (this.schedulerRegistry.doesExist('cron', 'indexer-transactions')) { + if (this.schedulerRegistry.doesExist('cron', cronJobName)) { return; } @@ -68,7 +69,7 @@ export class IndexerTransactionsFlow extends EventEmitter { const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => { this.startBlockIndexing(); }); - this.schedulerRegistry.addCronJob('indexer-transactions', job); + this.schedulerRegistry.addCronJob(cronJobName, job); job.start(); }); } From 968179d47096f6ddac03d1630e1e0684bff13234 Mon Sep 17 00:00:00 2001 From: ahonn Date: Sun, 29 Sep 2024 15:40:58 +1000 Subject: [PATCH 3/3] feat: add log --- backend/src/core/indexer/flow/assets.flow.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/core/indexer/flow/assets.flow.ts b/backend/src/core/indexer/flow/assets.flow.ts index 89cc780a..b2ce52da 100644 --- a/backend/src/core/indexer/flow/assets.flow.ts +++ b/backend/src/core/indexer/flow/assets.flow.ts @@ -102,6 +102,7 @@ export class IndexerAssetsFlow extends EventEmitter { } const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS; if (targetBlockNumber <= latestIndexedBlockNumber) { + this.logger.log(`Block assets are up to date, latest indexed block number: ${latestIndexedBlockNumber}`); this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber); return; }