Skip to content

Commit

Permalink
add multichannel, split farm vs whale mons, updated deps etc
Browse files Browse the repository at this point in the history
  • Loading branch information
G authored and tsmbl committed May 25, 2022
1 parent 54f4297 commit 74e2162
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 99 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"test:e2e": "jest --config ./test/jest-e2e.json"
},
"dependencies": {
"@dialectlabs/monitor": "^1.2.12",
"@dialectlabs/monitor": "^2.0.6",
"@dialectlabs/web3": "^0.2.0",
"@gokiprotocol/client": "^0.6.1",
"@nestjs/common": "^8.0.0",
Expand Down Expand Up @@ -64,7 +64,7 @@
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-prettier": "^4.0.0",
"jest": "^27.2.5",
"prettier": "^2.3.2",
"prettier": "^2.6.2",
"source-map-support": "^0.5.20",
"supertest": "^6.1.3",
"ts-jest": "^27.0.3",
Expand Down
6 changes: 4 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Module } from '@nestjs/common';
import { SaberMonitoringService } from './monitor/saber-monitoring-service';
import { ScheduleModule } from '@nestjs/schedule';
import { DialectConnection } from './monitor/dialect-connection';
import { WhaleMonitoringService } from './monitor/whale-monitoring-service';
import { FarmMonitoringService } from './monitor/farm-monitoring-service';

