-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: implement metric backend (#13)
- Loading branch information
Showing
58 changed files
with
2,917 additions
and
940 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
name: Release Preview | ||
|
||
on: | ||
push: | ||
tags-ignore: | ||
- '**' | ||
branches: | ||
- 'main' | ||
paths-ignore: | ||
- 'docs*/**' | ||
- '*.md' | ||
|
||
concurrency: | ||
group: ${{github.workflow}}-${{github.head_ref}} | ||
cancel-in-progress: false | ||
|
||
env: | ||
CI: true | ||
|
||
jobs: | ||
version: | ||
timeout-minutes: 10 | ||
runs-on: ubuntu-latest | ||
steps: | ||
- uses: actions/checkout@v3 | ||
with: | ||
fetch-depth: 0 | ||
|
||
- uses: ./.github/actions/node | ||
with: | ||
working-directory: ${{ env.WORKING_DIRECTORY }} | ||
repo-token: ${{ secrets.GITHUB_TOKEN }} | ||
|
||
- name: Install | ||
run: | | ||
pnpm add -g @lerna-lite/[email protected] @lerna-lite/[email protected] @lerna-lite/[email protected] @commitlint/[email protected] | ||
- name: Generate next release (dry-run) | ||
run: pnpm run release-preview --yes | ||
env: | ||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} | ||
|
||
- name: Show CHANGELOG.md | ||
run: echo -e "\`\`\`diff\n$(git --no-pager diff './packages/*/CHANGELOG.md')\n\`\`\`" >> $GITHUB_STEP_SUMMARY |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
import tls from 'node:tls'; | ||
import { readFile } from 'node:fs/promises'; | ||
import fp from 'fastify-plugin'; | ||
import PgBoss from 'pg-boss'; | ||
import './pgboss.js'; | ||
|
||
declare module 'fastify' { | ||
interface FastifyInstance { | ||
pgboss: PgBoss; | ||
} | ||
} | ||
|
||
export interface PgBossOptions { | ||
databaseConnectionUrl: string; | ||
ssl?: { | ||
// Necessary only if the server uses a self-signed certificate. | ||
caPath?: string; | ||
// Necessary only if the server requires client certificate authentication. | ||
keyPath?: string; | ||
certPath?: string; | ||
}; | ||
} | ||
|
||
export default fp<PgBossOptions>(async function PgBossPlugin(fastify, opts) { | ||
const config: PgBoss.ConstructorOptions = { | ||
connectionString: opts.databaseConnectionUrl, | ||
application_name: 'controlplane', | ||
// How many days a job may be in created or retry state before it's archived. Must be >=1 | ||
retentionDays: 30, | ||
// How many minutes a job may be in active state before it is failed because of expiration. Must be >=1 | ||
expireInMinutes: 15, | ||
// Specifies how long in seconds completed jobs get archived (12 hours). | ||
archiveCompletedAfterSeconds: 12 * 60 * 60, | ||
// Specifies how long in seconds failed jobs get archived (12 hours). | ||
archiveFailedAfterSeconds: 12 * 60 * 60, | ||
// When jobs in the archive table become eligible for deletion. | ||
deleteAfterDays: 30, | ||
// How often maintenance operations are run against the job and archive tables. | ||
maintenanceIntervalMinutes: 1, | ||
}; | ||
|
||
if (opts.ssl) { | ||
const sslOptions: tls.ConnectionOptions = { | ||
rejectUnauthorized: false, | ||
}; | ||
|
||
// Necessary only if the server uses a self-signed certificate. | ||
if (opts.ssl.caPath) { | ||
sslOptions.ca = await readFile(opts.ssl.caPath, 'utf8'); | ||
} | ||
|
||
// Necessary only if the server requires client certificate authentication. | ||
if (opts.ssl.certPath) { | ||
sslOptions.cert = await readFile(opts.ssl.certPath, 'utf8'); | ||
} | ||
|
||
if (opts.ssl.keyPath) { | ||
sslOptions.key = await readFile(opts.ssl.keyPath, 'utf8'); | ||
} | ||
|
||
config.ssl = sslOptions; | ||
} | ||
|
||
const boss = new PgBoss(config); | ||
|
||
boss.on('error', (error) => fastify.log.error(error, 'PgBoss error')); | ||
|
||
await boss.start(); | ||
|
||
boss.on('wip', (data) => { | ||
const progress = data.filter((worker) => worker.state === 'active').length; | ||
const failed = data.filter((worker) => worker.state === 'failed').length; | ||
// @ts-ignore https://github.com/timgit/pg-boss/issues/422 | ||
const stopping = data.filter((worker) => worker.state === 'stopping').length; | ||
|
||
fastify.log.debug({ progress, stopping, failed }, `PgBoss Worker report`); | ||
}); | ||
|
||
fastify.addHook('onClose', async () => { | ||
const destroy = process.env.NODE_ENV !== 'production'; | ||
fastify.log.info({ gracePeriod: '30s', destroy }, 'Shutting down PgBoss ...'); | ||
|
||
const stopOptions: PgBoss.StopOptions = { | ||
timeout: 30_000, | ||
graceful: true, | ||
destroy, | ||
}; | ||
await boss.stop(stopOptions); | ||
|
||
// Wait until pgBoss has gracefully stopped. | ||
// https://github.com/timgit/pg-boss/issues/421 | ||
await new Promise((resolve) => { | ||
boss.once('stopped', () => { | ||
fastify.log.info('PgBoss stopped'); | ||
resolve(undefined); | ||
}); | ||
}); | ||
|
||
fastify.log.info('PgBoss shutdown complete'); | ||
}); | ||
|
||
fastify.decorate<PgBoss>('pgboss', boss); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import PgBoss from 'pg-boss'; | ||
|
||
interface RegisterTrafficAnalyzerOptions { | ||
graphId: string; | ||
} | ||
|
||
interface TrafficAnalyzerJob { | ||
graphId: string; | ||
type: 'traffic/analyzer'; | ||
} | ||
|
||
export default class TrafficAnalyzerWorker { | ||
constructor(private boss: PgBoss) {} | ||
|
||
/** | ||
* Register a new graph to be analyzed. This method is idempotent. | ||
* @param opts | ||
*/ | ||
public async register(opts: RegisterTrafficAnalyzerOptions): Promise<void> { | ||
const queue = `traffic/analyzer/graph/${opts.graphId}`; | ||
// Create a cron job that runs every 5 minute. | ||
await this.boss.schedule(queue, '*/5 * * * *', { type: 'traffic', graphId: opts.graphId }); | ||
} | ||
|
||
/** | ||
* Subscribe to the traffic analyzer queue with a max concurrency of 100 | ||
* and a team size of 10.sc | ||
*/ | ||
public async subscribe(): Promise<void> { | ||
await this.boss.work<TrafficAnalyzerJob>( | ||
`traffic/analyzer/graph/*`, | ||
{ teamSize: 10, teamConcurrency: 100 }, | ||
(job) => this.handler(job), | ||
); | ||
} | ||
|
||
/** | ||
* Handle a traffic analyzer job. | ||
* @param event | ||
*/ | ||
// eslint-disable-next-line require-await | ||
public async handler(event: PgBoss.Job<TrafficAnalyzerJob>): Promise<void> { | ||
// TODO: Implement me! | ||
console.log(event); | ||
} | ||
} |
Oops, something went wrong.