Skip to content

Commit

Permalink
feat: added one time jobs feature
Browse files Browse the repository at this point in the history
  • Loading branch information
mckrava committed Feb 20, 2024
1 parent c212d4c commit 7fe33c8
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 35 deletions.
5 changes: 3 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import config from './modulesConfig';
import { PlatformBootstrapperModule } from './platformBootstrapper/platformBootstrapper.module';
import { ScheduleModule } from '@nestjs/schedule';
import { AccountSyncSchedulerModule } from './modules/accountSyncScheduler/accountSyncScheduler.module';
import { AggregatorStateManagerModule } from './modules/aggregatorStateManager/aggregatorStateManager.module';

dotenv.config();

Expand Down Expand Up @@ -50,10 +51,10 @@ dotenv.config();
TransferNativeModule,
VoteNativeModule,
PlatformBootstrapperModule,
AccountSyncSchedulerModule
AccountSyncSchedulerModule,
AggregatorStateManagerModule,
],
})

export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
// consumer.apply(ApiToggleMiddleware).forRoutes('*');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AggregatorState } from './entities/aggregatorState.entity';
import { AggregatorStateManagerService } from './aggregatorStateManager.service';
import { OneTimeJobsManagerService } from './oneTimeJobsManager.service';
import { AccountAggregationFlowProducer } from '../queueProcessor/services/producers/accountAggregationFlow.producer';
import { DatasourceChunksParallelHandlingProducer } from '../queueProcessor/services/producers/datasourceChunksParallelHandling.producer';
import { DatasourceHandlingProducer } from '../queueProcessor/services/producers/datasourceHandling.producer';
import { registerBullQueues } from '../../modulesConfig/bullModule.forRoot';

@Module({
imports: [TypeOrmModule.forFeature([AggregatorState]), registerBullQueues()],
providers: [
AggregatorStateManagerService,
OneTimeJobsManagerService,
AccountAggregationFlowProducer,
DatasourceChunksParallelHandlingProducer,
DatasourceHandlingProducer,
],
exports: [AggregatorStateManagerService, OneTimeJobsManagerService],
})
export class AggregatorStateManagerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { In, Repository } from 'typeorm';
import { AggregatorState } from './entities/aggregatorState.entity';

@Injectable()
export class AggregatorStateManagerService {
constructor(
@InjectRepository(AggregatorState)
public readonly aggregatorStateRepository: Repository<AggregatorState>,
) {}

async getOrCreateAggregatorState() {
let existingState = await this.aggregatorStateRepository.findOne({
where: { id: '1' },
});
if (existingState) return existingState;
existingState = new AggregatorState();
existingState.id = '1';
existingState.oneTimeJobs = [];
await this.aggregatorStateRepository.save(existingState);
return existingState;
}

async updateDataSourcesState(newState: Partial<AggregatorState>) {
const stateEntity = await this.getOrCreateAggregatorState();

if ('oneTimeJobs' in newState)
stateEntity.oneTimeJobs = newState.oneTimeJobs;

await this.aggregatorStateRepository.save(stateEntity);
return stateEntity;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Column, Entity, PrimaryColumn } from 'typeorm';
import { Field } from '@nestjs/graphql';

@Entity()
export class AggregatorState {
@PrimaryColumn()
id: string;

@Column('text', {
array: true,
nullable: false,
default: [],
name: 'one_time_jobs',
})
oneTimeJobs?: string[];
}
92 changes: 92 additions & 0 deletions src/modules/aggregatorStateManager/oneTimeJobsManager.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Injectable } from '@nestjs/common';
import { AggregatorStateManagerService } from './aggregatorStateManager.service';
import { AccountAggregationFlowProducer } from '../queueProcessor/services/producers/accountAggregationFlow.producer';
import { DatasourceChunksParallelHandlingProducer } from '../queueProcessor/services/producers/datasourceChunksParallelHandling.producer';
import { DatasourceHandlingProducer } from '../queueProcessor/services/producers/datasourceHandling.producer';

type TerminateAllActiveJobsJobPayload = {};

type OneTimeJob = { id: string; action: string } & {
payload: TerminateAllActiveJobsJobPayload;
};

const oneTimeJobsList: OneTimeJob[] = [
{
id: '1707177600',
action: 'activeStakingHandleDailyAggregationJob',
payload: {},
},
];

