From 7b0d851bbb516c30591bb4b135a43fa96bd921f2 Mon Sep 17 00:00:00 2001 From: ahonn Date: Thu, 26 Sep 2024 17:30:41 +1000 Subject: [PATCH] refactor(indexer): improve asset indexing and health check --- backend/src/constants.ts | 1 + backend/src/core/indexer/flow/assets.flow.ts | 30 +++++++++++++++++-- backend/src/core/indexer/indexer.health.ts | 11 ++++++- backend/src/core/indexer/indexer.queue.ts | 8 +++-- .../processor/block-assets.processor.ts | 2 +- backend/src/filters/all-exceptions.filter.ts | 15 ++++++---- 6 files changed, 54 insertions(+), 13 deletions(-) diff --git a/backend/src/constants.ts b/backend/src/constants.ts index c3263cab..1b0572f5 100644 --- a/backend/src/constants.ts +++ b/backend/src/constants.ts @@ -19,6 +19,7 @@ export const BtcTestnetTypeMap: Record= targetBlockNumber - INDEEXR_HEALTH_THRESHOLD; diff --git a/backend/src/core/indexer/indexer.queue.ts b/backend/src/core/indexer/indexer.queue.ts index d0f7beb1..55049c96 100644 --- a/backend/src/core/indexer/indexer.queue.ts +++ b/backend/src/core/indexer/indexer.queue.ts @@ -76,13 +76,17 @@ export class IndexerQueueService { await this.cacheManager.set(`${INDEXER_ASSETS_QUEUE}:${typeHash}`, cursor || ''); } - public async getLatestIndexedBlock(chainId: number) { + public async getLatestIndexedAssetsBlock(chainId: number) { const blockNumber = await this.cacheManager.get( `${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`, ); return blockNumber; } + public async setLatestIndexedAssetsBlock(chainId: number, blockNumber: number) { + await this.cacheManager.set(`${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`, blockNumber); + } + public async addBlockAssetsJob(data: IndexerBlockAssetsJobData) { const { chainId, blockNumber } = data; const params = new URLSearchParams(); @@ -95,7 +99,7 @@ export class IndexerQueueService { `Added block assets job ${jobId} for chain ${chainId} with block number ${blockNumber}`, ); await this.blockAssetsQueue.add(jobId, data, { jobId }); - await this.cacheManager.set(`${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`, blockNumber); + await this.setLatestIndexedAssetsBlock(chainId, blockNumber); } public async addLockJob(data: IndexerLockJobData) { diff --git a/backend/src/core/indexer/processor/block-assets.processor.ts b/backend/src/core/indexer/processor/block-assets.processor.ts index b2b00818..f84daf86 100644 --- a/backend/src/core/indexer/processor/block-assets.processor.ts +++ b/backend/src/core/indexer/processor/block-assets.processor.ts @@ -107,7 +107,7 @@ export class IndexerBlockAssetsProcessor extends WorkerHost { } else { const indexerServiceFactory = this.moduleRef.get(IndexerServiceFactory); const indexerService = await indexerServiceFactory.getService(chainId); - indexerService.assetsFlow.emit(IndexerAssetsEvent.BlockAssetsIndexed, block); + indexerService.assetsFlow.emit(IndexerAssetsEvent.BlockAssetsIndexed, blockNumber); return; } } diff --git a/backend/src/filters/all-exceptions.filter.ts b/backend/src/filters/all-exceptions.filter.ts index 5f831d39..309a7b08 100644 --- a/backend/src/filters/all-exceptions.filter.ts +++ b/backend/src/filters/all-exceptions.filter.ts @@ -11,12 +11,15 @@ export class AllExceptionsFilter extends SentryGlobalGraphQLFilter { } catch(exception: unknown, host: ArgumentsHost) { - const ctx = host.switchToHttp(); - const request = ctx.getRequest(); - if (SKIP_REQUEST_URLS.includes(request.url)) { - const response = (exception as HttpException).getResponse(); - this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); - return; + const type = host.getType(); + if (type === 'http') { + const ctx = host.switchToHttp(); + const request = ctx.getRequest(); + if (SKIP_REQUEST_URLS.includes(request.url)) { + const response = (exception as HttpException).getResponse(); + this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200); + return; + } } super.catch(exception, host);