From 60a428860ab4e9118a66da4fec8790b5b9b5920a Mon Sep 17 00:00:00 2001 From: Kishore <42832651+kishore03109@users.noreply.github.com> Date: Tue, 18 Jun 2024 11:04:26 +0800 Subject: [PATCH] chore(monitoring): refactor service & worker --- common/index.ts | 5 - src/monitoring/MonitoringService.ts | 109 +++++++++++++++++ .../{index.ts => MonitoringWorker.ts} | 111 +++--------------- support/index.ts | 19 +-- 4 files changed, 136 insertions(+), 108 deletions(-) create mode 100644 src/monitoring/MonitoringService.ts rename src/monitoring/{index.ts => MonitoringWorker.ts} (66%) diff --git a/common/index.ts b/common/index.ts index d0e7e976c..21f22bbc1 100644 --- a/common/index.ts +++ b/common/index.ts @@ -27,7 +27,6 @@ import { Reviewer, ReviewRequestView, } from "@database/models" -import MonitoringService from "@root/monitoring" import AuditLogsService from "@root/services/admin/AuditLogsService" import RepoManagementService from "@root/services/admin/RepoManagementService" import GitFileCommitService from "@root/services/db/GitFileCommitService" @@ -249,7 +248,3 @@ export const auditLogsService = new AuditLogsService({ sitesService, usersService, }) - -export const monitoringService = new MonitoringService({ - launchesService, -}) diff --git a/src/monitoring/MonitoringService.ts b/src/monitoring/MonitoringService.ts new file mode 100644 index 000000000..3a411a9f2 --- /dev/null +++ b/src/monitoring/MonitoringService.ts @@ -0,0 +1,109 @@ +import autoBind from "auto-bind" +import { Job, Queue, Worker } from "bullmq" +import _ from "lodash" +import { ResultAsync } from "neverthrow" + +import parentLogger from "@logger/logger" +import logger from "@logger/logger" + +import config from "@root/config/config" +import MonitoringError from "@root/errors/MonitoringError" +import convertNeverThrowToPromise from "@root/utils/neverthrow" + +import MonitoringWorker from "./MonitoringWorker" + +const ONE_MINUTE = 60000 +interface MonitoringServiceInterface { + monitoringWorker: MonitoringWorker +} + +export default class MonitoringService { + private readonly monitoringServiceLogger = parentLogger.child({ + module: "monitoringService", + }) + + private readonly REDIS_CONNECTION = { + host: config.get("bullmq.redisHostname"), + port: 6379, + } + + private readonly queue = new Queue("MonitoringQueue", { + connection: { + ...this.REDIS_CONNECTION, + }, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + backoff: { + type: "exponential", + delay: ONE_MINUTE, // this operation is not critical, so we can wait a minute + }, + }, + }) + + private readonly worker: Worker + + private readonly monitoringWorker: MonitoringServiceInterface["monitoringWorker"] + + constructor({ monitoringWorker }: MonitoringServiceInterface) { + this.monitoringWorker = monitoringWorker + autoBind(this) + const jobName = "dnsMonitoring" + + const FIVE_MINUTE_CRON = "5 * * * *" + + const jobData = { + name: "monitoring sites", + } + + ResultAsync.fromPromise( + this.queue.add(jobName, jobData, { + repeat: { + pattern: FIVE_MINUTE_CRON, + }, + }), + (e) => e + ) + .map((okRes) => { + this.monitoringServiceLogger.info( + `Monitoring job scheduled at interval ${FIVE_MINUTE_CRON}` + ) + return okRes + }) + .mapErr((errRes) => { + this.monitoringServiceLogger.error(`Failed to schedule job: ${errRes}`) + }) + + this.worker = new Worker( + this.queue.name, + async (job: Job) => { + this.monitoringServiceLogger.info(`Monitoring Worker ${job.id}`) + if (job.name === jobName) { + // The retry's work on a thrown error, so we need to convert the neverthrow to a promise + const res = await convertNeverThrowToPromise( + this.monitoringWorker.driver() + ) + return res + } + throw new MonitoringError("Invalid job name") + }, + { + connection: { + ...this.REDIS_CONNECTION, + }, + lockDuration: ONE_MINUTE, // since this is a relatively expensive operation + } + ) + + this.worker.on("failed", (job: Job | undefined, error: Error) => { + logger.error({ + message: "Monitoring service has failed", + error, + meta: { + ...job?.data, + }, + }) + }) + } +} diff --git a/src/monitoring/index.ts b/src/monitoring/MonitoringWorker.ts similarity index 66% rename from src/monitoring/index.ts rename to src/monitoring/MonitoringWorker.ts index 0437216e8..91f102211 100644 --- a/src/monitoring/index.ts +++ b/src/monitoring/MonitoringWorker.ts @@ -2,12 +2,10 @@ import { retry } from "@octokit/plugin-retry" import { Octokit } from "@octokit/rest" import autoBind from "auto-bind" import axios from "axios" -import { Job, Queue, Worker } from "bullmq" import _ from "lodash" -import { errAsync, okAsync, ResultAsync } from "neverthrow" +import { ResultAsync, errAsync, okAsync } from "neverthrow" import parentLogger from "@logger/logger" -import logger from "@logger/logger" import config from "@root/config/config" import MonitoringError from "@root/errors/MonitoringError" @@ -15,13 +13,8 @@ import { gb } from "@root/middleware/featureFlag" import LaunchesService from "@root/services/identity/LaunchesService" import { dnsMonitor } from "@root/utils/dns-utils" import { isMonitoringEnabled } from "@root/utils/growthbook-utils" -import convertNeverThrowToPromise from "@root/utils/neverthrow" import promisifyPapaParse from "@root/utils/papa-parse" -interface MonitoringServiceInterface { - launchesService: LaunchesService -} - const IsomerHostedDomainType = { REDIRECTION: "redirection", INDIRECTION: "indirection", @@ -52,93 +45,21 @@ function isKeyCdnResponse(object: unknown): object is KeyCdnZoneAlias[] { if (Array.isArray(object)) return object.every(isKeyCdnZoneAlias) return false } -const ONE_MINUTE = 60000 -export default class MonitoringService { - private readonly launchesService: MonitoringServiceInterface["launchesService"] - private readonly monitoringServiceLogger = parentLogger.child({ - module: "monitoringService", - }) - - private readonly REDIS_CONNECTION = { - host: config.get("bullmq.redisHostname"), - port: 6379, - } +interface MonitoringWorkerInterface { + launchesService: LaunchesService +} - private readonly queue = new Queue("MonitoringQueue", { - connection: { - ...this.REDIS_CONNECTION, - }, - defaultJobOptions: { - removeOnComplete: true, - removeOnFail: true, - attempts: 3, - backoff: { - type: "exponential", - delay: ONE_MINUTE, // this operation is not critical, so we can wait a minute - }, - }, +export default class MonitoringWorker { + private readonly monitoringWorkerLogger = parentLogger.child({ + module: "monitoringWorker", }) - private readonly worker: Worker + private readonly launchesService: MonitoringWorkerInterface["launchesService"] - constructor({ launchesService }: MonitoringServiceInterface) { - autoBind(this) - const jobName = "dnsMonitoring" + constructor({ launchesService }: MonitoringWorkerInterface) { this.launchesService = launchesService - - const FIVE_MINUTE_CRON = "5 * * * *" - - const jobData = { - name: "monitoring sites", - } - - ResultAsync.fromPromise( - this.queue.add(jobName, jobData, { - repeat: { - pattern: FIVE_MINUTE_CRON, - }, - }), - (e) => e - ) - .map((okRes) => { - this.monitoringServiceLogger.info( - `Monitoring job scheduled at interval ${FIVE_MINUTE_CRON}` - ) - return okRes - }) - .mapErr((errRes) => { - this.monitoringServiceLogger.error(`Failed to schedule job: ${errRes}`) - }) - - this.worker = new Worker( - this.queue.name, - async (job: Job) => { - this.monitoringServiceLogger.info(`Monitoring Worker ${job.id}`) - if (job.name === jobName) { - // The retry's work on a thrown error, so we need to convert the neverthrow to a promise - const res = await convertNeverThrowToPromise(this.driver()) - return res - } - throw new MonitoringError("Invalid job name") - }, - { - connection: { - ...this.REDIS_CONNECTION, - }, - lockDuration: 60000, // 1 minute, since this is a relatively expensive operation - } - ) - - this.worker.on("failed", (job: Job | undefined, error: Error) => { - logger.error({ - message: "Monitoring service has failed", - error, - meta: { - ...job?.data, - }, - }) - }) + autoBind(this) } getKeyCdnDomains() { @@ -228,7 +149,7 @@ export default class MonitoringService { * of any subdomains and redirects. */ getAllDomains() { - this.monitoringServiceLogger.info("Fetching all domains") + this.monitoringWorkerLogger.info("Fetching all domains") return ResultAsync.combine([ this.getAmplifyDeployments().mapErr( (err) => new MonitoringError(err.message) @@ -236,7 +157,7 @@ export default class MonitoringService { this.getRedirectionDomains(), this.getKeyCdnDomains(), ]).andThen(([amplifyDeployments, redirectionDomains, keyCdnDomains]) => { - this.monitoringServiceLogger.info("Fetched all domains") + this.monitoringWorkerLogger.info("Fetched all domains") return okAsync( _.sortBy( [...amplifyDeployments, ...redirectionDomains, ...keyCdnDomains], @@ -256,19 +177,19 @@ export default class MonitoringService { driver() { if (!isMonitoringEnabled(gb)) return okAsync("Monitoring Service disabled") const start = Date.now() - this.monitoringServiceLogger.info("Monitoring service started") + this.monitoringWorkerLogger.info("Monitoring service started") return this.getAllDomains() .andThen(this.generateReportCard) .mapErr((reportCardErr: MonitoringError | string[]) => { if (reportCardErr instanceof MonitoringError) { - this.monitoringServiceLogger.error({ + this.monitoringWorkerLogger.error({ error: reportCardErr, message: "Error running monitoring service", }) return } - this.monitoringServiceLogger.error({ + this.monitoringWorkerLogger.error({ message: "Error running monitoring service", meta: { dnsCheckerResult: reportCardErr, @@ -278,7 +199,7 @@ export default class MonitoringService { }) .orElse(() => okAsync([])) .andThen(() => { - this.monitoringServiceLogger.info( + this.monitoringWorkerLogger.info( `Monitoring service completed in ${Date.now() - start}ms` ) return okAsync("Monitoring service completed") diff --git a/support/index.ts b/support/index.ts index 3c8e454d5..535aeab29 100644 --- a/support/index.ts +++ b/support/index.ts @@ -2,16 +2,12 @@ import "module-alias/register" import express from "express" -import { - infraService, - launchesService, - monitoringService, - sequelize, -} from "@common/index" +import { infraService, launchesService, sequelize } from "@common/index" import { useSharedMiddleware } from "@common/middleware" import { config } from "@root/config/config" import logger from "@root/logger/logger" -import MonitoringService from "@root/monitoring" +import MonitoringService from "@root/monitoring/MonitoringService" +import MonitoringWorker from "@root/monitoring/monitoringWorker" import { ROUTE_VERSION } from "./constants" import { v2Router } from "./routes" @@ -24,7 +20,14 @@ const app = express() // poller site launch updates infraService.pollMessages() -monitoringService.driver() +// only needed for support container +export const monitoringWorker = new MonitoringWorker({ + launchesService, +}) + +export const monitoringService = new MonitoringService({ + monitoringWorker, +}) const ROUTE_PREFIX_ISOBOT = `/${ROUTE_VERSION}/isobot` app.use(ROUTE_PREFIX_ISOBOT, isobotRouter)