@Injectable()
export class OneTimeJobsManagerService {
constructor(
public aggregatorStateManagerService: AggregatorStateManagerService,
public accountAggregationFlowProducer: AccountAggregationFlowProducer,
public datasourceChunksParallelHandlingProducer: DatasourceChunksParallelHandlingProducer,
public datasourceHandlingProducer: DatasourceHandlingProducer,
) {}

async runOneTimeJobs() {
const migrationsMap = new Map(
oneTimeJobsList.map((item) => [item.id, item]),
);
let pendingMigrationIds: string[] = [];
const aggregatorState =
await this.aggregatorStateManagerService.getOrCreateAggregatorState();

if (
!aggregatorState.oneTimeJobs ||
aggregatorState.oneTimeJobs.length === 0
) {
pendingMigrationIds = [...migrationsMap.keys()];
} else {
for (const item of aggregatorState.oneTimeJobs) {
migrationsMap.delete(item);
}
pendingMigrationIds = [...migrationsMap.keys()];
}

if (!pendingMigrationIds || pendingMigrationIds.length === 0) {
console.log(`OneTime Jobs :: No pending OneTime Job found.`);
return;
}

const processedJobs = [];

console.log('pendingOneTimeJobIds - ', pendingMigrationIds);

for (const jobId of pendingMigrationIds) {
const jobDetails = migrationsMap.get(jobId);

switch (jobDetails.action) {
case 'activeStakingHandleDailyAggregationJob': {
try {
await this.accountAggregationFlowProducer.removeAllActiveJobs();
await this.datasourceChunksParallelHandlingProducer.removeAllActiveJobs();
await this.datasourceHandlingProducer.removeAllActiveJobs();

processedJobs.push(jobId);
console.log(
`OneTime Jobs :: OneTime Job has been completed with details: [id: ${jobDetails.id} // action: ${jobDetails.action}]`,
);
} catch (e) {
console.log(
`OneTime Jobs :: OneTime Job ${jobId} has been processed with error.`,
);
console.log(e);
}
break;
}

default:
}
}

await this.aggregatorStateManagerService.updateDataSourcesState({
oneTimeJobs: [
...new Set([...aggregatorState.oneTimeJobs, ...processedJobs]).values(),
],
});
}
}
12 changes: 6 additions & 6 deletions src/modules/dataAggregator/services/aggregation.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,9 @@ export class AggregationHelper {

const runQuery = async (offset: number = 0) => {
const currentOffset = offset;
console.log(
`${pubicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
);
// console.log(
// `${pubicKeyShort} :: query START :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} :: offset ${currentOffset}`,
// );

const resp = await this.dataSourceUtils.getTransfersByAccount({
blockchainTag: inputData.blockchainTag,
Expand All @@ -243,9 +243,9 @@ export class AggregationHelper {
blockNumber_lt: inputData.chunkEndBlock,
queryUrl: inputData.sourceUrl,
});
console.log(
`${pubicKeyShort} :: query COMPLETED :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} `,
);
// console.log(
// `${pubicKeyShort} :: query COMPLETED :: ${inputData.blockchainTag} :: ${inputData.chunkStartBlock}/${inputData.chunkEndBlock} `,
// );
if (resp.transfers.length === 0) return;
responseBuffer.push(...resp.transfers);

Expand Down
23 changes: 2 additions & 21 deletions src/modules/queueProcessor/queueProcessor.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { HistoryUpdateSubscription } from '../accountSyncScheduler/entities/hist
import { DatasourceChunksParallelHandlingProducer } from './services/producers/datasourceChunksParallelHandling.producer';
import { join } from 'path';
import { DatasourceChunkParallelHandlingConsumer } from './services/consumers/datasourceChunkParallelHandling.consumer';
import { registerBullQueues } from '../../modulesConfig/bullModule.forRoot';

@Module({
imports: [
Expand All @@ -38,27 +39,7 @@ import { DatasourceChunkParallelHandlingConsumer } from './services/consumers/da
AccountTransaction,
HistoryUpdateSubscription,
]),
BullModule.registerQueue(
{
name: SubIdAggregatorQueueName.ACCOUNT_AGGREGATION_FLOW,
},
{
name: SubIdAggregatorQueueName.DATASOURCE_HANDLING,
},
{
name: SubIdAggregatorQueueName.DATASOURCE_CHUNKS_PARALLEL_HANDLING,
// processors: [
// {
// concurrency: 101,
// name: 'TRANSFER_CHUNK',
// path: join(
// __dirname,
// 'services/workers/collectTransfersDataChunk.worker.js',
// ),
// },
// ],
},
),
registerBullQueues(),
BullBoardModule.forFeature(
{
name: SubIdAggregatorQueueName.ACCOUNT_AGGREGATION_FLOW,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ export class AccountAggregationFlowConsumer {
await this.dataAggregatorService.handleRefreshAccountTransactionsHistory(
job,
);
console.log(
'REFRESH_TX_HISTORY_FOR_ACCOUNT_SCHEDULED:: handleRefreshAccountTransactionsHistory completed',
);
// console.log(
// 'REFRESH_TX_HISTORY_FOR_ACCOUNT_SCHEDULED:: handleRefreshAccountTransactionsHistory completed',
// );

await job.releaseLock();
await job.moveToCompleted('done', true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,12 @@ export class AccountAggregationFlowProducer {

return job;
}

async removeAllActiveJobs() {
const allJobs = await this.accountAggregationFlowQueue.getJobs(['active']);

for (const job of allJobs) {
await job.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,14 @@ export class DatasourceChunksParallelHandlingProducer {
}
});
}

async removeAllActiveJobs() {
const allJobs = await this.datasourceChunksParallelHandlingQueue.getJobs([
'active',
]);

for (const job of allJobs) {
await job.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ export class DatasourceHandlingProducer {
},
);
}
async removeAllActiveJobs() {
const allJobs = await this.datasourceHandlingQueue.getJobs(['active']);

for (const job of allJobs) {
await job.remove();
}
}

//
// async enqueueAndWaitCollectTransferEventDataChunkJobProducer(
// requestData: CollectEventDataChunkFromDataSourceInput,
Expand Down
16 changes: 16 additions & 0 deletions src/modulesConfig/bullModule.forRoot.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { AppConfig } from '../config.module';
import { Redis } from 'ioredis';
import { SharedBullAsyncConfiguration } from '@nestjs/bull/dist/interfaces';
import { SubIdAggregatorQueueName } from '../constants/queues';
import { BullModule } from '@nestjs/bull';

export default {
inject: [AppConfig],
Expand Down Expand Up @@ -69,3 +71,17 @@ export default {
};
},
} as SharedBullAsyncConfiguration;

export function registerBullQueues() {
return BullModule.registerQueue(
{
name: SubIdAggregatorQueueName.ACCOUNT_AGGREGATION_FLOW,
},
{
name: SubIdAggregatorQueueName.DATASOURCE_HANDLING,
},
{
name: SubIdAggregatorQueueName.DATASOURCE_CHUNKS_PARALLEL_HANDLING,
},
);
}
2 changes: 2 additions & 0 deletions src/modulesConfig/typeOrmModule.forRoot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { RewardNative } from '../modules/entities/rewardNative/entities/rewardNa
import { Transaction } from '../modules/entities/transaction/entities/transaction.entity';
import { TransferNative } from '../modules/entities/transferNative/entities/transferNative.entity';
import { VoteNative } from '../modules/entities/voteNative/entities/voteNative.entity';
import { AggregatorState } from '../modules/aggregatorStateManager/entities/aggregatorState.entity';

export default {
inject: [AppConfig],
Expand Down Expand Up @@ -47,6 +48,7 @@ export default {
Transaction,
TransferNative,
VoteNative,
AggregatorState,
],
};
},
Expand Down
8 changes: 7 additions & 1 deletion src/platformBootstrapper/common.bootstrapper.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { BlockchainService } from '../modules/entities/blockchain/blockchain.service';
import { OneTimeJobsManagerService } from '../modules/aggregatorStateManager/oneTimeJobsManager.service';

@Injectable()
export class CommonBootstrapperService implements OnApplicationBootstrap {
constructor(private blockchainService: BlockchainService) {}
constructor(
private blockchainService: BlockchainService,
private oneTimeJobsManagerService: OneTimeJobsManagerService,
) {}

async onApplicationBootstrap(): Promise<void> {
await this.blockchainService.initSupportedBlockchains();

await this.oneTimeJobsManagerService.runOneTimeJobs();
}
}
Loading

0 comments on commit 7fe33c8

Please sign in to comment.