Skip to content

Commit

Permalink
chore(monitoring): refactor service & worker
Browse files Browse the repository at this point in the history
  • Loading branch information
kishore03109 committed Jun 18, 2024
1 parent 46b5c6e commit 60a4288
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 108 deletions.
5 changes: 0 additions & 5 deletions common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import {
Reviewer,
ReviewRequestView,
} from "@database/models"
import MonitoringService from "@root/monitoring"
import AuditLogsService from "@root/services/admin/AuditLogsService"
import RepoManagementService from "@root/services/admin/RepoManagementService"
import GitFileCommitService from "@root/services/db/GitFileCommitService"
Expand Down Expand Up @@ -249,7 +248,3 @@ export const auditLogsService = new AuditLogsService({
sitesService,
usersService,
})

export const monitoringService = new MonitoringService({
launchesService,
})
109 changes: 109 additions & 0 deletions src/monitoring/MonitoringService.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import autoBind from "auto-bind"
import { Job, Queue, Worker } from "bullmq"
import _ from "lodash"
import { ResultAsync } from "neverthrow"

import parentLogger from "@logger/logger"
import logger from "@logger/logger"

import config from "@root/config/config"
import MonitoringError from "@root/errors/MonitoringError"
import convertNeverThrowToPromise from "@root/utils/neverthrow"

import MonitoringWorker from "./MonitoringWorker"

const ONE_MINUTE = 60000
interface MonitoringServiceInterface {
monitoringWorker: MonitoringWorker
}

export default class MonitoringService {
private readonly monitoringServiceLogger = parentLogger.child({
module: "monitoringService",
})

private readonly REDIS_CONNECTION = {
host: config.get("bullmq.redisHostname"),
port: 6379,
}

private readonly queue = new Queue("MonitoringQueue", {
connection: {
...this.REDIS_CONNECTION,
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: "exponential",
delay: ONE_MINUTE, // this operation is not critical, so we can wait a minute
},
},
})

private readonly worker: Worker<unknown, string, string>

private readonly monitoringWorker: MonitoringServiceInterface["monitoringWorker"]

constructor({ monitoringWorker }: MonitoringServiceInterface) {
this.monitoringWorker = monitoringWorker
autoBind(this)
const jobName = "dnsMonitoring"

const FIVE_MINUTE_CRON = "5 * * * *"

const jobData = {
name: "monitoring sites",
}

ResultAsync.fromPromise(
this.queue.add(jobName, jobData, {
repeat: {
pattern: FIVE_MINUTE_CRON,
},
}),
(e) => e
)
.map((okRes) => {
this.monitoringServiceLogger.info(
`Monitoring job scheduled at interval ${FIVE_MINUTE_CRON}`
)
return okRes
})
.mapErr((errRes) => {
this.monitoringServiceLogger.error(`Failed to schedule job: ${errRes}`)
})

this.worker = new Worker(
this.queue.name,
async (job: Job) => {
this.monitoringServiceLogger.info(`Monitoring Worker ${job.id}`)
if (job.name === jobName) {
// The retry's work on a thrown error, so we need to convert the neverthrow to a promise
const res = await convertNeverThrowToPromise(
this.monitoringWorker.driver()
)
return res
}
throw new MonitoringError("Invalid job name")
},
{
connection: {
...this.REDIS_CONNECTION,
},
lockDuration: ONE_MINUTE, // since this is a relatively expensive operation
}
)

this.worker.on("failed", (job: Job | undefined, error: Error) => {
logger.error({
message: "Monitoring service has failed",
error,
meta: {
...job?.data,
},
})
})
}
}
111 changes: 16 additions & 95 deletions src/monitoring/index.ts → src/monitoring/MonitoringWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ import { retry } from "@octokit/plugin-retry"
import { Octokit } from "@octokit/rest"
import autoBind from "auto-bind"
import axios from "axios"
import { Job, Queue, Worker } from "bullmq"
import _ from "lodash"
import { errAsync, okAsync, ResultAsync } from "neverthrow"
import { ResultAsync, errAsync, okAsync } from "neverthrow"

import parentLogger from "@logger/logger"
import logger from "@logger/logger"

import config from "@root/config/config"
import MonitoringError from "@root/errors/MonitoringError"
import { gb } from "@root/middleware/featureFlag"
import LaunchesService from "@root/services/identity/LaunchesService"
import { dnsMonitor } from "@root/utils/dns-utils"
import { isMonitoringEnabled } from "@root/utils/growthbook-utils"
import convertNeverThrowToPromise from "@root/utils/neverthrow"
import promisifyPapaParse from "@root/utils/papa-parse"

interface MonitoringServiceInterface {
launchesService: LaunchesService
}

