Skip to content

Commit

Permalink
Merge pull request #224 from utxostack/fix/indexer-assets
Browse files Browse the repository at this point in the history
fix: optimize cron job and error handling
  • Loading branch information
ahonn authored Sep 29, 2024
2 parents ca2ca10 + 968179d commit 83f7d5b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 21 deletions.
8 changes: 5 additions & 3 deletions backend/src/core/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export class BlockchainService {
private chainPromise: Promise<Chain | null>,
) {
this.createConnection();

process.on('exit', () => {
this.close();
});
}

private createConnection() {
Expand Down Expand Up @@ -211,9 +215,7 @@ export class BlockchainService {
): Promise<Block> {
await this.websocketReady;
this.logger.debug(`get_block_by_number - blockNumber: ${blockNumber}`);
const response = await this.call('get_block_by_number', [
BI.from(blockNumber).toHexString(),
]);
const response = await this.call('get_block_by_number', [BI.from(blockNumber).toHexString()]);
const block = response as Block;
if (!withTxData) {
block.transactions = block.transactions.map((tx) => {
Expand Down
10 changes: 6 additions & 4 deletions backend/src/core/indexer/flow/assets.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ export class IndexerAssetsFlow extends EventEmitter {
CKB_ONE_DAY_BLOCKS
) {
this.logger.log(`Latest asset is near tip block number, skip indexing assets...`);
this.startBlockAssetsIndexing();
this.setupBlockAssetsIndexedListener();
this.startBlockAssetsIndexing();
return;
}
}
Expand Down Expand Up @@ -79,8 +79,8 @@ export class IndexerAssetsFlow extends EventEmitter {
this.logger.log(`Asset type ${assetType.codeHash} indexed`);
if (completed === totalAssetTypes) {
this.off(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
this.startBlockAssetsIndexing();
this.setupBlockAssetsIndexedListener();
this.startBlockAssetsIndexing();
}
};
this.on(IndexerAssetsEvent.AssetIndexed, onAssetIndexed);
Expand All @@ -102,6 +102,7 @@ export class IndexerAssetsFlow extends EventEmitter {
}
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
if (targetBlockNumber <= latestIndexedBlockNumber) {
this.logger.log(`Block assets are up to date, latest indexed block number: ${latestIndexedBlockNumber}`);
this.emit(IndexerAssetsEvent.BlockAssetsIndexed, latestIndexedBlockNumber);
return;
}
Expand All @@ -114,16 +115,17 @@ export class IndexerAssetsFlow extends EventEmitter {
}

private setupBlockAssetsIndexedListener() {
const cronJobName = `indexer-block-assets-${this.chain.id}-${process.pid}`;
this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', 'indexer-block-assets')) {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob('indexer-block-assets', job);
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
});
}
Expand Down
7 changes: 4 additions & 3 deletions backend/src/core/indexer/flow/transactions.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ export class IndexerTransactionsFlow extends EventEmitter {
}

public async start() {
this.startBlockIndexing();
this.setupBlockIndexedListener();
this.startBlockIndexing();
}

public async startBlockIndexing() {
Expand Down Expand Up @@ -59,16 +59,17 @@ export class IndexerTransactionsFlow extends EventEmitter {
}

private setupBlockIndexedListener() {
const cronJobName = `indexer-transactions-${this.chain.id}-${process.pid}`;
this.on(IndexerTransactionsEvent.BlockIndexed, () => {
if (this.schedulerRegistry.doesExist('cron', 'indexer-transactions')) {
if (this.schedulerRegistry.doesExist('cron', cronJobName)) {
return;
}

this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob('indexer-transactions', job);
this.schedulerRegistry.addCronJob(cronJobName, job);
job.start();
});
}
Expand Down
20 changes: 11 additions & 9 deletions backend/src/core/indexer/indexer.health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,17 @@ export class IndexerHealthIndicator extends HealthIndicator {
const tipBlockNumber = await blockchainService.getTipBlockNumber();
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;

let currentBlockNumber = await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID);
if (!currentBlockNumber) {
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: CKB_CHAIN_ID },
orderBy: { blockNumber: 'desc' },
});
currentBlockNumber = latestAsset?.blockNumber;
}
const latestIndexedAssetsBlock =
await this.indexerQueueService.getLatestIndexedAssetsBlock(CKB_CHAIN_ID);
const latestAsset = await this.prismaService.asset.findFirst({
select: { blockNumber: true },
where: { chainId: CKB_CHAIN_ID },
orderBy: { blockNumber: 'desc' },
});
const currentBlockNumber = Math.max(
latestIndexedAssetsBlock ?? -1,
latestAsset?.blockNumber ?? -1,
);

const isHealthy =
!!currentBlockNumber && currentBlockNumber >= targetBlockNumber - INDEEXR_HEALTH_THRESHOLD;
Expand Down
8 changes: 6 additions & 2 deletions backend/src/filters/all-exceptions.filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ export class AllExceptionsFilter extends SentryGlobalGraphQLFilter {
const ctx = host.switchToHttp();
const request = ctx.getRequest();
if (SKIP_REQUEST_URLS.includes(request.url)) {
const response = (exception as HttpException).getResponse();
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200);
const response = (exception as HttpException)?.getResponse();
if (response) {
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, 200);
return;
}
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), exception, 500);
return;
}
}
Expand Down

0 comments on commit 83f7d5b

Please sign in to comment.