Skip to content

Commit

Permalink
Merge pull request #17 from crypitor/config
Browse files Browse the repository at this point in the history
Config polling service
  • Loading branch information
cauta authored Mar 30, 2024
2 parents 14c4e3c + 0706f86 commit 218f661
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 14 deletions.
9 changes: 6 additions & 3 deletions app/apps/onebox/src/main.module.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { DatabaseModule } from '@app/database';
import { GlobalModule, GlobalService } from '@app/global';
import { SharedModulesModule } from '@app/shared_modules';
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ScheduleModule } from '@nestjs/schedule';
import { SwaggerModule } from '@nestjs/swagger';
import { AuthModule } from './modules/auth/auth.module';
import { UsersModule } from './modules/users/users.module';
import { WalletModule } from './modules/wallet/wallet.module';
import { EthMonitorModule } from './modules/webhooks/ethereum/eth.monitor.module';
import { PollingBlockService } from './polling.block/polling.block.service';

@Module({
imports: [
Expand All @@ -21,8 +23,9 @@ import { EthMonitorModule } from './modules/webhooks/ethereum/eth.monitor.module
GlobalModule,
WalletModule,
SwaggerModule,
EthMonitorModule,
SharedModulesModule,
ScheduleModule.forRoot(),
],
providers: [GlobalService],
providers: [GlobalService, PollingBlockService],
})
export class MainModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
ValidateNested,
} from 'class-validator';
import {
FilterValue,
FilterRange,
MonitorCondition,
MonitoringType,
NotificationMethod,
Expand Down Expand Up @@ -57,9 +57,6 @@ export class CreateEthMonitorDto {
@Type(() => NotificationMethod)
notificationMethods: NotificationMethod[];

@ApiProperty()
filter: FilterValue;

@ApiProperty()
note: string;
}
18 changes: 18 additions & 0 deletions app/apps/onebox/src/polling.block/polling.block.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Test, TestingModule } from '@nestjs/testing';
import { PollingBlockService } from './polling.block.service';

describe('PollingBlockService', () => {
let service: PollingBlockService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [PollingBlockService],
}).compile();

service = module.get<PollingBlockService>(PollingBlockService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});
});
139 changes: 139 additions & 0 deletions app/apps/onebox/src/polling.block/polling.block.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Client, ClientKafka, Transport } from '@nestjs/microservices';
import { BlockSyncService } from '../modules/blocksync/blocksync.service';
import { ethers } from 'ethers';
import { EthereumWorker } from 'apps/worker-service/src/worker/evm.worker';
import { Cron, CronExpression } from '@nestjs/schedule';

@Injectable()
export class PollingBlockService {
private readonly confirmBlock = 12;
private detectInfo = { flag: false, blockNumber: 0 };

private readonly logger = new Logger(EthereumWorker.name);
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'worker',
brokers: process.env.KAFKA_BROKERS.split(','),
},
consumer: {
groupId: 'worker-consumer',
},
},
})
private readonly workerClient: ClientKafka;

@Inject(BlockSyncService)
private readonly blockSyncService: BlockSyncService;

rpcUrl: string;
provider: ethers.Provider;

onModuleInit() {
console.log(`The module has been initialized.`);
if (process.env.EVM_DISABLE === 'true') {
this.detectInfo.flag = true;
return;
}
this.rpcUrl = process.env.ETH_PROVIDER_URL;
this.provider = new ethers.JsonRpcProvider(process.env.ETH_PROVIDER_URL);
this.init();
}

async init() {
this.detectInfo.flag = true;
let blockSync = await this.blockSyncService.findOne(this.rpcUrl);
if (!blockSync) {
blockSync = await this.blockSyncService.create({
rpcUrl: this.rpcUrl,
lastSync: parseInt(process.env.EVM_START_BLOCK),
});
}
// checking force latest block config
const startBlockConfig = process.env.EVM_START_BLOCK_CONFIG;
if (startBlockConfig === 'latest') {
const latestBlockNumber = await this.provider.getBlockNumber();
this.logger.warn(
'force running latest block from network ' + latestBlockNumber,
);
this.updateLastSyncBlock(latestBlockNumber);
this.detectInfo.blockNumber = latestBlockNumber - 1;
} else if (startBlockConfig === 'config') {
this.logger.warn(
'force running start block from config ' + process.env.EVM_START_BLOCK,
);
this.updateLastSyncBlock(parseInt(process.env.EVM_START_BLOCK));
this.detectInfo.blockNumber = parseInt(process.env.EVM_START_BLOCK) - 1;
} else {
this.logger.warn('running start block from db ' + blockSync.lastSync);
this.detectInfo.blockNumber = blockSync.lastSync + this.confirmBlock;
}
this.detectInfo.flag = false;
}

