From 82b6463fcaa7fae6d51d875e0c60d69a7b4a1161 Mon Sep 17 00:00:00 2001 From: pajowu Date: Tue, 21 Nov 2023 11:51:05 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Group=20TasksInState=20and=20Queue?= =?UTF-8?q?=20metric=20by=20task=20type?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/transcribee_backend/metrics.py | 35 ++++++++++++++++++-------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/backend/transcribee_backend/metrics.py b/backend/transcribee_backend/metrics.py index 651a5918..56b016f4 100644 --- a/backend/transcribee_backend/metrics.py +++ b/backend/transcribee_backend/metrics.py @@ -7,6 +7,7 @@ from fastapi.security import HTTPBasic, HTTPBasicCredentials from prometheus_client import Gauge from sqlmodel import Session, col, func +from transcribee_proto.api import TaskType from transcribee_backend.config import settings from transcribee_backend.db import SessionContextManager @@ -25,15 +26,21 @@ def refresh(self, session: Session): class TasksInState(Metric): def __init__(self): - self.collector = Gauge("tasks", "Number of tasks", ["state"]) + self.collector = Gauge("tasks", "Number of tasks", ["state", "task_type"]) def refresh(self, session: Session): - result = session.query(Task.state, func.count()).group_by(Task.state).all() - counts = {x: 0 for x in TaskState} - for state, count in result: - counts[state] = count - for state, count in counts.items(): - self.collector.labels(state=state.value).set(count) + result = ( + session.query(Task.state, Task.task_type, func.count()) + .group_by(Task.state, Task.task_type) + .all() + ) + counts = {(x, y): 0 for x in TaskState for y in TaskType} + for state, task_type, count in result: + counts[(state, task_type)] = count + for (state, task_type), count in counts.items(): + self.collector.labels(state=state.value, task_type=task_type.value).set( + count + ) class Workers(Metric): @@ -76,11 +83,12 @@ def refresh(self, session: Session): class Queue(Metric): def __init__(self): - self.collector = Gauge("queue", "Queue length in seconds") + self.collector = Gauge("queue", "Queue length in seconds", ["task_type"]) def refresh(self, session: Session): - (result,) = ( + result = ( session.query( + Task.task_type, func.coalesce( func.sum( Document.duration * (1 - func.coalesce(TaskAttempt.progress, 0)) @@ -90,9 +98,14 @@ def refresh(self, session: Session): ) .join(Task, Task.document_id == Document.id) .join(TaskAttempt, Task.current_attempt_id == TaskAttempt.id, isouter=True) + .group_by(Task.task_type) .where(col(Task.state).in_(["NEW", "ASSIGNED"])) - ).one() - self.collector.set(result) + ).all() + counts = {x: 0 for x in TaskType} + for task_type, count in result: + counts[task_type] = count + for task_type, count in counts.items(): + self.collector.labels(task_type=task_type.value).set(count) METRIC_CLASSES: List[type[Metric]] = [TasksInState, Workers, Users, Documents, Queue]