diff --git a/app/apps/onebox/src/main.module.ts b/app/apps/onebox/src/main.module.ts index cece1c2..b67d735 100644 --- a/app/apps/onebox/src/main.module.ts +++ b/app/apps/onebox/src/main.module.ts @@ -1,12 +1,14 @@ import { DatabaseModule } from '@app/database'; import { GlobalModule, GlobalService } from '@app/global'; +import { SharedModulesModule } from '@app/shared_modules'; import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; +import { ScheduleModule } from '@nestjs/schedule'; import { SwaggerModule } from '@nestjs/swagger'; import { AuthModule } from './modules/auth/auth.module'; import { UsersModule } from './modules/users/users.module'; import { WalletModule } from './modules/wallet/wallet.module'; -import { EthMonitorModule } from './modules/webhooks/ethereum/eth.monitor.module'; +import { PollingBlockService } from './polling.block/polling.block.service'; @Module({ imports: [ @@ -21,8 +23,9 @@ import { EthMonitorModule } from './modules/webhooks/ethereum/eth.monitor.module GlobalModule, WalletModule, SwaggerModule, - EthMonitorModule, + SharedModulesModule, + ScheduleModule.forRoot(), ], - providers: [GlobalService], + providers: [GlobalService, PollingBlockService], }) export class MainModule {} diff --git a/app/apps/onebox/src/modules/webhooks/ethereum/dto/eth.create-monitor.dto.ts b/app/apps/onebox/src/modules/webhooks/ethereum/dto/eth.create-monitor.dto.ts index 6af947f..a51298e 100644 --- a/app/apps/onebox/src/modules/webhooks/ethereum/dto/eth.create-monitor.dto.ts +++ b/app/apps/onebox/src/modules/webhooks/ethereum/dto/eth.create-monitor.dto.ts @@ -9,7 +9,7 @@ import { ValidateNested, } from 'class-validator'; import { - FilterValue, + FilterRange, MonitorCondition, MonitoringType, NotificationMethod, @@ -57,9 +57,6 @@ export class CreateEthMonitorDto { @Type(() => NotificationMethod) notificationMethods: NotificationMethod[]; - @ApiProperty() - filter: FilterValue; - @ApiProperty() note: string; } diff --git a/app/apps/onebox/src/polling.block/polling.block.service.spec.ts b/app/apps/onebox/src/polling.block/polling.block.service.spec.ts new file mode 100644 index 0000000..cfb4684 --- /dev/null +++ b/app/apps/onebox/src/polling.block/polling.block.service.spec.ts @@ -0,0 +1,18 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PollingBlockService } from './polling.block.service'; + +describe('PollingBlockService', () => { + let service: PollingBlockService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [PollingBlockService], + }).compile(); + + service = module.get(PollingBlockService); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); +}); diff --git a/app/apps/onebox/src/polling.block/polling.block.service.ts b/app/apps/onebox/src/polling.block/polling.block.service.ts new file mode 100644 index 0000000..cb78916 --- /dev/null +++ b/app/apps/onebox/src/polling.block/polling.block.service.ts @@ -0,0 +1,139 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Client, ClientKafka, Transport } from '@nestjs/microservices'; +import { BlockSyncService } from '../modules/blocksync/blocksync.service'; +import { ethers } from 'ethers'; +import { EthereumWorker } from 'apps/worker-service/src/worker/evm.worker'; +import { Cron, CronExpression } from '@nestjs/schedule'; + +@Injectable() +export class PollingBlockService { + private readonly confirmBlock = 12; + private detectInfo = { flag: false, blockNumber: 0 }; + + private readonly logger = new Logger(EthereumWorker.name); + @Client({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'worker', + brokers: process.env.KAFKA_BROKERS.split(','), + }, + consumer: { + groupId: 'worker-consumer', + }, + }, + }) + private readonly workerClient: ClientKafka; + + @Inject(BlockSyncService) + private readonly blockSyncService: BlockSyncService; + + rpcUrl: string; + provider: ethers.Provider; + + onModuleInit() { + console.log(`The module has been initialized.`); + if (process.env.EVM_DISABLE === 'true') { + this.detectInfo.flag = true; + return; + } + this.rpcUrl = process.env.ETH_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL); + this.init(); + } + + async init() { + this.detectInfo.flag = true; + let blockSync = await this.blockSyncService.findOne(this.rpcUrl); + if (!blockSync) { + blockSync = await this.blockSyncService.create({ + rpcUrl: this.rpcUrl, + lastSync: parseInt(process.env.EVM_START_BLOCK), + }); + } + // checking force latest block config + const startBlockConfig = process.env.EVM_START_BLOCK_CONFIG; + if (startBlockConfig === 'latest') { + const latestBlockNumber = await this.provider.getBlockNumber(); + this.logger.warn( + 'force running latest block from network ' + latestBlockNumber, + ); + this.updateLastSyncBlock(latestBlockNumber); + this.detectInfo.blockNumber = latestBlockNumber - 1; + } else if (startBlockConfig === 'config') { + this.logger.warn( + 'force running start block from config ' + process.env.EVM_START_BLOCK, + ); + this.updateLastSyncBlock(parseInt(process.env.EVM_START_BLOCK)); + this.detectInfo.blockNumber = parseInt(process.env.EVM_START_BLOCK) - 1; + } else { + this.logger.warn('running start block from db ' + blockSync.lastSync); + this.detectInfo.blockNumber = blockSync.lastSync + this.confirmBlock; + } + this.detectInfo.flag = false; + } + + private async updateLastSyncBlock(blockNumber: number): Promise { + // Update the last sync block in MongoDB + await this.blockSyncService.updateLastSync(this.rpcUrl, blockNumber); + } + + @Cron(CronExpression.EVERY_10_SECONDS, { + disabled: process.env.EVM_DISABLE === 'true', + }) + async ethPollingBlock() { + console.log('Start detect block'); + if (this.detectInfo.flag) { + return; + } + this.detectInfo.flag = true; + const lastDetectedBlock = this.detectInfo.blockNumber + 1; + + // Get the latest block number + const latestDetectedBlockNumber = await this.getBlockNumber(); + + // Scan each block + for ( + let blockNumber = lastDetectedBlock; + blockNumber <= latestDetectedBlockNumber; + blockNumber++ + ) { + try { + this.logger.debug(['DETECT', `Scanning block ${blockNumber}`]); + + // emit event detect block with blocknumber + this.workerClient.emit('eth-detect-block', { + blockNumber: blockNumber, + }); + + // emit event confirm block with block number - confirm block + this.workerClient.emit('eth-confirm-block', { + blockNumber: blockNumber - this.confirmBlock, + }); + + //only update last sync for confirm + await this.updateLastSyncBlock(blockNumber - this.confirmBlock); + } catch (error) { + this.logger.error([ + 'DETECT', + `Error scanning block ${blockNumber}:`, + error, + ]); + break; + } + } + + this.detectInfo.flag = false; + return; + } + + private async getBlockNumber(): Promise { + try { + const blockNumber = await this.provider.getBlockNumber(); + return blockNumber; + } catch (error) { + this.logger.error('error while getting block number', error); + } + return 0; + } +} diff --git a/app/apps/worker-service/src/blocksync/schemas/blocksync.schema.ts b/app/apps/worker-service/src/blocksync/schemas/blocksync.schema.ts index f08ddc8..ce76051 100644 --- a/app/apps/worker-service/src/blocksync/schemas/blocksync.schema.ts +++ b/app/apps/worker-service/src/blocksync/schemas/blocksync.schema.ts @@ -5,7 +5,7 @@ export type BlockSyncDocument = HydratedDocument; @Schema() export class BlockSync { - @Prop({ required: true, unique: true, lowercase: true }) + @Prop({ required: true, unique: true, lowercase: true, index: true }) rpcUrl: string; @Prop({ default: 'ETH' }) diff --git a/app/apps/worker-service/src/worker/evm.worker.ts b/app/apps/worker-service/src/worker/evm.worker.ts index 5427371..726f033 100644 --- a/app/apps/worker-service/src/worker/evm.worker.ts +++ b/app/apps/worker-service/src/worker/evm.worker.ts @@ -75,7 +75,7 @@ export class EthereumWorker { this.logger.warn( 'force running latest block from network ' + latestBlockNumber, ); - this.blockSyncService.updateLastSync(this.rpcUrl, latestBlockNumber); + this.updateLastSyncBlock(latestBlockNumber); this.detectInfo.blockNumber = latestBlockNumber - 1; this.confirmInfo.blockNumber = latestBlockNumber - 1; } else if (startBlockConfig === 'config') { diff --git a/app/libs/shared_modules/src/eth.monitor/schemas/eth.monitor.schema.ts b/app/libs/shared_modules/src/eth.monitor/schemas/eth.monitor.schema.ts index 9c79448..700db12 100644 --- a/app/libs/shared_modules/src/eth.monitor/schemas/eth.monitor.schema.ts +++ b/app/libs/shared_modules/src/eth.monitor/schemas/eth.monitor.schema.ts @@ -12,6 +12,9 @@ export class MonitorCondition { @Prop() native: boolean; + @Prop() + internal: boolean; + @Prop() erc721: boolean; @@ -23,7 +26,9 @@ export class MonitorCondition { @Prop({ type: Object }) cryptos: { - [key: string]: boolean; + [key: string]: { + filterValue: FilterRange; // @todo check this later + }; }; // ALL = 'ALL', // native, erc20, erc721, erc1155 @@ -51,7 +56,7 @@ export class NotificationMethod { // note: only apply for purchased user // tokens not having price will be excluded in when calculating USD value // filter value in USD -export class FilterValue { +export class FilterRange { @Prop() min: bigint; @Prop() @@ -80,9 +85,6 @@ export class EthMonitor { @Prop({ required: true }) notificationMethods: NotificationMethod[]; - @Prop({ required: false }) - filter: FilterValue; - @Prop({ require: false, maxlength: 200 }) note: string;