const IsomerHostedDomainType = {
REDIRECTION: "redirection",
INDIRECTION: "indirection",
Expand Down Expand Up @@ -52,93 +45,21 @@ function isKeyCdnResponse(object: unknown): object is KeyCdnZoneAlias[] {
if (Array.isArray(object)) return object.every(isKeyCdnZoneAlias)
return false
}
const ONE_MINUTE = 60000
export default class MonitoringService {
private readonly launchesService: MonitoringServiceInterface["launchesService"]

private readonly monitoringServiceLogger = parentLogger.child({
module: "monitoringService",
})

private readonly REDIS_CONNECTION = {
host: config.get("bullmq.redisHostname"),
port: 6379,
}
interface MonitoringWorkerInterface {
launchesService: LaunchesService
}

private readonly queue = new Queue("MonitoringQueue", {
connection: {
...this.REDIS_CONNECTION,
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: {
type: "exponential",
delay: ONE_MINUTE, // this operation is not critical, so we can wait a minute
},
},
export default class MonitoringWorker {
private readonly monitoringWorkerLogger = parentLogger.child({
module: "monitoringWorker",
})

private readonly worker: Worker<unknown, string, string>
private readonly launchesService: MonitoringWorkerInterface["launchesService"]

constructor({ launchesService }: MonitoringServiceInterface) {
autoBind(this)
const jobName = "dnsMonitoring"
constructor({ launchesService }: MonitoringWorkerInterface) {
this.launchesService = launchesService

const FIVE_MINUTE_CRON = "5 * * * *"

const jobData = {
name: "monitoring sites",
}

ResultAsync.fromPromise(
this.queue.add(jobName, jobData, {
repeat: {
pattern: FIVE_MINUTE_CRON,
},
}),
(e) => e
)
.map((okRes) => {
this.monitoringServiceLogger.info(
`Monitoring job scheduled at interval ${FIVE_MINUTE_CRON}`
)
return okRes
})
.mapErr((errRes) => {
this.monitoringServiceLogger.error(`Failed to schedule job: ${errRes}`)
})

this.worker = new Worker(
this.queue.name,
async (job: Job) => {
this.monitoringServiceLogger.info(`Monitoring Worker ${job.id}`)
if (job.name === jobName) {
// The retry's work on a thrown error, so we need to convert the neverthrow to a promise
const res = await convertNeverThrowToPromise(this.driver())
return res
}
throw new MonitoringError("Invalid job name")
},
{
connection: {
...this.REDIS_CONNECTION,
},
lockDuration: 60000, // 1 minute, since this is a relatively expensive operation
}
)

this.worker.on("failed", (job: Job | undefined, error: Error) => {
logger.error({
message: "Monitoring service has failed",
error,
meta: {
...job?.data,
},
})
})
autoBind(this)
}

getKeyCdnDomains() {
Expand Down Expand Up @@ -228,15 +149,15 @@ export default class MonitoringService {
* of any subdomains and redirects.
*/
getAllDomains() {
this.monitoringServiceLogger.info("Fetching all domains")
this.monitoringWorkerLogger.info("Fetching all domains")
return ResultAsync.combine([
this.getAmplifyDeployments().mapErr(
(err) => new MonitoringError(err.message)
),
this.getRedirectionDomains(),
this.getKeyCdnDomains(),
]).andThen(([amplifyDeployments, redirectionDomains, keyCdnDomains]) => {
this.monitoringServiceLogger.info("Fetched all domains")
this.monitoringWorkerLogger.info("Fetched all domains")
return okAsync(
_.sortBy(
[...amplifyDeployments, ...redirectionDomains, ...keyCdnDomains],
Expand All @@ -256,19 +177,19 @@ export default class MonitoringService {
driver() {
if (!isMonitoringEnabled(gb)) return okAsync("Monitoring Service disabled")
const start = Date.now()
this.monitoringServiceLogger.info("Monitoring service started")
this.monitoringWorkerLogger.info("Monitoring service started")

return this.getAllDomains()
.andThen(this.generateReportCard)
.mapErr((reportCardErr: MonitoringError | string[]) => {
if (reportCardErr instanceof MonitoringError) {
this.monitoringServiceLogger.error({
this.monitoringWorkerLogger.error({
error: reportCardErr,
message: "Error running monitoring service",
})
return
}
this.monitoringServiceLogger.error({
this.monitoringWorkerLogger.error({
message: "Error running monitoring service",
meta: {
dnsCheckerResult: reportCardErr,
Expand All @@ -278,7 +199,7 @@ export default class MonitoringService {
})
.orElse(() => okAsync([]))
.andThen(() => {
this.monitoringServiceLogger.info(
this.monitoringWorkerLogger.info(
`Monitoring service completed in ${Date.now() - start}ms`
)
return okAsync("Monitoring service completed")
Expand Down
19 changes: 11 additions & 8 deletions support/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@ import "module-alias/register"

import express from "express"

import {
infraService,
launchesService,
monitoringService,
sequelize,
} from "@common/index"
import { infraService, launchesService, sequelize } from "@common/index"
import { useSharedMiddleware } from "@common/middleware"
import { config } from "@root/config/config"
import logger from "@root/logger/logger"
import MonitoringService from "@root/monitoring"
import MonitoringService from "@root/monitoring/MonitoringService"
import MonitoringWorker from "@root/monitoring/monitoringWorker"

Check failure on line 10 in support/index.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find module '@root/monitoring/monitoringWorker' or its corresponding type declarations.

Check failure on line 10 in support/index.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find module '@root/monitoring/monitoringWorker' or its corresponding type declarations.

import { ROUTE_VERSION } from "./constants"
import { v2Router } from "./routes"
Expand All @@ -24,7 +20,14 @@ const app = express()
// poller site launch updates
infraService.pollMessages()

monitoringService.driver()
// only needed for support container
export const monitoringWorker = new MonitoringWorker({
launchesService,
})

export const monitoringService = new MonitoringService({
monitoringWorker,
})

const ROUTE_PREFIX_ISOBOT = `/${ROUTE_VERSION}/isobot`
app.use(ROUTE_PREFIX_ISOBOT, isobotRouter)
Expand Down

0 comments on commit 60a4288

Please sign in to comment.