Skip to content

Commit

Permalink
Implement monitor draft
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexey Tsymbal authored and tsmbl committed May 25, 2022
1 parent f2f7df7 commit b45649c
Showing 1 changed file with 71 additions and 103 deletions.
174 changes: 71 additions & 103 deletions src/monitor/saber-monitoring-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,144 +14,67 @@ import {
Trace,
} from '@dialectlabs/monitor';
import { Duration } from 'luxon';
import {
getWarsInfo,
PoolInfo,
toDecimals,
} from '../saber-wars-api/saber-wars-api';
import { getWarsInfo, PoolInfo } from '../saber-wars-api/saber-wars-api';
import { NoopSubscriberRepository } from './noop-subscriber-repository';
import { Cron } from '@nestjs/schedule';
import { DialectConnection } from './dialect-connection';
import { Subject } from 'rxjs';
import { QuarryEventSubscription } from '../saber-wars-api/quarry-event-api';
import { getOwner, quarrySDK } from '../saber-wars-api/quarry-sdk-factory';
import { getTokenInfo } from '../saber-wars-api/token-info-api';
import { TwitterNotificationSink } from './twitter-notification-sink';
import { OnChainSubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/on-chain-subscriber.repository';
import { InMemorySubscriberRepository } from '@dialectlabs/monitor/lib/cjs/internal/in-memory-subscriber.repository';
import { quarrySDK } from '../saber-wars-api/quarry-sdk-factory';
import { ConsoleNotificationSink } from './console-notification-sink';

@Injectable()
export class SaberMonitoringService implements OnModuleInit, OnModuleDestroy {
private readonly notificationSink: TwitterNotificationSink =
new TwitterNotificationSink();
private readonly notificationSink: ConsoleNotificationSink<DialectNotification> =
new ConsoleNotificationSink<DialectNotification>();

private readonly logger = new Logger(SaberMonitoringService.name);
private readonly numberFormat = new Intl.NumberFormat('en-US');

constructor(private readonly dialectConnection: DialectConnection) {}

private static getTriggerOutput(trace: Trace[]) {
return trace.find((it) => it.type === 'trigger')?.output;
}

onModuleInit() {
this.initWhaleAlertMonitor();
this.initFarmMonitor();
}

@Cron('0 0 11,19 * * *', {
// @Cron('0,30 * * * * *', {
name: 'notifications',
timeZone: 'America/New_York',
})
async handleCron() {
this.logger.log('Cron execution started');
const { poolsInfo, epochInfo } = await getWarsInfo();

const totalCurrentEpochShare = poolsInfo
.map((it) => it.currentEpochAbsoluteShare)
.reduce((acc, next) => acc + next, 0);
const totalNextEpochShare = poolsInfo
.map((it) => it.nextEpochAbsoluteShare)
.reduce((acc, next) => acc + next, 0);
const voteLeader = poolsInfo[0];
const message = `Epoch ${epochInfo.currentEpoch + 1} progress: ${(
(totalNextEpochShare / totalCurrentEpochShare) *
100
).toFixed(0)}% of votes committed
${this.numberFormat.format(
Math.round(totalNextEpochShare),
)} votes committed (${this.numberFormat.format(
Math.round(totalCurrentEpochShare),
)} cast in epoch ${epochInfo.currentEpoch})
Vote leader: ${voteLeader.name} | ${this.numberFormat.format(
Math.round(voteLeader.nextEpochAbsoluteShare),
)} votes and ${this.numberFormat.format(
Math.round(voteLeader.nextEpochRewardsPerDay),
)} SBR/day
Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
'dd:hh:mm:ss',
)}
⚔️⚔️⚔️⚔️⚔️#SABERWARS⚔️⚔️⚔️⚔️⚔️`;
await this.notificationSink.push({ message });
}

async onModuleDestroy() {
await Monitors.shutdown();
}
constructor(private readonly dialectConnection: DialectConnection) {}

private initFarmMonitor() {
const onChainSubscriberRepository = new OnChainSubscriberRepository(
this.dialectConnection.getProgram(),
this.dialectConnection.getKeypair(),
);
const subscriberRepository = InMemorySubscriberRepository.decorate(
onChainSubscriberRepository,
);

const quarryEvents = new Subject<SourceData<DialectNotification>>();
new QuarryEventSubscription(quarrySDK.programs.Mine, async (evt) => {
if ((await subscriberRepository.findAll()).length === 0) {
this.logger.warn('No subscribers, skipping event');
return;
}
const resourceId = process.env.TEST_MODE
? (await subscriberRepository.findAll())[0]
: await getOwner(evt.data.authority);
const subject = new Subject<SourceData<DialectNotification>>();
new QuarryEventSubscription(quarrySDK.programs.Mine, (evt) => {
if (evt.name === 'StakeEvent') {
const tokenInfo = await getTokenInfo(evt.data.token);
quarryEvents.next({
subject.next({
data: {
message: `Success! You staked ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} to ${tokenInfo.name}`,
message: JSON.stringify(evt),
},
resourceId,
resourceId: evt.data.authority,
});
}
if (evt.name === 'ClaimEvent') {
const tokenInfo = await getTokenInfo(evt.data.stakedToken);
quarryEvents.next({
subject.next({
data: {
message: `Success! You claimed ${this.numberFormat.format(
toDecimals(evt.data.amount, tokenInfo.decimals),
)} ${tokenInfo.symbol} from ${tokenInfo.name}`,
message: JSON.stringify(evt),
},
resourceId,
resourceId: evt.data.authority,
});
}
}).start();
const builder = Monitors.builder({
subscriberRepository,

const monitor: Monitor<DialectNotification> = Monitors.builder({
monitorKeypair: this.dialectConnection.getKeypair(),
dialectProgram: this.dialectConnection.getProgram(),
})
.defineDataSource<DialectNotification>()
.push(quarryEvents)
.push(subject)
.notify()
.dialectThread(({ value }) => {
this.logger.log(`Sending message ${value.message}`);
return value;
})
.custom(({ value }) => value, new ConsoleNotificationSink())
.and()
.dispatch('unicast');
const monitor: Monitor<DialectNotification> = builder.build();
.dispatch('unicast')
.build();
monitor.start();
}

private initWhaleAlertMonitor() {
const threshold = parseInt(process.env.WHALE_MONITOR_THRESHOLD!);
const threshold = parseInt(process.env.WHALE_MONITOR_THRESHOLD);

const monitor: Monitor<PoolInfo> = Monitors.builder({
subscriberRepository: new NoopSubscriberRepository(),
Expand Down Expand Up @@ -179,12 +102,11 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
})
.notify()
.custom(({ context }) => {
const message = this.createWhaleAlert(context);
this.logger.log(message);
this.logger.log('Building whale alert');
return {
message,
message: this.createWhaleAlert(context),
};
}, this.notificationSink)
}, new ConsoleNotificationSink())
.and()
.dispatch('unicast')
.build();
Expand All @@ -195,7 +117,7 @@ Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
trace,
origin: { name: poolName, nextEpochRewardsPerDay: rewardsPerDay },
}: Context<PoolInfo>) {
const triggerOutput = SaberMonitoringService.getTriggerOutput(trace)!;
const triggerOutput = SaberMonitoringService.getTriggerOutput(trace);
return `⚔️🐳🚨 Whale alert! 🚨🐳⚔️
${this.numberFormat.format(
Expand All @@ -206,4 +128,50 @@ ${this.numberFormat.format(
⚔️⚔️⚔️⚔️⚔️#SABERWARS⚔️⚔️⚔️⚔️⚔️`;
}

private static getTriggerOutput(trace: Trace[]) {
return trace.find((it) => it.type === 'trigger')?.output;
}

@Cron('0 0 11,19 * * *', {
// @Cron('0,30 * * * * *', {
name: 'notifications',
timeZone: 'America/New_York',
})
async handleCron() {
this.logger.log('Cron execution started');
const { poolsInfo, epochInfo } = await getWarsInfo();

const totalCurrentEpochShare = poolsInfo
.map((it) => it.currentEpochAbsoluteShare)
.reduce((acc, next) => acc + next, 0);
const totalNextEpochShare = poolsInfo
.map((it) => it.nextEpochAbsoluteShare)
.reduce((acc, next) => acc + next, 0);
const voteLeader = poolsInfo[0];
const message = `Epoch ${epochInfo.currentEpoch + 1} progress: ${(
(totalNextEpochShare / totalCurrentEpochShare) *
100
).toFixed(0)}% of votes committed
${this.numberFormat.format(
Math.round(totalNextEpochShare),
)} votes committed (${this.numberFormat.format(
Math.round(totalCurrentEpochShare),
)} cast in epoch ${epochInfo.currentEpoch})
Vote leader: ${voteLeader.name} | ${this.numberFormat.format(
Math.round(voteLeader.nextEpochAbsoluteShare),
)} votes and ${this.numberFormat.format(
Math.round(voteLeader.nextEpochRewardsPerDay),
)} SBR/day
Time remaining in epoch: ${epochInfo.currentEpochRemainingTime.toFormat(
'dd:hh:mm:ss',
)}
⚔️⚔️⚔️⚔️⚔️#SABERWARS⚔️⚔️⚔️⚔️⚔️`;
await this.notificationSink.push({ message }, []);
}

async onModuleDestroy() {
await Monitors.shutdown();
}
}

0 comments on commit b45649c

Please sign in to comment.