diff --git a/app/apps/monitor-service/src/mantle/mantle.controller.ts b/app/apps/monitor-service/src/mantle/mantle.controller.ts new file mode 100644 index 0000000..5ddcc75 --- /dev/null +++ b/app/apps/monitor-service/src/mantle/mantle.controller.ts @@ -0,0 +1,34 @@ +import { TopicName } from '@app/utils/topicUtils'; +import { Controller, Logger } from '@nestjs/common'; +import { EventPattern } from '@nestjs/microservices'; +import { MantleService } from './mantle.service'; + +@Controller() +export class MantleController { + private readonly logger = new Logger(MantleController.name); + constructor(private readonly mantle: MantleService) {} + + @EventPattern(TopicName.MANTLE_NATIVE_TRANSFER) + async handleNativeTransfer(data: any) { + const start = Date.now(); + await this.mantle.handleNativeTransfer(data); + if (Date.now() - start > 20) + this.logger.warn(`Handle native transfer in : ${Date.now() - start}`); + } + + @EventPattern(TopicName.MANTLE_ERC20_TRANSFER) + async handleErc20Transfer(data: any) { + const start = Date.now(); + await this.mantle.handleErc20Transfer(data); + if (Date.now() - start > 20) + this.logger.warn(`Handle erc20 transfer in : ${Date.now() - start}`); + } + + @EventPattern(TopicName.MANTLE_ERC721_TRANSFER) + async handleErc721Transfer(data: any) { + const start = Date.now(); + await this.mantle.handleErc721Transfer(data); + if (Date.now() - start > 20) + this.logger.warn(`Handle erc721 transfer in : ${Date.now() - start}`); + } +} diff --git a/app/apps/monitor-service/src/mantle/mantle.module.ts b/app/apps/monitor-service/src/mantle/mantle.module.ts new file mode 100644 index 0000000..ea40f86 --- /dev/null +++ b/app/apps/monitor-service/src/mantle/mantle.module.ts @@ -0,0 +1,54 @@ +import { EventHistoryModelModule } from '@app/shared_modules/event_history/event_history.module'; +import { MonitorModule } from '@app/shared_modules/monitor/monitor.module'; +import { ProjectModule } from '@app/shared_modules/project/project.module'; +import { WebhookModule } from '@app/shared_modules/webhook/webhook.module'; +import { Module } from '@nestjs/common'; +import { ClientsModule, Transport } from '@nestjs/microservices'; +import { MantleController } from './mantle.controller'; +import { MantleService } from './mantle.service'; + +@Module({ + providers: [MantleService], + controllers: [MantleController], + exports: [MantleService], + imports: [ + ClientsModule.registerAsync([ + { + name: 'WEBHOOK_SERVICE', + useFactory: () => ({ + transport: Transport.KAFKA, + options: { + client: { + clientId: 'webhook', + brokers: process.env.KAFKA_BROKERS.split(','), + ssl: process.env.KAFKA_SSL === 'true', + sasl: + process.env.KAFKA_AUTH_ENABLE === 'true' + ? { + mechanism: 'plain', + username: process.env.KAFKA_USERNAME || '', + password: process.env.KAFKA_PASSWORD || '', + } + : null, + retry: { + restartOnFailure: async (e) => { + console.log('RESTART ON FAILURE mantle module'); + console.log(e); + return true; + }, + }, + }, + consumer: { + groupId: 'webhook-consumer', + }, + }, + }), + }, + ]), + WebhookModule, + MonitorModule, + ProjectModule, + EventHistoryModelModule, + ], +}) +export class MantleModule {} diff --git a/app/apps/monitor-service/src/mantle/mantle.service.ts b/app/apps/monitor-service/src/mantle/mantle.service.ts new file mode 100644 index 0000000..457d39e --- /dev/null +++ b/app/apps/monitor-service/src/mantle/mantle.service.ts @@ -0,0 +1,387 @@ +import { MantleEventHistoryRepository } from '@app/shared_modules/event_history/repositories/event_history.repository'; +import { + EventHistory, + WebhookType, +} from '@app/shared_modules/event_history/schemas/event_history.schema'; +import { MantleMonitorAddressRepository } from '@app/shared_modules/monitor/repositories/monitor.address.repository'; +import { MonitorRepository } from '@app/shared_modules/monitor/repositories/monitor.repository'; +import { MonitorAddress } from '@app/shared_modules/monitor/schemas/monitor.address.schema'; +import { + Monitor, + MonitoringType, + WebhookNotification, +} from '@app/shared_modules/monitor/schemas/monitor.schema'; +import { MonitorWebhookService } from '@app/shared_modules/monitor/services/monitor.webhook.service'; +import { ProjectQuotaService } from '@app/shared_modules/project/services/project.quota.service'; +import { + DispatchWebhookResponse, + WebhookService, +} from '@app/shared_modules/webhook/webhook.service'; +import { SupportedChain } from '@app/utils/supportedChain.util'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ethers, Log, TransactionResponse } from 'ethers'; + +@Injectable() +export class MantleService { + private readonly logger = new Logger(MantleService.name); + + @Inject() + private readonly monitorAddressRepository: MantleMonitorAddressRepository; + + @Inject() + private readonly monitorRepository: MonitorRepository; + + // @Inject('WEBHOOK_SERVICE') + // private readonly webhookClient: ClientKafka; + + @Inject() + private readonly webhookService: WebhookService; + + @Inject() + private readonly projectQuotaService: ProjectQuotaService; + + @Inject() + private readonly monitorWebhookService: MonitorWebhookService; + + @Inject() + private readonly eventHistoryRepository: MantleEventHistoryRepository; + + async findEthAddress(address: string): Promise { + return this.monitorAddressRepository.findByAddress(address); + } + + async findMonitor(monitorId: string): Promise { + return this.monitorRepository.findById(monitorId); + } + + async handleErc20Transfer(data: any): Promise { + const event = data.event as Log; + const confirm = data.confirm as boolean; + this.logger.debug([ + 'ERC20', + `received transaction ${event.transactionHash} from block ${event.blockNumber}`, + ]); + // Extract relevant information from the event + // const contractAddress = ethers.getAddress(event.address).toLowerCase(); + const fromAddress = ethers + .getAddress(event.topics[1].substring(26)) + .toLowerCase(); + const toAddress = ethers + .getAddress(event.topics[2].substring(26)) + .toLowerCase(); + const value = ethers.toBigInt(event.data).toString(); + + // handle from wallet + const fromWallet_monitors = await this.findEthAddress(fromAddress); + if (fromWallet_monitors) { + this.handleMatchConditionERC20( + fromWallet_monitors, + confirm, + event, + value, + WebhookType.out, + ); + } + + // handle to wallet + const toWallet_monitors = await this.findEthAddress(toAddress); + if (toWallet_monitors) { + this.handleMatchConditionERC20( + toWallet_monitors, + confirm, + event, + value, + WebhookType.in, + ); + } + } + + async handleErc721Transfer(data: any) { + const event = data.event as Log; + const confirm = data.confirm as boolean; + + this.logger.debug([ + 'ERC721', + `received transaction ${event.transactionHash} from block ${event.blockNumber}`, + ]); + + const fromAddress = ethers + .getAddress(event.topics[1].substring(26)) + .toLowerCase(); + const toAddress = ethers + .getAddress(event.topics[2].substring(26)) + .toLowerCase(); + const tokenId = ethers.toBigInt(event.topics[3]).toString(); + + // handle from wallet + const fromWallet_monitors = await this.findEthAddress(fromAddress); + if (fromWallet_monitors) { + this.handleMatchConditionERC721( + fromWallet_monitors, + confirm, + event, + tokenId, + WebhookType.out, + ); + } + + // handle to wallet + const toWallet_monitors = await this.findEthAddress(toAddress); + if (toWallet_monitors) { + this.handleMatchConditionERC721( + toWallet_monitors, + confirm, + event, + tokenId, + WebhookType.in, + ); + } + } + + async handleNativeTransfer(data: any): Promise { + const transaction = data.transaction as TransactionResponse; + const confirm = data.confirm as boolean; + + this.logger.debug([ + 'NATIVE', + `receive new transaction ${transaction.hash} from block ${transaction.blockNumber}`, + ]); + + // return if value is zero + if (transaction.value == 0n) { + return; + } + + const fromWallet_monitors = await this.findEthAddress( + transaction.from.toLowerCase(), + ); + if (fromWallet_monitors) { + this.handleMatchConditionNative( + fromWallet_monitors, + confirm, + transaction, + WebhookType.out, + ); + } + // return on to address is null. this is transaction create contract + if (!transaction.to) { + return; + } + const toWallet_monitors = await this.findEthAddress( + transaction.to.toLowerCase(), + ); + if (toWallet_monitors) { + this.handleMatchConditionNative( + toWallet_monitors, + confirm, + transaction, + WebhookType.in, + ); + } + } + + private async handleMatchConditionNative( + addresses: MonitorAddress[], + confirm: boolean, + transaction: TransactionResponse, + type: WebhookType, + ) { + // @todo check condition of monitor and event log if it match + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + if (!monitor.condition.native) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + + const txnHistory = EventHistory.fromTransactionToNative( + transaction, + SupportedChain.MANTLE.name, + monitor.monitorId, + type, + confirm, + ); + + const response = await this.dispatchMessageToWebhook(monitor, txnHistory); + this.saveHistory(txnHistory, response); + + this.logger.debug( + `Confirmed: ${confirm} native transfer:\n${JSON.stringify(txnHistory)}`, + ); + } + } + + private async handleMatchConditionERC721( + addresses: MonitorAddress[], + confirm: boolean, + event: Log, + tokenId: string, + type: WebhookType, + ) { + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + // ignore monitor condition on erc721 + if (!monitor.condition.erc721) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + // @todo check condition on specific cryptos + const transaction = EventHistory.fromLogToERC721( + event, + SupportedChain.MANTLE.name, + monitor.monitorId, + type, + confirm, + tokenId, + ); + + const response = await this.dispatchMessageToWebhook( + monitor, + transaction, + ); + this.saveHistory(transaction, response); + + this.logger.debug( + `Confirmed: ${confirm} ERC721 transfer ${type.toUpperCase()}:\n${JSON.stringify( + transaction, + )}`, + ); + } + } + + private async handleMatchConditionERC20( + addresses: MonitorAddress[], + confirm: boolean, + event: Log, + value: string, + type: WebhookType, + ) { + for (const address of addresses) { + const monitor = await this.findMonitor(address.monitorId); + // ignore monitor condition on erc20 + if (!monitor.condition.erc20) { + continue; + } + if ( + monitor.type !== MonitoringType.ALL && + monitor.type.toString() !== type.toString() + ) { + continue; + } + // @todo check condition on specific cryptos + const txnHistory = EventHistory.fromLogToERC20( + event, + SupportedChain.MANTLE.name, + monitor.monitorId, + type, + confirm, + value, + ); + + const response = await this.dispatchMessageToWebhook(monitor, txnHistory); + await this.saveHistory(txnHistory, response); + + this.logger.debug( + `Confirmed: ${confirm} ERC20 transfer ${type.toUpperCase()}:\n${JSON.stringify( + txnHistory, + )}`, + ); + } + } + + private async saveHistory( + event: EventHistory, + delivery: DispatchWebhookResponse, + ) { + let deliveryId: string; + if (!delivery) { + this.logger.error( + `Save event ${event.confirm ? 'CONFIRMED' : 'DETECT'} ${ + event.eventId + } with error can not dispatch this event`, + ); + deliveryId = 'ERROR'; + } else { + deliveryId = delivery.id; + } + + if (!event.confirm) { + event.deliveryIds = [deliveryId]; + await this.eventHistoryRepository.saveEventHistory(event); + } else { + await this.eventHistoryRepository.pushConfirmDeliveryId( + event.eventId, + deliveryId, + ); + } + } + + private async sendMessage(monitor: Monitor, body: EventHistory) { + if (!monitor.notification) { + return; + } + const webhook = monitor.notification as WebhookNotification; + body.tags = monitor.tags; + try { + const response = await fetch(webhook.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: webhook.authorization, + }, + body: JSON.stringify(body), + }); + if (!response.ok) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + response, + ); + } + } catch (error) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + error, + ); + } + } + + private async dispatchMessageToWebhook( + monitor: Monitor, + body: EventHistory, + ): Promise { + if (!monitor.notification) { + return; + } + const webhook = monitor.notification as WebhookNotification; + body.tags = monitor.tags; + try { + const respone = await this.webhookService.dispatchMessage( + monitor.webhookId, + body, + ); + this.logger.debug( + `Dispatch webhook successfully response: ${JSON.stringify(respone)}`, + ); + await Promise.all([ + this.projectQuotaService.increaseUsed(monitor.projectId), + this.monitorWebhookService.increaseWebhookCount(monitor.monitorId), + ]); + return respone; + } catch (error) { + this.logger.error( + `Error while sending webhook request to: ${webhook.url}`, + error, + ); + } + } +} diff --git a/app/apps/monitor-service/src/monitor-service.module.ts b/app/apps/monitor-service/src/monitor-service.module.ts index bed01ed..0606f6e 100644 --- a/app/apps/monitor-service/src/monitor-service.module.ts +++ b/app/apps/monitor-service/src/monitor-service.module.ts @@ -3,6 +3,7 @@ import { ConfigModule } from '@nestjs/config'; import { EthereumModule } from './ethereum/ethereum.module'; import { PolygonModule } from './polygon/polygon.module'; import { AvaxModule } from './avax/avax.module'; +import { MantleModule } from './mantle/mantle.module'; @Module({ imports: [ @@ -14,6 +15,7 @@ import { AvaxModule } from './avax/avax.module'; EthereumModule, PolygonModule, AvaxModule, + MantleModule, ], controllers: [], providers: [], diff --git a/app/apps/onebox/src/main.module.ts b/app/apps/onebox/src/main.module.ts index 37f9238..003f11a 100644 --- a/app/apps/onebox/src/main.module.ts +++ b/app/apps/onebox/src/main.module.ts @@ -18,6 +18,7 @@ import { UsersModule } from './modules/users/users.module'; import { EthereumPollingBlockService } from './polling.block/ethereum.polling.block.service'; import { PolygonPollingBlockService } from './polling.block/polygon.polling.block.service'; import { AvaxPollingBlockService } from './polling.block/avax.polling.block.service'; +import { MantlePollingBlockService } from './polling.block/mantle.polling.block.service'; @Module({ imports: [ @@ -78,6 +79,7 @@ import { AvaxPollingBlockService } from './polling.block/avax.polling.block.serv EthereumPollingBlockService, PolygonPollingBlockService, AvaxPollingBlockService, + MantlePollingBlockService, ], }) export class MainModule {} diff --git a/app/apps/onebox/src/polling.block/avax.polling.block.service.ts b/app/apps/onebox/src/polling.block/avax.polling.block.service.ts index 96acf39..bc9eb9c 100644 --- a/app/apps/onebox/src/polling.block/avax.polling.block.service.ts +++ b/app/apps/onebox/src/polling.block/avax.polling.block.service.ts @@ -110,8 +110,6 @@ export class AvaxPollingBlockService { blockNumber++ ) { try { - this.logger.log(`last emitted block ${blockNumber}`); - // emit event detect block with blocknumber this.workerClient.emit( TopicName.AVAX_DETECTED_BLOCK, @@ -127,7 +125,7 @@ export class AvaxPollingBlockService { ); this.detectInfo.blockNumber = blockNumber; - + this.logger.debug(`last emitted block ${blockNumber}`); //only update last sync for confirm await this.updateLastSyncBlock( blockNumber - SupportedChain.AVALANCHE.confirmationBlock, diff --git a/app/apps/onebox/src/polling.block/ethereum.polling.block.service.ts b/app/apps/onebox/src/polling.block/ethereum.polling.block.service.ts index 7793696..ad4c152 100644 --- a/app/apps/onebox/src/polling.block/ethereum.polling.block.service.ts +++ b/app/apps/onebox/src/polling.block/ethereum.polling.block.service.ts @@ -110,8 +110,6 @@ export class EthereumPollingBlockService { blockNumber++ ) { try { - this.logger.log(`last emitted block ${blockNumber}`); - // emit event detect block with blocknumber this.workerClient.emit( TopicName.ETH_DETECTED_BLOCK, @@ -127,7 +125,7 @@ export class EthereumPollingBlockService { ); this.detectInfo.blockNumber = blockNumber; - + this.logger.debug(`last emitted block ${blockNumber}`); //only update last sync for confirm await this.updateLastSyncBlock( blockNumber - SupportedChain.ETH.confirmationBlock, diff --git a/app/apps/onebox/src/polling.block/mantle.polling.block.service.ts b/app/apps/onebox/src/polling.block/mantle.polling.block.service.ts new file mode 100644 index 0000000..5377a54 --- /dev/null +++ b/app/apps/onebox/src/polling.block/mantle.polling.block.service.ts @@ -0,0 +1,168 @@ +import { TopicName } from '@app/utils/topicUtils'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { CronExpression, SchedulerRegistry } from '@nestjs/schedule'; +import { CronJob } from 'cron'; +import { ethers } from 'ethers'; +import { BlockSyncService } from '../modules/blocksync/blocksync.service'; +import { SupportedChain } from '@app/utils/supportedChain.util'; +import { BlockTransportDto } from '@app/utils/dto/transport.dto'; + +@Injectable() +export class MantlePollingBlockService { + private detectInfo = { flag: false, blockNumber: 0 }; + + private readonly logger = new Logger(MantlePollingBlockService.name); + + constructor( + private schedulerRegistry: SchedulerRegistry, + private readonly blockSyncService: BlockSyncService, + @Inject('WORKER_CLIENT_SERVICE') + private readonly workerClient: ClientKafka, + ) {} + + rpcUrl: string; + provider: ethers.Provider; + + onModuleInit() { + this.logger.log(`The module has been initialized.`); + if (process.env.MANTLE_DISABLE === 'true') { + this.detectInfo.flag = true; + return; + } + this.rpcUrl = process.env.MANTLE_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider( + process.env.MANTLE_PROVIDER_URL, + null, + { staticNetwork: true }, + ); + this.init(); + } + + async init() { + this.detectInfo.flag = true; + let blockSync = await this.blockSyncService.findOne(this.rpcUrl); + if (!blockSync) { + blockSync = await this.blockSyncService.create({ + rpcUrl: this.rpcUrl, + chain: SupportedChain.AVALANCHE.name, + lastSync: await this.getBlockNumber(), + }); + } + // checking force latest block config + const startBlockConfig = process.env.MANTLE_START_BLOCK_CONFIG; + if (startBlockConfig === 'latest') { + const latestBlockNumber = await this.provider.getBlockNumber(); + this.logger.warn( + 'force running latest block from network ' + latestBlockNumber, + ); + this.updateLastSyncBlock(latestBlockNumber); + // if start at latest block, we need to minus 1 + // we suppose that we already scan at (latest block - 1) + this.detectInfo.blockNumber = latestBlockNumber - 1; + } else if (startBlockConfig === 'config') { + this.logger.warn( + 'force running start block from config ' + + process.env.MANTLE_START_BLOCK, + ); + this.updateLastSyncBlock(parseInt(process.env.MANTLE_START_BLOCK)); + // if we start at config block, we suppose that we already scan at (config block - 1) + this.detectInfo.blockNumber = + parseInt(process.env.MANTLE_START_BLOCK) - 1; + } else { + this.logger.warn('running start block from db ' + blockSync.lastSync); + // if we start at db block, we suppose that we already scan at db block + this.detectInfo.blockNumber = + blockSync.lastSync + SupportedChain.AVALANCHE.confirmationBlock; + } + this.detectInfo.flag = false; + this.addCronJob('MantlePollingBlock', CronExpression.EVERY_5_SECONDS); + } + + addCronJob(name: string, seconds: string) { + const job = new CronJob(seconds, () => this.pollingBlock()); + + this.schedulerRegistry.addCronJob(name, job); + job.start(); + + this.logger.warn(`job ${name} added for each ${seconds} seconds!`); + } + + private async updateLastSyncBlock(blockNumber: number): Promise { + // Update the last sync block in MongoDB + await this.blockSyncService.updateLastSync(this.rpcUrl, blockNumber); + } + + async pollingBlock() { + this.logger.debug('Start polling block number'); + if (this.detectInfo.flag) { + this.logger.error('conflict with last job. quit current job'); + return; + } + this.detectInfo.flag = true; + const lastDetectedBlock = this.detectInfo.blockNumber + 1; + + // Get the latest block number + const latestDetectedBlockNumber = await this.getBlockNumber(); + + // Scan each block + for ( + let blockNumber = lastDetectedBlock; + blockNumber <= latestDetectedBlockNumber; + blockNumber++ + ) { + try { + // emit event detect block with blocknumber + this.workerClient.emit( + TopicName.MANTLE_DETECTED_BLOCK, + new BlockTransportDto(blockNumber, false), + ); + // emit event confirm block with block number - confirm block + this.workerClient.emit( + TopicName.MANTLE_DETECTED_BLOCK, + new BlockTransportDto( + blockNumber - SupportedChain.AVALANCHE.confirmationBlock, + true, + ), + ); + + this.detectInfo.blockNumber = blockNumber; + this.logger.debug(`last emitted block ${blockNumber}`); + + //only update last sync for confirm + await this.updateLastSyncBlock( + blockNumber - SupportedChain.AVALANCHE.confirmationBlock, + ); + } catch (error) { + this.logger.error([`Error polling block ${blockNumber}:`, error]); + break; + } + } + + this.detectInfo.flag = false; + return; + } + + private async getBlockNumber(): Promise { + try { + // Perform an asynchronous operation (e.g., fetching data) + const blockNumber = await Promise.race([ + this.provider.getBlockNumber(), // Your asynchronous operation + delay(5000).then(() => { + throw new Error('Get block number Timeout'); + }), // Timeout promise + ]); + this.logger.log('got latest block from network: ' + blockNumber); + return blockNumber; + } catch (error) { + this.logger.error('error while getting block number', error); + } + return 0; + } +} + +function delay(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} diff --git a/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts b/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts index 54b5577..fadd353 100644 --- a/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts +++ b/app/apps/onebox/src/polling.block/polygon.polling.block.service.ts @@ -112,8 +112,6 @@ export class PolygonPollingBlockService { blockNumber++ ) { try { - this.logger.log(`last emitted block ${blockNumber}`); - // emit event detect block with blocknumber this.workerClient.emit( TopicName.POLYGON_DETECTED_BLOCK, @@ -129,7 +127,7 @@ export class PolygonPollingBlockService { ); this.detectInfo.blockNumber = blockNumber; - + this.logger.debug(`last emitted block ${blockNumber}`); //only update last sync for confirm await this.updateLastSyncBlock( blockNumber - SupportedChain.POLYGON.confirmationBlock, diff --git a/app/apps/worker-service/src/worker-service.controller.ts b/app/apps/worker-service/src/worker-service.controller.ts index ad9e299..e146555 100644 --- a/app/apps/worker-service/src/worker-service.controller.ts +++ b/app/apps/worker-service/src/worker-service.controller.ts @@ -6,6 +6,7 @@ import { EthereumWorker } from './worker/ethereum.worker'; import { PolygonWorker } from './worker/polygon.worker'; import { BlockTransportDto } from '@app/utils/dto/transport.dto'; import { AvaxWorker } from './worker/avax.worker'; +import { MantleWorker } from './worker/mantle.worker'; @Controller() export class WorkerServiceController { @@ -15,6 +16,7 @@ export class WorkerServiceController { private readonly ethereumWorker: EthereumWorker, private readonly polygonWorker: PolygonWorker, private readonly avaxWorker: AvaxWorker, + private readonly mantleWorker: MantleWorker, ) {} @Get() @@ -57,4 +59,14 @@ export class WorkerServiceController { }), ]); } + + @EventPattern(TopicName.MANTLE_DETECTED_BLOCK) + async mantleDetectBlock(data: any) { + await Promise.race([ + this.mantleWorker.handleBlock(data), + this.delay(200).then(() => { + return; + }), + ]); + } } diff --git a/app/apps/worker-service/src/worker-service.module.ts b/app/apps/worker-service/src/worker-service.module.ts index 5fc276a..0a9baf6 100644 --- a/app/apps/worker-service/src/worker-service.module.ts +++ b/app/apps/worker-service/src/worker-service.module.ts @@ -8,6 +8,7 @@ import { WorkerServiceService } from './worker-service.service'; import { EthereumWorker } from './worker/ethereum.worker'; import { PolygonWorker } from './worker/polygon.worker'; import { AvaxWorker } from './worker/avax.worker'; +import { MantleWorker } from './worker/mantle.worker'; @Module({ imports: [ @@ -84,6 +85,12 @@ import { AvaxWorker } from './worker/avax.worker'; BlockHistoryModelModule, ], controllers: [WorkerServiceController], - providers: [WorkerServiceService, EthereumWorker, PolygonWorker, AvaxWorker], + providers: [ + WorkerServiceService, + EthereumWorker, + PolygonWorker, + AvaxWorker, + MantleWorker, + ], }) export class WorkerServiceModule {} diff --git a/app/apps/worker-service/src/worker/mantle.worker.ts b/app/apps/worker-service/src/worker/mantle.worker.ts new file mode 100644 index 0000000..983c916 --- /dev/null +++ b/app/apps/worker-service/src/worker/mantle.worker.ts @@ -0,0 +1,165 @@ +import { MantleBlockHistoryRepository } from '@app/shared_modules/block_history/repositories/block_history.repository'; +import { MonitorNetwork } from '@app/shared_modules/monitor/schemas/monitor.schema'; +import { BlockTransportDto } from '@app/utils/dto/transport.dto'; +import { TopicName } from '@app/utils/topicUtils'; +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { ClientKafka } from '@nestjs/microservices'; +import { Block, ethers, Log } from 'ethers'; + +@Injectable() +export class MantleWorker { + private readonly logger = new Logger(MantleWorker.name); + rpcUrl: string; + provider: ethers.Provider; + @Inject('MONITOR_CLIENT_SERVICE') + private readonly monitorClient: ClientKafka; + + @Inject('WORKER_CLIENT_SERVICE') + private readonly workerClient: ClientKafka; + + @Inject() + private readonly blockHistoryRepository: MantleBlockHistoryRepository; + + constructor() { + if (process.env.MANTLE_PROVIDER_URL) { + this.rpcUrl = process.env.MANTLE_PROVIDER_URL; + this.provider = new ethers.JsonRpcProvider( + process.env.MANTLE_PROVIDER_URL, + null, + { staticNetwork: true }, + ); + } + } + + async handleBlock(data: BlockTransportDto) { + const start = Date.now(); + const blockNumber = data.blockNumber; + if (!blockNumber) { + this.logger.error( + 'receive invalid message with block number is undefined', + ); + return; + } + try { + const startGetBlock = Date.now(); + const result = await Promise.all([ + this.provider.getBlock(blockNumber, true), + this.provider.getLogs({ + fromBlock: blockNumber, + toBlock: blockNumber, + topics: [ + '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', + ], + }), + ]); + // Retrieve all transaction in block + const block = result[0]; + const logs = result[1]; + const endGetBlock = Date.now(); + + const emitStart = Date.now(); + await Promise.all([ + this.emitNativeTransaction(block, data.confirmed), + this.emitLog(logs, data.confirmed), + ]); + const emitEnd = Date.now(); + //only update last sync for confirm + await this.saveBlockHistory(blockNumber, data.confirmed); + this.logger.log( + `${ + data.confirmed ? 'CONFIRM' : 'DETECT' + } Scanning block ${blockNumber} with ${ + block.length + logs.length + } events in ${Date.now() - start}ms and emit ${ + emitEnd - emitStart + }ms and GetEvent ${endGetBlock - startGetBlock}ms`, + ); + } catch (error) { + this.logger.error([ + `${data.confirmed ? 'CONFIRM' : 'DETECT'}`, + `Error scanning block ${blockNumber}:`, + error, + ]); + await this.saveBlockHistory( + blockNumber, + data.confirmed, + true, + error, + data.retry + 1 || 1, + ); + } + + return; + } + + private async emitLog(logs: Log[], confirm: boolean): Promise { + // handle extracted event for erc20 and nft + logs.forEach((event) => { + if (event.topics.length === 3) { + this.logger.debug(`emit event on ERC20 ${JSON.stringify(event)}`); + this.monitorClient.emit(TopicName.MANTLE_ERC20_TRANSFER, { + event: event, + confirm: confirm, + }); + } else if (event.topics.length === 4) { + this.logger.debug(`emit event on ERC721 ${JSON.stringify(event)}`); + this.monitorClient.emit(TopicName.MANTLE_ERC721_TRANSFER, { + event: event, + confirm: confirm, + }); + } + }); + } + + private async emitNativeTransaction( + block: Block, + confirm: boolean, + ): Promise { + if (block === null) { + this.logger.error('get block from network return null'); + throw new Error('get block from network return null'); + } + // handle extracted event for native + block.prefetchedTransactions.forEach((transaction) => { + this.logger.debug(`emit event on NATIVE ${JSON.stringify(transaction)}`); + this.monitorClient.emit(TopicName.MANTLE_NATIVE_TRANSFER, { + transaction: transaction, + confirm: confirm, + }); + }); + } + + private async saveBlockHistory( + blockNumber: number, + confirmed: boolean, + isError?: boolean, + error?: any, + retry?: number, + ): Promise { + if (isError && retry < 3) { + this.logger.warn( + `emit error block ${blockNumber} to kafka with retry ${retry}`, + ); + this.workerClient.emit(TopicName.MANTLE_DETECTED_BLOCK, { + key: 'error', + value: new BlockTransportDto( + blockNumber, + confirmed, + retry, + isError, + error !== undefined ? JSON.stringify(error) : '', + ), + }); + } + this.logger.debug(`save block history ${blockNumber}`); + await this.blockHistoryRepository.saveBlockHistory({ + blockNumber: blockNumber, + chain: MonitorNetwork.Avalanche, + confirmed: confirmed, + isError: isError || false, + errorDetail: error !== undefined ? JSON.stringify(error) : '', + retry: retry || 0, + dateCreated: new Date(), + }); + } +} diff --git a/app/libs/shared_modules/src/block_history/block_history.module.ts b/app/libs/shared_modules/src/block_history/block_history.module.ts index a9a99e3..14b0412 100644 --- a/app/libs/shared_modules/src/block_history/block_history.module.ts +++ b/app/libs/shared_modules/src/block_history/block_history.module.ts @@ -4,6 +4,7 @@ import { AvaxBlockHistoryRepository, BscBlockHistoryRepository, EthBlockHistoryRepository, + MantleBlockHistoryRepository, PolygonBlockHistoryRepository, } from './repositories/block_history.repository'; import { BlockHistoryProviders } from './block_history.provider'; @@ -16,6 +17,7 @@ import { BlockHistoryProviders } from './block_history.provider'; BscBlockHistoryRepository, PolygonBlockHistoryRepository, AvaxBlockHistoryRepository, + MantleBlockHistoryRepository, ], exports: [ ...BlockHistoryProviders, @@ -23,6 +25,7 @@ import { BlockHistoryProviders } from './block_history.provider'; BscBlockHistoryRepository, PolygonBlockHistoryRepository, AvaxBlockHistoryRepository, + MantleBlockHistoryRepository, ], }) export class BlockHistoryModelModule {} diff --git a/app/libs/shared_modules/src/block_history/block_history.provider.ts b/app/libs/shared_modules/src/block_history/block_history.provider.ts index 51b95b9..26971ee 100644 --- a/app/libs/shared_modules/src/block_history/block_history.provider.ts +++ b/app/libs/shared_modules/src/block_history/block_history.provider.ts @@ -42,4 +42,14 @@ export const BlockHistoryProviders = [ ), inject: ['DATABASE_CONNECTION'], }, + { + provide: 'MANTLE_BLOCK_HISTORY_MODEL', + useFactory: (connection: Connection) => + connection.model( + 'MantleBlockHistory', + BlockHistorySchema, + 'mantle_block_history', + ), + inject: ['DATABASE_CONNECTION'], + }, ]; diff --git a/app/libs/shared_modules/src/block_history/repositories/block_history.repository.ts b/app/libs/shared_modules/src/block_history/repositories/block_history.repository.ts index bc17646..d1740fc 100644 --- a/app/libs/shared_modules/src/block_history/repositories/block_history.repository.ts +++ b/app/libs/shared_modules/src/block_history/repositories/block_history.repository.ts @@ -77,3 +77,12 @@ export class AvaxBlockHistoryRepository extends BlockHistoryRepository { super(MonitorNetwork.Avalanche, model); } } + +@Injectable() +export class MantleBlockHistoryRepository extends BlockHistoryRepository { + constructor( + @Inject('MANTLE_BLOCK_HISTORY_MODEL') model: Model, + ) { + super(MonitorNetwork.Mantle, model); + } +} diff --git a/app/libs/shared_modules/src/event_history/event_history.module.ts b/app/libs/shared_modules/src/event_history/event_history.module.ts index 66ccea2..2d6b594 100644 --- a/app/libs/shared_modules/src/event_history/event_history.module.ts +++ b/app/libs/shared_modules/src/event_history/event_history.module.ts @@ -4,6 +4,7 @@ import { AvaxEventHistoryRepository, BscEventHistoryRepository, EthEventHistoryRepository, + MantleEventHistoryRepository, PolygonEventHistoryRepository, } from './repositories/event_history.repository'; import { EventHistoryProviders } from './event_history.provider'; @@ -16,6 +17,7 @@ import { EventHistoryProviders } from './event_history.provider'; BscEventHistoryRepository, PolygonEventHistoryRepository, AvaxEventHistoryRepository, + MantleEventHistoryRepository, ], exports: [ ...EventHistoryProviders, @@ -23,6 +25,7 @@ import { EventHistoryProviders } from './event_history.provider'; BscEventHistoryRepository, PolygonEventHistoryRepository, AvaxEventHistoryRepository, + MantleEventHistoryRepository, ], }) export class EventHistoryModelModule {} diff --git a/app/libs/shared_modules/src/event_history/event_history.provider.ts b/app/libs/shared_modules/src/event_history/event_history.provider.ts index c2e37c5..e58a0b9 100644 --- a/app/libs/shared_modules/src/event_history/event_history.provider.ts +++ b/app/libs/shared_modules/src/event_history/event_history.provider.ts @@ -42,4 +42,14 @@ export const EventHistoryProviders = [ ), inject: ['DATABASE_CONNECTION'], }, + { + provide: 'MANTLE_EVENT_HISTORY_MODEL', + useFactory: (connection: Connection) => + connection.model( + 'MantleEventHistory', + EventHistorySchema, + 'mantle_event_history', + ), + inject: ['DATABASE_CONNECTION'], + }, ]; diff --git a/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts b/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts index c4f0910..14493a9 100644 --- a/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts +++ b/app/libs/shared_modules/src/event_history/repositories/event_history.repository.ts @@ -107,3 +107,12 @@ export class AvaxEventHistoryRepository extends EventHistoryRepository { super(MonitorNetwork.Avalanche, model); } } + +@Injectable() +export class MantleEventHistoryRepository extends EventHistoryRepository { + constructor( + @Inject('MANTLE_EVENT_HISTORY_MODEL') model: Model, + ) { + super(MonitorNetwork.Mantle, model); + } +} diff --git a/app/libs/shared_modules/src/monitor/monitor.module.ts b/app/libs/shared_modules/src/monitor/monitor.module.ts index 0618795..1e6ba65 100644 --- a/app/libs/shared_modules/src/monitor/monitor.module.ts +++ b/app/libs/shared_modules/src/monitor/monitor.module.ts @@ -5,6 +5,7 @@ import { AvaxMonitorAddressRepository, BscMonitorAddressRepository, EthMonitorAddressRepository, + MantleMonitorAddressRepository, PolygonMonitorAddressRepository, } from './repositories/monitor.address.repository'; import { MonitorRepository } from './repositories/monitor.repository'; @@ -20,6 +21,7 @@ import { MonitorWebhookService } from './services/monitor.webhook.service'; BscMonitorAddressRepository, PolygonMonitorAddressRepository, AvaxMonitorAddressRepository, + MantleMonitorAddressRepository, ], exports: [ ...MonitorProviders, @@ -29,6 +31,7 @@ import { MonitorWebhookService } from './services/monitor.webhook.service'; BscMonitorAddressRepository, PolygonMonitorAddressRepository, AvaxMonitorAddressRepository, + MantleMonitorAddressRepository, ], }) export class MonitorModule {} diff --git a/app/libs/shared_modules/src/monitor/monitor.provider.ts b/app/libs/shared_modules/src/monitor/monitor.provider.ts index 08dc196..ddd1817 100644 --- a/app/libs/shared_modules/src/monitor/monitor.provider.ts +++ b/app/libs/shared_modules/src/monitor/monitor.provider.ts @@ -49,4 +49,14 @@ export const MonitorProviders = [ ), inject: ['DATABASE_CONNECTION'], }, + { + provide: 'MANTLE_MONITOR_ADDRESS_MODEL', + useFactory: (connection: Connection) => + connection.model( + 'MantleMonitorAddress', + MonitorAddressSchema, + 'mantle_monitor_address', + ), + inject: ['DATABASE_CONNECTION'], + }, ]; diff --git a/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts b/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts index 20a5403..a7c070a 100644 --- a/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts +++ b/app/libs/shared_modules/src/monitor/repositories/monitor.address.repository.ts @@ -128,3 +128,12 @@ export class AvaxMonitorAddressRepository extends MonitorAddressRepository { super(MonitorNetwork.Avalanche, model); } } + +@Injectable() +export class MantleMonitorAddressRepository extends MonitorAddressRepository { + constructor( + @Inject('MANTLE_MONITOR_ADDRESS_MODEL') model: Model, + ) { + super(MonitorNetwork.Mantle, model); + } +} diff --git a/app/libs/shared_modules/src/monitor/schemas/monitor.schema.ts b/app/libs/shared_modules/src/monitor/schemas/monitor.schema.ts index cf054bd..9e22cf8 100644 --- a/app/libs/shared_modules/src/monitor/schemas/monitor.schema.ts +++ b/app/libs/shared_modules/src/monitor/schemas/monitor.schema.ts @@ -7,6 +7,7 @@ export enum MonitorNetwork { BSC = 'BSC', Polygon = 'Polygon', Avalanche = 'Avalanche', + Mantle = 'Mantle', Arbitrum = 'Arbitrum', Optimism = 'Optimism', Celo = 'Celo', diff --git a/app/libs/utils/src/topicUtils.ts b/app/libs/utils/src/topicUtils.ts index 0fa0c23..048ce6d 100644 --- a/app/libs/utils/src/topicUtils.ts +++ b/app/libs/utils/src/topicUtils.ts @@ -13,4 +13,9 @@ export enum TopicName { AVAX_NATIVE_TRANSFER = 'avax-native-transfer', AVAX_ERC20_TRANSFER = 'avax-erc20-transfer', AVAX_ERC721_TRANSFER = 'avax-erc721-transfer', + + MANTLE_DETECTED_BLOCK = 'mantle-detect-block', + MANTLE_NATIVE_TRANSFER = 'mantle-native-transfer', + MANTLE_ERC20_TRANSFER = 'mantle-erc20-transfer', + MANTLE_ERC721_TRANSFER = 'mantle-erc721-transfer', }