Skip to content

Commit

Permalink
Merge pull request #204 from utxostack/fix/cache-n-dataloader
Browse files Browse the repository at this point in the history
refactor(indexer): improve asset indexing and health check
  • Loading branch information
ahonn authored Sep 26, 2024
2 parents 3ece2ef + 7b0d851 commit f42dc5b
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 13 deletions.
1 change: 1 addition & 0 deletions backend/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const BtcTestnetTypeMap: Record<NetworkType.testnet | NetworkType.signet,
};

export const CKB_MIN_SAFE_CONFIRMATIONS = 24;
export const CKB_ONE_DAY_BLOCKS = 8640;
export const CKB_CHAIN_ID = 1;

export const XUDT_TYPESCRIPTS = {
Expand Down
30 changes: 27 additions & 3 deletions backend/src/core/indexer/flow/assets.flow.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from '@nestjs/common';
import { AssetType, Chain } from '@prisma/client';
import { EventEmitter } from 'node:events';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import { CKB_MIN_SAFE_CONFIRMATIONS, CKB_ONE_DAY_BLOCKS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
Expand All @@ -24,6 +24,21 @@ export class IndexerAssetsFlow extends EventEmitter {
}

public async start() {
const latestAsset = await this.getLatestAsset();
if (latestAsset) {
this.logger.log(`Latest asset block number: ${latestAsset.blockNumber}`);
const tipBlockNumber = await this.blockchainService.getTipBlockNumber();
if (
tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS - latestAsset.blockNumber <
CKB_ONE_DAY_BLOCKS
) {
this.logger.log(`Latest asset is near tip block number, skip indexing assets...`);
this.startBlockAssetsIndexing();
this.setupBlockAssetsIndexedListener();
return;
}
}

const assetTypeScripts = await this.prismaService.assetType.findMany({
where: { chainId: this.chain.id },
});
Expand All @@ -32,6 +47,15 @@ export class IndexerAssetsFlow extends EventEmitter {
this.setupAssetIndexedListener(assetTypeScripts.length);
}

private async getLatestAsset() {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: this.chain.id },
orderBy: { blockNumber: 'desc' },
});
return latestAsset;
}

private async indexAssets(assetType: AssetType) {
const cursor = await this.indexerQueueService.getLatestAssetJobCursor(assetType);
if (cursor === '0x') {
Expand Down Expand Up @@ -62,7 +86,7 @@ export class IndexerAssetsFlow extends EventEmitter {
private async startBlockAssetsIndexing() {
const tipBlockNumber = await this.blockchainService.getTipBlockNumber();

let latestIndexedBlockNumber = await this.indexerQueueService.getLatestIndexedBlock(
let latestIndexedBlockNumber = await this.indexerQueueService.getLatestIndexedAssetsBlock(
this.chain.id,
);
if (!latestIndexedBlockNumber) {
Expand All @@ -75,7 +99,7 @@ export class IndexerAssetsFlow extends EventEmitter {
}
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
if (targetBlockNumber <= latestIndexedBlockNumber) {
this.emit(IndexerAssetsEvent.BlockAssetsIndexed);
this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber);
return;
}

Expand Down
11 changes: 10 additions & 1 deletion backend/src/core/indexer/indexer.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,16 @@ export class IndexerHealthIndicator extends HealthIndicator {
const blockchainService = this.blockchainServiceFactory.getService(CKB_CHAIN_ID);
const tipBlockNumber = await blockchainService.getTipBlockNumber();
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
const currentBlockNumber = await this.indexerQueueService.getLatestIndexedBlock(CKB_CHAIN_ID);

let currentBlockNumber = await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID);
if (!currentBlockNumber) {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: CKB_CHAIN_ID },
orderBy: { blockNumber: 'desc' },
});
currentBlockNumber = latestAsset?.blockNumber;
}

const isHealthy =
!!currentBlockNumber && currentBlockNumber >= targetBlockNumber - INDEEXR_HEALTH_THRESHOLD;
Expand Down
8 changes: 6 additions & 2 deletions backend/src/core/indexer/indexer.queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,17 @@ export class IndexerQueueService {
await this.cacheManager.set(`${INDEXER_ASSETS_QUEUE}:${typeHash}`, cursor || '');
}

public async getLatestIndexedBlock(chainId: number) {
public async getLatestIndexedAssetsBlock(chainId: number) {
const blockNumber = await this.cacheManager.get<number>(
`${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`,
);
return blockNumber;
}

public async setLatestIndexedAssetsBlock(chainId: number, blockNumber: number) {
await this.cacheManager.set(`${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`, blockNumber);
}

public async addBlockAssetsJob(data: IndexerBlockAssetsJobData) {
const { chainId, blockNumber } = data;
const params = new URLSearchParams();
Expand All @@ -95,7 +99,7 @@ export class IndexerQueueService {
`Added block assets job ${jobId} for chain ${chainId} with block number ${blockNumber}`,
);
await this.blockAssetsQueue.add(jobId, data, { jobId });
await this.cacheManager.set(`${INDEXER_BLOCK_ASSETS_QUEUE}:${chainId}`, blockNumber);
await this.setLatestIndexedAssetsBlock(chainId, blockNumber);
}

public async addLockJob(data: IndexerLockJobData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class IndexerBlockAssetsProcessor extends WorkerHost {
} else {
const indexerServiceFactory = this.moduleRef.get(IndexerServiceFactory);
const indexerService = await indexerServiceFactory.getService(chainId);
indexerService.assetsFlow.emit(IndexerAssetsEvent.BlockAssetsIndexed, block);
indexerService.assetsFlow.emit(IndexerAssetsEvent.BlockAssetsIndexed, blockNumber);
return;
}
}
Expand Down
15 changes: 9 additions & 6 deletions backend/src/filters/all-exceptions.filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ export class AllExceptionsFilter extends SentryGlobalGraphQLFilter {
}

catch(exception: unknown, host: ArgumentsHost) {
const ctx = host.switchToHttp();
const request = ctx.getRequest();
if (SKIP_REQUEST_URLS.includes(request.url)) {
const response = (exception as HttpException).getResponse();
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200);
return;
const type = host.getType();
if (type === 'http') {
const ctx = host.switchToHttp();
const request = ctx.getRequest();
if (SKIP_REQUEST_URLS.includes(request.url)) {
const response = (exception as HttpException).getResponse();
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200);
return;
}
}

super.catch(exception, host);
Expand Down

0 comments on commit f42dc5b

Please sign in to comment.