Skip to content

Commit

Permalink
Merge pull request #190 from utxostack/fix/healthcheck-n-queue
Browse files Browse the repository at this point in the history
fix: standardize BullMQ job options and add stalled interval
  • Loading branch information
ahonn authored Sep 24, 2024
2 parents 364b25a + 36ed274 commit e177ae3
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 30 deletions.
7 changes: 1 addition & 6 deletions backend/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { APP_FILTER } from '@nestjs/core';
import { CacheModule, CacheStore } from '@nestjs/cache-manager';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { SentryGlobalGraphQLFilter, SentryModule } from '@sentry/nestjs/setup';
import { SentryModule } from '@sentry/nestjs/setup';
import type { RedisClientOptions } from 'redis';
import { redisStore } from 'cache-manager-redis-yet';
import { Env } from './env';
Expand Down Expand Up @@ -56,10 +55,6 @@ import { BootstrapService } from './bootstrap.service';
providers: [
AppController,
BootstrapService,
{
provide: APP_FILTER,
useClass: SentryGlobalGraphQLFilter,
},
],
controllers: [AppController],
})
Expand Down
32 changes: 9 additions & 23 deletions backend/src/core/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,56 +19,42 @@ import {
} from './processor/transaction.processor';
import { IndexerHealthIndicator } from './indexer.health';

const commonAttemptsConfig: Pick<DefaultJobOptions, 'attempts' | 'backoff'> = {
const defaultJobOptions: DefaultJobOptions = {
attempts: 10,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: true,
removeOnFail: true,
};

@Global()
@Module({
imports: [
BullModule.registerQueue({
name: INDEXER_ASSETS_QUEUE,
defaultJobOptions: {
...commonAttemptsConfig,
},
defaultJobOptions,
}),
BullModule.registerQueue({
name: INDEXER_BLOCK_ASSETS_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
...commonAttemptsConfig,
},
defaultJobOptions,
}),
BullModule.registerQueue({
name: INDEXER_BLOCK_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
...commonAttemptsConfig,
},
defaultJobOptions,
}),
BullModule.registerQueue({
name: INDEXER_TRANSACTION_QUEUE,
defaultJobOptions: {
removeOnComplete: true,
...commonAttemptsConfig,
},
defaultJobOptions,
}),
BullModule.registerQueue({
name: INDEXER_LOCK_QUEUE,
defaultJobOptions: {
...commonAttemptsConfig,
},
defaultJobOptions,
}),
BullModule.registerQueue({
name: INDEXER_TYPE_QUEUE,
defaultJobOptions: {
...commonAttemptsConfig,
},
defaultJobOptions,
}),
forwardRef(() => CoreModule),
],
Expand Down
1 change: 1 addition & 0 deletions backend/src/core/indexer/processor/assets.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const BATCH_SIZE = BI.from(400).toHexString();

@Processor(INDEXER_ASSETS_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerAssetsProcessor extends WorkerHost {
private logger = new Logger(IndexerAssetsProcessor.name);
Expand Down
2 changes: 2 additions & 0 deletions backend/src/core/indexer/processor/block-assets.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface IndexerBlockAssetsJobData {

@Processor(INDEXER_BLOCK_ASSETS_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerBlockAssetsProcessor extends WorkerHost {
private logger = new Logger(IndexerBlockAssetsProcessor.name);
Expand Down Expand Up @@ -61,6 +62,7 @@ export class IndexerBlockAssetsProcessor extends WorkerHost {
const assetTypeScripts = await this.prismaService.assetType.findMany({ where: { chainId } });
await Promise.all(
block.transactions.map(async (tx, txIndex) => {
await job.updateProgress((txIndex / block.transactions.length) * 100);
await this.updateInputAssetCellStatus(chainId, tx);

for (let index = 0; index < tx.outputs.length; index += 1) {
Expand Down
1 change: 1 addition & 0 deletions backend/src/core/indexer/processor/block.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface IndexerBlockJobData {

@Processor(INDEXER_BLOCK_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerBlockProcessor extends WorkerHost {
private logger = new Logger(IndexerBlockProcessor.name);
Expand Down
1 change: 1 addition & 0 deletions backend/src/core/indexer/processor/lock.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class IndexerLockError extends Error {

@Processor(INDEXER_LOCK_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerLockProcessor extends WorkerHost {
private logger = new Logger(IndexerLockProcessor.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface IndexerTransactionJobData {

@Processor(INDEXER_TRANSACTION_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerTransactionProcessor extends WorkerHost {
private logger = new Logger(IndexerTransactionProcessor.name);
Expand Down
1 change: 1 addition & 0 deletions backend/src/core/indexer/processor/type.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export interface IndexerTypeJobData {

@Processor(INDEXER_TYPE_QUEUE, {
concurrency: 100,
stalledInterval: 60_000,
})
export class IndexerTypeProcessor extends WorkerHost {
private logger = new Logger(IndexerTypeProcessor.name);
Expand Down
25 changes: 25 additions & 0 deletions backend/src/filters/all-exceptions.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Catch, ArgumentsHost, HttpException } from '@nestjs/common';
import { HttpAdapterHost } from '@nestjs/core';
import { SentryGlobalGraphQLFilter } from '@sentry/nestjs/setup';

const SKIP_REQUEST_URLS = ['/health', '/version'];

@Catch()
export class AllExceptionsFilter extends SentryGlobalGraphQLFilter {
constructor(private readonly httpAdapterHost: HttpAdapterHost) {
super();
}

catch(exception: unknown, host: ArgumentsHost) {
const ctx = host.switchToHttp();
const request = ctx.getRequest();
if (SKIP_REQUEST_URLS.includes(request.url)) {
const response = (exception as HttpException).getResponse();
const status = (exception as HttpException).getStatus();
this.httpAdapterHost.httpAdapter.reply(ctx.getResponse(), response, status);
return;
}

super.catch(exception, host);
}
}
8 changes: 7 additions & 1 deletion backend/src/modules/api.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { join } from 'node:path';
import { ExecutionContext, Injectable, Module } from '@nestjs/common';
import { APP_GUARD, APP_INTERCEPTOR } from '@nestjs/core';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR } from '@nestjs/core';
import { ConfigService } from '@nestjs/config';
import { GqlExecutionContext, GraphQLModule } from '@nestjs/graphql';
import { DataLoaderInterceptor } from 'src/common/dataloader';
Expand All @@ -15,6 +15,8 @@ import { ComplexityPlugin } from './complexity.plugin';
import * as Sentry from '@sentry/nestjs';
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
import { FastifyReply, FastifyRequest } from 'fastify';
import { SentryGlobalGraphQLFilter } from '@sentry/nestjs/setup';
import { AllExceptionsFilter } from 'src/filters/all-exceptions.filter';

@Injectable()
export class GqlThrottlerGuard extends ThrottlerGuard {
Expand Down Expand Up @@ -77,6 +79,10 @@ export class GqlThrottlerGuard extends ThrottlerGuard {
provide: APP_INTERCEPTOR,
useClass: DataLoaderInterceptor,
},
{
provide: APP_FILTER,
useClass: AllExceptionsFilter,
},
ComplexityPlugin,
],
})
Expand Down

0 comments on commit e177ae3

Please sign in to comment.