Skip to content

Commit

Permalink
feat: job time metric; fix: queue size metric
Browse files Browse the repository at this point in the history
  • Loading branch information
sanoel committed Oct 31, 2024
1 parent a81c871 commit 68d19e4
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 77 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@oada/jobs",
"version": "4.7.1",
"version": "4.7.2",
"description": "A library for oada job based microservices",
"source": "src/index.ts",
"main": "dist/index.js",
Expand Down Expand Up @@ -80,6 +80,7 @@
"moment": "^2.30.1",
"p-queue": "^8.0.1",
"p-timeout": "^6.1.2",
"perf_hooks": "^0.0.1",
"serialize-error": "^11.0.3",
"tiny-json-http": "^7.5.1",
"tslib": "^2.7.0",
Expand Down
38 changes: 13 additions & 25 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import '@oada/pino-debug';
import PQueue from 'p-queue';
import moment from 'moment';

Expand Down Expand Up @@ -49,22 +49,9 @@ export class Queue {
_service: Service,
private readonly _id: string,
) {
// , token?: string) {
this.#service = _service;
this.#oada = _service.getClient(); // .clone(token ?? '');
this.#queue = new PQueue({ concurrency: this.#service.concurrency });
/*
If (typeof domainOrOada === 'string') {
this.oada = service.getClient().clone(token ?? '');
} else {
debug(
'[Queue ',
id,
']: Using default existing OADA connection for default queue'
);
this.oada = domainOrOada;
}
*/
}

/**
Expand Down Expand Up @@ -196,18 +183,19 @@ export class Queue {
*/
async #doJobs(jobs: OADAJobs | OADAJobsChange): Promise<void> {
// Queue up the Runners in parallel
for (const [jobKey, value] of Object.entries(jobs)) {
void this.#queue.add(async () => {
const { _id } = value as Link;
if (!_id) return;
// Fetch the job
const { job, isJob } = await Job.fromOada(this.#oada, _id);
this.#service.metrics.inc({
service: this.#service.name,
type: job.type,
state: 'queued',
});
for await (const [jobKey, value] of Object.entries(jobs)) {
const { _id } = value as Link;
if (!_id) return;
// Fetch the job
const { job, isJob } = await Job.fromOada(this.#oada, _id);

this.#service.metrics.jobs.inc({
service: this.#service.name,
type: job.type,
state: 'queued',
});

void this.#queue.add(async () => {
// Instantiate a runner to manage the job
const runner = new Runner(this.#service, jobKey, job, this.#oada);

Expand Down
2 changes: 1 addition & 1 deletion src/Report.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import '@oada/pino-debug';
import { CronJob } from 'cron';
import clone from 'clone-deep';
import debug from 'debug';
Expand Down
23 changes: 19 additions & 4 deletions src/Runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type { Json, JsonCompatible } from './index.js';
import { debug, error, info, trace } from './utils.js';
import { Job } from './Job.js';
import type { Logger } from '@oada/pino-debug';
import { performance } from 'perf_hooks';
import type { Service } from './Service.js';
import { tree } from './tree.js';

Expand All @@ -49,6 +50,7 @@ export class Runner {
readonly #job: Job;
readonly #oada: OADAClient;
readonly #log: Logger;
#startTime: undefined | number;

/**
* Create a Runner
Expand Down Expand Up @@ -77,7 +79,7 @@ export class Runner {
* appropriate.
*/
public async run(): Promise<void> {
this.#service.metrics.inc({
this.#service.metrics.jobs.inc({
service: this.#service.name,
type: this.#job.type,
state: 'running',
Expand Down Expand Up @@ -112,6 +114,7 @@ export class Runner {
// Annotate the Runner finishing
await this.postUpdate('started', 'Runner started');
trace('Update posted');
this.#startTime = performance.now();

// NOTE: pTimeout will reject after `worker.timeout` ms and attempt
// `cancel()` the promise returned by `worker.work`; However, if that
Expand All @@ -131,13 +134,25 @@ export class Runner {

info(`[job ${this.#job.oadaId}] Successful`);
await this.finish('success', r, moment());
const duration = performance.now() - this.#startTime;
this.#service.metrics['job-times'].observe({
service: this.#service.name,
type: this.#job.type,
status: 'success',
}, duration);
} catch (error_: any) {
error(`[job ${this.#job.oadaId}] Failed`);
trace(`[job ${this.#job.oadaId}] Error: %O`, error_);

await (error_ instanceof TimeoutError
? this.finish('failure', error_ as unknown as Json, moment(), 'timeout')
: this.finish('failure', error_, moment(), error_.JobError));
const duration = performance.now() - this.#startTime!;
this.#service.metrics['job-times'].observe({
service: this.#service.name,
type: this.#job.type,
status: 'failure',
}, duration);
}
}

Expand Down Expand Up @@ -252,20 +267,20 @@ export class Runner {
);