private async updateLastSyncBlock(blockNumber: number): Promise<void> {
// Update the last sync block in MongoDB
await this.blockSyncService.updateLastSync(this.rpcUrl, blockNumber);
}

@Cron(CronExpression.EVERY_10_SECONDS, {
disabled: process.env.EVM_DISABLE === 'true',
})
async ethPollingBlock() {
console.log('Start detect block');
if (this.detectInfo.flag) {
return;
}
this.detectInfo.flag = true;
const lastDetectedBlock = this.detectInfo.blockNumber + 1;

// Get the latest block number
const latestDetectedBlockNumber = await this.getBlockNumber();

// Scan each block
for (
let blockNumber = lastDetectedBlock;
blockNumber <= latestDetectedBlockNumber;
blockNumber++
) {
try {
this.logger.debug(['DETECT', `Scanning block ${blockNumber}`]);

// emit event detect block with blocknumber
this.workerClient.emit('eth-detect-block', {
blockNumber: blockNumber,
});

// emit event confirm block with block number - confirm block
this.workerClient.emit('eth-confirm-block', {
blockNumber: blockNumber - this.confirmBlock,
});

//only update last sync for confirm
await this.updateLastSyncBlock(blockNumber - this.confirmBlock);
} catch (error) {
this.logger.error([
'DETECT',
`Error scanning block ${blockNumber}:`,
error,
]);
break;
}
}

this.detectInfo.flag = false;
return;
}

private async getBlockNumber(): Promise<number> {
try {
const blockNumber = await this.provider.getBlockNumber();
return blockNumber;
} catch (error) {
this.logger.error('error while getting block number', error);
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export type BlockSyncDocument = HydratedDocument<BlockSync>;

@Schema()
export class BlockSync {
@Prop({ required: true, unique: true, lowercase: true })
@Prop({ required: true, unique: true, lowercase: true, index: true })
rpcUrl: string;

@Prop({ default: 'ETH' })
Expand Down
2 changes: 1 addition & 1 deletion app/apps/worker-service/src/worker/evm.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class EthereumWorker {
this.logger.warn(
'force running latest block from network ' + latestBlockNumber,
);
this.blockSyncService.updateLastSync(this.rpcUrl, latestBlockNumber);
this.updateLastSyncBlock(latestBlockNumber);
this.detectInfo.blockNumber = latestBlockNumber - 1;
this.confirmInfo.blockNumber = latestBlockNumber - 1;
} else if (startBlockConfig === 'config') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export class MonitorCondition {
@Prop()
native: boolean;

@Prop()
internal: boolean;

@Prop()
erc721: boolean;

Expand All @@ -23,7 +26,9 @@ export class MonitorCondition {

@Prop({ type: Object })
cryptos: {
[key: string]: boolean;
[key: string]: {
filterValue: FilterRange; // @todo check this later
};
};

// ALL = 'ALL', // native, erc20, erc721, erc1155
Expand Down Expand Up @@ -51,7 +56,7 @@ export class NotificationMethod {
// note: only apply for purchased user
// tokens not having price will be excluded in when calculating USD value
// filter value in USD
export class FilterValue {
export class FilterRange {
@Prop()
min: bigint;
@Prop()
Expand Down Expand Up @@ -80,9 +85,6 @@ export class EthMonitor {
@Prop({ required: true })
notificationMethods: NotificationMethod[];

@Prop({ required: false })
filter: FilterValue;

@Prop({ require: false, maxlength: 200 })
note: string;

Expand Down

0 comments on commit 218f661

Please sign in to comment.