Skip to content

Commit

Permalink
Merge pull request #220 from utxostack/develop
Browse files Browse the repository at this point in the history
Merge develop to main (20240927)
  • Loading branch information
Vibes-INS authored Sep 27, 2024
2 parents 215f6ab + ca2ca10 commit fc2b74f
Show file tree
Hide file tree
Showing 37 changed files with 796 additions and 498 deletions.
4 changes: 2 additions & 2 deletions backend/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@utxo-stack-explorer/backend",
"version": "0.1.1",
"version": "0.2.0",
"description": "",
"author": "",
"private": true,
Expand Down Expand Up @@ -68,13 +68,13 @@
"graphql-query-complexity": "^1.0.0",
"ioredis": "^5.4.1",
"lodash": "^4.17.21",
"nestjs-cacheable": "^1.0.0",
"p-limit": "^3.1.0",
"prisma": "^5.16.2",
"redis": "^4.6.7",
"reflect-metadata": "^0.2.0",
"rpc-websockets": "^7.11.2",
"rxjs": "^7.8.1",
"serialize-javascript": "^6.0.2",
"ws": "^8.18.0",
"zod": "^3.23.8"
},
Expand Down
2 changes: 0 additions & 2 deletions backend/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { redisStore } from 'cache-manager-redis-yet';
import { Env } from './env';
import { CoreModule } from './core/core.module';
import { ApiModule } from './modules/api.module';
import { CacheableModule } from 'nestjs-cacheable';
import { ScheduleModule } from '@nestjs/schedule';
import { BullModule } from '@nestjs/bullmq';
import configModule from './config';
Expand All @@ -18,7 +17,6 @@ import { BootstrapService } from './bootstrap.service';
imports: [
configModule,
SentryModule.forRoot(),
CacheableModule.register(),
CacheModule.registerAsync<RedisClientOptions>({
isGlobal: true,
imports: [ConfigModule],
Expand Down
2 changes: 2 additions & 0 deletions backend/src/core/bitcoin-api/bitcoin-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ChainInfo, Transaction } from './bitcoin-api.schema';
import { ONE_HOUR_MS, ONE_MONTH_MS, TEN_MINUTES_MS } from 'src/common/date';
import { Cacheable } from 'src/decorators/cacheable.decorator';
import * as Sentry from '@sentry/nestjs';
import { PLimit } from 'src/decorators/plimit.decorator';

type MethodParameters<T, K extends keyof T> = T[K] extends (...args: infer P) => any ? P : never;
type MethodReturnType<T, K extends keyof T> = T[K] extends (...args: any[]) => infer R ? R : never;
Expand Down Expand Up @@ -87,6 +88,7 @@ export class BitcoinApiService {
}
}

@PLimit({ concurrency: 200 })
private async call<K extends keyof IBitcoinDataProvider>(
method: K,
...args: MethodParameters<IBitcoinDataProvider, K>
Expand Down
33 changes: 26 additions & 7 deletions backend/src/core/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ONE_MONTH_MS } from 'src/common/date';
import { CKB_MIN_SAFE_CONFIRMATIONS } from 'src/constants';
import * as Sentry from '@sentry/nestjs';
import { Chain } from '@prisma/client';
import { PLimit } from 'src/decorators/plimit.decorator';

class WebsocketError extends Error {
constructor(message: string) {
Expand Down Expand Up @@ -70,6 +71,12 @@ export class BlockchainService {
});
}

@PLimit({ concurrency: 200 })
private async call(method: string, params: any[]): Promise<any> {
await this.websocketReady;
return this.websocket.call(method, params);
}

private handleReconnection() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
Expand Down Expand Up @@ -122,7 +129,7 @@ export class BlockchainService {
): Promise<TransactionWithStatusResponse> {
await this.websocketReady;
this.logger.debug(`get_transaction - txHash: ${txHash}`);
const response = await this.websocket.call('get_transaction', [txHash]);
const response = await this.call('get_transaction', [txHash]);
const tx = response as TransactionWithStatusResponse;
// XXX: we don't need these fields by default, remove them to save cache/memory space
if (!withData) {
Expand Down Expand Up @@ -162,7 +169,7 @@ export class BlockchainService {
): Promise<Block> {
await this.websocketReady;
this.logger.debug(`get_block - blockHash: ${blockHash}`);
const response = await this.websocket.call('get_block', [blockHash]);
const response = await this.call('get_block', [blockHash]);
const block = response as Block;
if (!withTxData) {
block.transactions = block.transactions.map((tx) => {
Expand Down Expand Up @@ -204,7 +211,7 @@ export class BlockchainService {
): Promise<Block> {
await this.websocketReady;
this.logger.debug(`get_block_by_number - blockNumber: ${blockNumber}`);
const response = await this.websocket.call('get_block_by_number', [
const response = await this.call('get_block_by_number', [
BI.from(blockNumber).toHexString(),
]);
const block = response as Block;
Expand All @@ -231,7 +238,7 @@ export class BlockchainService {
public async getBlockEconomicState(blockHash: string): Promise<BlockEconomicState> {
await this.websocketReady;
this.logger.debug(`get_block_economic_state - blockHash: ${blockHash}`);
const blockEconomicState = await this.websocket.call('get_block_economic_state', [blockHash]);
const blockEconomicState = await this.call('get_block_economic_state', [blockHash]);
return blockEconomicState as BlockEconomicState;
}

Expand All @@ -244,10 +251,16 @@ export class BlockchainService {
public async getTipBlockNumber(): Promise<number> {
await this.websocketReady;
this.logger.debug('get_tip_block_number');
const tipBlockNumber = await this.websocket.call('get_tip_block_number', []);
const tipBlockNumber = await this.call('get_tip_block_number', []);
return BI.from(tipBlockNumber).toNumber();
}

@Cacheable({
namespace: 'BlockchainService',
key: (searchKey: SearchKey, order: 'asc' | 'desc', limit: string, after?: string) =>
`getTransactions:${JSON.stringify(searchKey)}:${order}:${limit}:${after}`,
ttl: 10_000,
})
public async getTransactions(
searchKey: SearchKey,
order: 'asc' | 'desc',
Expand All @@ -258,11 +271,17 @@ export class BlockchainService {
this.logger.debug(
`get_transactions - searchKey: ${JSON.stringify(searchKey)}, order: ${order}, limit: ${limit}, after: ${after}`,
);
const result = await this.websocket.call('get_transactions', [searchKey, order, limit, after]);
const result = await this.call('get_transactions', [searchKey, order, limit, after]);
const transactions = result as GetTransactionsResult;
return transactions;
}

@Cacheable({
namespace: 'BlockchainService',
key: (searchKey: SearchKey, order: 'asc' | 'desc', limit: string, after?: string) =>
`getCells:${JSON.stringify(searchKey)}:${order}:${limit}:${after}`,
ttl: 10_000,
})
public async getCells(
searchKey: SearchKey,
order: 'asc' | 'desc',
Expand All @@ -274,7 +293,7 @@ export class BlockchainService {
this.logger.debug(
`get_cells - searchKey: ${JSON.stringify(searchKey)}, order: ${order}, limit: ${limit}, after: ${after}`,
);
const result = await this.websocket.call('get_cells', [searchKey, order, limit, after]);
const result = await this.call('get_cells', [searchKey, order, limit, after]);
const cells = result as GetCellsResult;
cells.objects = cells.objects.map((cell) => {
if (!withData) {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/core/core.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { Env } from 'src/env';
import { Transaction } from './blockchain/blockchain.interface';
import { BlockchainServiceFactory } from './blockchain/blockchain.factory';
import { LeapDirection } from '@prisma/client';
import { Cacheable } from 'nestjs-cacheable';
import { ONE_MONTH_MS } from 'src/common/date';
import { Cacheable } from 'src/decorators/cacheable.decorator';

export const CELLBASE_TX_HASH =
'0x0000000000000000000000000000000000000000000000000000000000000000';
Expand Down
14 changes: 13 additions & 1 deletion backend/src/core/indexer/flow/assets.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { CKB_MIN_SAFE_CONFIRMATIONS, CKB_ONE_DAY_BLOCKS } from 'src/constants';
import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';

export enum IndexerAssetsEvent {
AssetIndexed = 'asset-indexed',
Expand All @@ -19,6 +21,7 @@ export class IndexerAssetsFlow extends EventEmitter {
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
) {
super();
}
Expand Down Expand Up @@ -112,7 +115,16 @@ export class IndexerAssetsFlow extends EventEmitter {

private setupBlockAssetsIndexedListener() {
this.on(IndexerAssetsEvent.BlockAssetsIndexed, () => {
setTimeout(this.startBlockAssetsIndexing.bind(this), 1000 * 10);
if (this.schedulerRegistry.doesExist('cron', 'indexer-block-assets')) {
return;
}

this.logger.log(`Scheduling block assets indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockAssetsIndexing();
});
this.schedulerRegistry.addCronJob('indexer-block-assets', job);
job.start();
});
}
}
22 changes: 17 additions & 5 deletions backend/src/core/indexer/flow/transactions.flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { BlockchainService } from 'src/core/blockchain/blockchain.service';
import { PrismaService } from 'src/core/database/prisma/prisma.service';
import { IndexerQueueService } from '../indexer.queue';
import { ONE_DAY_MS } from 'src/common/date';
import { CronExpression, SchedulerRegistry } from '@nestjs/schedule';
import { CronJob } from 'cron';

const CKB_24_HOURS_BLOCK_NUMBER = ONE_DAY_MS / 10000;

Expand All @@ -21,16 +23,17 @@ export class IndexerTransactionsFlow extends EventEmitter {
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
) {
super();
}

public async start() {
this.startBlockAssetsIndexing();
this.setupBlockAssetsIndexedListener();
this.startBlockIndexing();
this.setupBlockIndexedListener();
}

public async startBlockAssetsIndexing() {
public async startBlockIndexing() {
const tipBlockNumber = await this.blockchainService.getTipBlockNumber();
let startBlockNumber = tipBlockNumber - CKB_24_HOURS_BLOCK_NUMBER;
const targetBlockNumber = tipBlockNumber - CKB_MIN_SAFE_CONFIRMATIONS;
Expand All @@ -55,9 +58,18 @@ export class IndexerTransactionsFlow extends EventEmitter {
});
}

private setupBlockAssetsIndexedListener() {
private setupBlockIndexedListener() {
this.on(IndexerTransactionsEvent.BlockIndexed, () => {
setTimeout(this.startBlockAssetsIndexing.bind(this), 1000 * 10);
if (this.schedulerRegistry.doesExist('cron', 'indexer-transactions')) {
return;
}

this.logger.log(`Scheduling block transactions indexing cron job`);
const job = new CronJob(CronExpression.EVERY_10_SECONDS, () => {
this.startBlockIndexing();
});
this.schedulerRegistry.addCronJob('indexer-transactions', job);
job.start();
});
}
}
3 changes: 3 additions & 0 deletions backend/src/core/indexer/indexer.factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IndexerService } from './indexer.service';
import { BlockchainServiceFactory } from '../blockchain/blockchain.factory';
import { IndexerQueueService } from './indexer.queue';
import { ModuleRef } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';

export class IndexerServiceFactoryError extends Error {
constructor(message: string) {
Expand All @@ -19,6 +20,7 @@ export class IndexerServiceFactory implements OnModuleDestroy {
constructor(
private blockchainServiceFactory: BlockchainServiceFactory,
private prismaService: PrismaService,
private schedulerRegistry: SchedulerRegistry,
private moduleRef: ModuleRef,
) {}

Expand All @@ -43,6 +45,7 @@ export class IndexerServiceFactory implements OnModuleDestroy {
blockchainService,
this.prismaService,
indexerQueueService,
this.schedulerRegistry,
);
this.services.set(chain.id, service);
}
Expand Down
4 changes: 4 additions & 0 deletions backend/src/core/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BlockchainService } from '../blockchain/blockchain.service';
import { PrismaService } from '../database/prisma/prisma.service';
import { IndexerQueueService } from './indexer.queue';
import { IndexerTransactionsFlow } from './flow/transactions.flow';
import { SchedulerRegistry } from '@nestjs/schedule';

export class IndexerService {
public assetsFlow: IndexerAssetsFlow;
Expand All @@ -14,18 +15,21 @@ export class IndexerService {
private blockchainService: BlockchainService,
private prismaService: PrismaService,
private indexerQueueService: IndexerQueueService,
private schedulerRegistry: SchedulerRegistry,
) {
this.assetsFlow = new IndexerAssetsFlow(
this.chain,
this.blockchainService,
this.prismaService,
this.indexerQueueService,
this.schedulerRegistry,
);
this.transactionsFlow = new IndexerTransactionsFlow(
this.chain,
this.blockchainService,
this.prismaService,
this.indexerQueueService,
this.schedulerRegistry,
);
}

Expand Down
Loading

1 comment on commit fc2b74f

@vercel
Copy link

@vercel vercel bot commented on fc2b74f Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.