// Decrement queued job count?
this.#service.metrics.dec({
this.#service.metrics.jobs.dec({
service: this.#service.name,
type: this.#job.type,
state: 'queued',
});
// Decrement running job count
this.#service.metrics.dec({
this.#service.metrics.jobs.dec({
service: this.#service.name,
type: this.#job.type,
state: 'running',
});

// Increment successes or failures based on status
this.#service.metrics.inc({
this.#service.metrics.jobs.inc({
service: this.#service.name,
type: this.#job.type,
state: status,
Expand Down
81 changes: 36 additions & 45 deletions src/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

import '@oada/pino-debug';
import { Gauge, Histogram } from '@oada/lib-prom';
import { type Logger, pino } from '@oada/pino-debug';

import type { Config } from '@oada/client';
import { Gauge } from '@oada/lib-prom';
import { OADAClient } from '@oada/client';
import { assert as assertQueue } from '@oada/types/oada/service/queue.js';

Expand Down Expand Up @@ -131,11 +132,40 @@ export class Service {
this.token = this.#oada.getToken()[0]!;
this.concurrency = object.concurrency ?? this.#oada.getConcurrency();
// TODO: Get total pending jobs in collect callback?
this.metrics = new Gauge({
name: 'oada_jobs_total',
help: 'Number of OADA jobs',
labelNames: ['service', 'type', 'state'] as const,
});
this.metrics = {
'jobs': new Gauge({
name: 'oada_jobs_total',
help: 'Number of OADA jobs',
labelNames: ['service', 'type', 'state'] as const,
}),
'job-times': new Histogram({
name: 'job-times',
help: 'Histogram of job times',
labelNames: ['service', 'type', 'status'] as const,
buckets: [
1, // 1 second
2, // 2 seconds
4, // 4 seconds
8, // 8 seconds
16, // 16 seconds
32, // 32 seconds
64, // 1.06 minutes
128, // 2.13 minutes
256, // 4.26 minutes
512, // 8.53 minutes
1024, // 17.07 minutes
2048, // 34.13 minutes
4096, // 1.14 hours
8192, // 2.28 hours
16384, // 4.55 hours
32768, // 9.1 hours
65536, // 18.2 hours
131072, // 1.52 days
262144, // 3.04 days
524288 // 6.08 days
]
})
};

if (object.opts) {
this.opts = object.opts;
Expand Down Expand Up @@ -275,43 +305,4 @@ export class Service {
this.log.debug('Unable to stop queue %0', error_);
}
}

/*
Async #initTotalMetrics(): Promise<void> {
const date = new Date().toISOString().split('T')[0];
for await (const status of ['success', 'failure']) {
try {
const { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`,
});
const keys = Object.keys(data as Record<string, any>).filter(
(key) => !key.startsWith('_'),
);
this.metrics.set(keys.length);
} catch {
this.metrics.set(0);
}
}
}
async #initTypedMetrics(type: string): Promise<void> {
const mType = type.replaceAll('-', '_').replaceAll(' ', '_');
const date = new Date().toISOString().split('T')[0];
for await (const status of ['success', 'failure']) {
try {
this.metrics[`${this.name}_${status}_${mType}`].set(0);
const { data } = await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}`,
});
for await (const job of Object.keys(data as Record<string, any>)) {
const { data: index } = (await this.#oada.get({
path: `/bookmarks/services/${this.name}/jobs/${status}/day-index/${date}/${job}`,
})) as unknown as { data: { j: string; [k: string]: any } };
if (index.type === type)
this.metrics[`${this.name}_${status}_${mType}`].inc();
}
} catch {}
}
}
*/
}
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import '@oada/pino-debug';
import _debug from 'debug';

import type { ConnectionResponse, Json, OADAClient } from '@oada/client';
Expand Down
8 changes: 8 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ __metadata:
moment: "npm:^2.30.1"
p-queue: "npm:^8.0.1"
p-timeout: "npm:^6.1.2"
perf_hooks: "npm:^0.0.1"
prettier: "npm:^3.3.3"
serialize-error: "npm:^11.0.3"
tiny-json-http: "npm:^7.5.1"
Expand Down Expand Up @@ -5779,6 +5780,13 @@ __metadata:
languageName: node
linkType: hard

"perf_hooks@npm:^0.0.1":
version: 0.0.1
resolution: "perf_hooks@npm:0.0.1"
checksum: 10/a38f6397943cd55b7780e1ae504f187d218a4c986afcdd674a68fc4937aa415f29e3e007c26e498b55e277052f1bee3028b348ce7509ad1a47c2e8d0d381bd87
languageName: node
linkType: hard

"picocolors@npm:^1.0.1":
version: 1.1.0
resolution: "picocolors@npm:1.1.0"
Expand Down

0 comments on commit 68d19e4

Please sign in to comment.