@Module({
imports: [ScheduleModule.forRoot()],
Expand All @@ -11,7 +12,8 @@ import { DialectConnection } from './monitor/dialect-connection';
provide: DialectConnection,
useValue: DialectConnection.initialize(),
},
SaberMonitoringService,
WhaleMonitoringService,
FarmMonitoringService,
],
})
export class AppModule {}
126 changes: 126 additions & 0 deletions src/monitor/farm-monitoring-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {
Injectable,
Logger,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
import {
DialectNotification,
Monitors,
SourceData,
} from '@dialectlabs/monitor';
import {
toDecimals,
} from '../saber-wars-api/saber-wars-api';
import { DialectConnection } from './dialect-connection';
import { Subject } from 'rxjs';
import { QuarryEventSubscription } from '../saber-wars-api/quarry-event-api';
import { getOwner, quarrySDK } from '../saber-wars-api/quarry-sdk-factory';
import { getTokenInfo } from '../saber-wars-api/token-info-api';
import { OnChainSubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/on-chain-subscriber.repository';
import { InMemorySubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/in-memory-subscriber.repository';
import { PublicKey } from '@solana/web3.js';

@Injectable()
export class FarmMonitoringService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(FarmMonitoringService.name);
private readonly numberFormat = new Intl.NumberFormat('en-US');
constructor(private readonly dialectConnection: DialectConnection) {}

onModuleInit() {
this.initFarmMonitor();
}

async onModuleDestroy() {
await Monitors.shutdown();
}

private initFarmMonitor() {
const onChainSubscriberRepository = new OnChainSubscriberRepository(
this.dialectConnection.getProgram(),
this.dialectConnection.getKeypair(),
);
const subscriberRepository = InMemorySubscriberRepository.decorate(
onChainSubscriberRepository,
);

const quarryEvents = new Subject<SourceData<DialectNotification>>();
new QuarryEventSubscription(quarrySDK.programs.Mine, async (evt) => {
if ((await subscriberRepository.findAll()).length === 0) {
this.logger.warn('No subscribers, skipping event');
return;
}
const resourceId = process.env.TEST_MODE
? (await subscriberRepository.findAll())[0]
: await getOwner(evt.data.authority);
if (evt.name === 'StakeEvent') {
const tokenInfo = await getTokenInfo(evt.data.token);
quarryEvents.next({
data: {
message: `Success! You staked ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} to ${tokenInfo.name}`,
},
groupingKey: resourceId.toBase58(),
});
}
if (evt.name === 'ClaimEvent') {
const tokenInfo = await getTokenInfo(evt.data.stakedToken);
quarryEvents.next({
data: {
message: `Success! You claimed ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} from ${tokenInfo.name}`,
},
groupingKey: resourceId.toBase58(),
});
}
}).start();

const farmMonitor = Monitors.builder({
subscriberRepository,
monitorKeypair: this.dialectConnection.getKeypair(),
dialectProgram: this.dialectConnection.getProgram(),
})
.defineDataSource<DialectNotification>()
.push(quarryEvents)
.notify()
.dialectThread(
({ value }) => {
this.logger.log(`Sending message ${value.message}`);
return {
message: value.message,
};
},
{ dispatch: 'unicast', to: ({ groupingKey }) => new PublicKey(groupingKey) },
)
.telegram(
({ value }) => {
return {
body: `⚔️ SABER: ` + value.message,
};
},
{ dispatch: 'unicast', to: ({ groupingKey }) => new PublicKey(groupingKey) },
)
.sms(
({ value }) => {
return {
body: `⚔️ SABER: ` + value.message,
};
},
{ dispatch: 'unicast', to: ({ groupingKey }) => new PublicKey(groupingKey) },
)
.email(
({ value }) => {
return {
subject: `⚔️ SABER: Succesful ${value.message.includes("claimed") ? "Claim" : "Stake"}`,
text: value.message,
};
},
{ dispatch: 'unicast', to: ({ groupingKey }) => new PublicKey(groupingKey) },
)
.and()
.build();
farmMonitor.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
} from '@nestjs/common';
import {
Context,
DialectNotification,
Monitor,
Monitors,
Pipelines,
Expand All @@ -17,27 +16,19 @@ import { Duration } from 'luxon';
import {
getWarsInfo,
PoolInfo,
toDecimals,
} from '../saber-wars-api/saber-wars-api';
import { NoopSubscriberRepository } from './noop-subscriber-repository';
import { Cron } from '@nestjs/schedule';
import { DialectConnection } from './dialect-connection';
import { Subject } from 'rxjs';
import { QuarryEventSubscription } from '../saber-wars-api/quarry-event-api';
import { getOwner, quarrySDK } from '../saber-wars-api/quarry-sdk-factory';
import { getTokenInfo } from '../saber-wars-api/token-info-api';
import { TwitterNotificationSink } from './twitter-notification-sink';
import { OnChainSubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/on-chain-subscriber.repository';
import { InMemorySubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/in-memory-subscriber.repository';
import { PublicKey } from '@solana/web3.js';

@Injectable()
export class SaberMonitoringService implements OnModuleInit, OnModuleDestroy {
private readonly notificationSink: TwitterNotificationSink =
export class WhaleMonitoringService implements OnModuleInit, OnModuleDestroy {
private readonly twitterNotificationSink: TwitterNotificationSink =
new TwitterNotificationSink();

private readonly logger = new Logger(SaberMonitoringService.name);
private readonly logger = new Logger(WhaleMonitoringService.name);
private readonly numberFormat = new Intl.NumberFormat('en-US');

constructor(private readonly dialectConnection: DialectConnection) {}

private static getTriggerOutput(trace: Trace[]) {
Expand All @@ -46,7 +37,6 @@ export class SaberMonitoringService implements OnModuleInit, OnModuleDestroy {

onModuleInit() {
this.initWhaleAlertMonitor();
this.initFarmMonitor();
}

@Cron('0 0 11,19 * * *', {
Expand Down Expand Up @@ -84,72 +74,13 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
)}
⚔️⚔️⚔️⚔️⚔️#SABERWARS⚔️⚔️⚔️⚔️⚔️`;
await this.notificationSink.push({ message });
await this.twitterNotificationSink.push({ message });
}

async onModuleDestroy() {
await Monitors.shutdown();
}

private initFarmMonitor() {
const onChainSubscriberRepository = new OnChainSubscriberRepository(
this.dialectConnection.getProgram(),
this.dialectConnection.getKeypair(),
);
const subscriberRepository = InMemorySubscriberRepository.decorate(
onChainSubscriberRepository,
);

const quarryEvents = new Subject<SourceData<DialectNotification>>();
new QuarryEventSubscription(quarrySDK.programs.Mine, async (evt) => {
if ((await subscriberRepository.findAll()).length === 0) {
this.logger.warn('No subscribers, skipping event');
return;
}
const resourceId = process.env.TEST_MODE
? (await subscriberRepository.findAll())[0]
: await getOwner(evt.data.authority);
if (evt.name === 'StakeEvent') {
const tokenInfo = await getTokenInfo(evt.data.token);
quarryEvents.next({
data: {
message: `Success! You staked ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} to ${tokenInfo.name}`,
},
resourceId,
});
}
if (evt.name === 'ClaimEvent') {
const tokenInfo = await getTokenInfo(evt.data.stakedToken);
quarryEvents.next({
data: {
message: `Success! You claimed ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} from ${tokenInfo.name}`,
},
resourceId,
});
}
}).start();
const builder = Monitors.builder({
subscriberRepository,
monitorKeypair: this.dialectConnection.getKeypair(),
dialectProgram: this.dialectConnection.getProgram(),
})
.defineDataSource<DialectNotification>()
.push(quarryEvents)
.notify()
.dialectThread(({ value }) => {
this.logger.log(`Sending message ${value.message}`);
return value;
})
.and()
.dispatch('unicast');
const monitor: Monitor<DialectNotification> = builder.build();
monitor.start();
}

private initWhaleAlertMonitor() {
const threshold = parseInt(process.env.WHALE_MONITOR_THRESHOLD!);

Expand All @@ -163,7 +94,7 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
const sourceData: SourceData<PoolInfo>[] = warsInfo.poolsInfo.map(
(data) => ({
data,
resourceId: data.address,
groupingKey: data.address.toBase58(),
}),
);
return Promise.resolve(sourceData);
Expand All @@ -178,15 +109,21 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
],
})
.notify()
.custom(({ context }) => {
const message = this.createWhaleAlert(context);
this.logger.log(message);
return {
message,
};
}, this.notificationSink)
.custom(
(val) => {
const message = this.createWhaleAlert(val.context);
this.logger.log(message);
return {
message,
};
},
this.twitterNotificationSink,
{
dispatch: 'unicast',
to: (val) => new PublicKey(val.groupingKey),
}
)
.and()
.dispatch('unicast')
.build();
monitor.start();
}
Expand All @@ -195,7 +132,7 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
trace,
origin: { name: poolName, nextEpochRewardsPerDay: rewardsPerDay },
}: Context<PoolInfo>) {
const triggerOutput = SaberMonitoringService.getTriggerOutput(trace)!;
const triggerOutput = WhaleMonitoringService.getTriggerOutput(trace)!;
return `⚔️🐳🚨 Whale alert! 🚨🐳⚔️
${this.numberFormat.format(
Expand Down
Loading

0 comments on commit 74e2162

Please sign in to comment.