Skip to content

Commit

Permalink
✨ Group TasksInState and Queue metric by task type
Browse files Browse the repository at this point in the history
  • Loading branch information
pajowu committed Nov 21, 2023
1 parent 0472589 commit 82b6463
Showing 1 changed file with 24 additions and 11 deletions.
35 changes: 24 additions & 11 deletions backend/transcribee_backend/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from prometheus_client import Gauge
from sqlmodel import Session, col, func
from transcribee_proto.api import TaskType

from transcribee_backend.config import settings
from transcribee_backend.db import SessionContextManager
Expand All @@ -25,15 +26,21 @@ def refresh(self, session: Session):

class TasksInState(Metric):
def __init__(self):
self.collector = Gauge("tasks", "Number of tasks", ["state"])
self.collector = Gauge("tasks", "Number of tasks", ["state", "task_type"])

def refresh(self, session: Session):
result = session.query(Task.state, func.count()).group_by(Task.state).all()
counts = {x: 0 for x in TaskState}
for state, count in result:
counts[state] = count
for state, count in counts.items():
self.collector.labels(state=state.value).set(count)
result = (
session.query(Task.state, Task.task_type, func.count())
.group_by(Task.state, Task.task_type)
.all()
)
counts = {(x, y): 0 for x in TaskState for y in TaskType}
for state, task_type, count in result:
counts[(state, task_type)] = count
for (state, task_type), count in counts.items():
self.collector.labels(state=state.value, task_type=task_type.value).set(
count
)


class Workers(Metric):
Expand Down Expand Up @@ -76,11 +83,12 @@ def refresh(self, session: Session):

class Queue(Metric):
def __init__(self):
self.collector = Gauge("queue", "Queue length in seconds")
self.collector = Gauge("queue", "Queue length in seconds", ["task_type"])

def refresh(self, session: Session):
(result,) = (
result = (
session.query(
Task.task_type,
func.coalesce(
func.sum(
Document.duration * (1 - func.coalesce(TaskAttempt.progress, 0))
Expand All @@ -90,9 +98,14 @@ def refresh(self, session: Session):
)
.join(Task, Task.document_id == Document.id)
.join(TaskAttempt, Task.current_attempt_id == TaskAttempt.id, isouter=True)
.group_by(Task.task_type)
.where(col(Task.state).in_(["NEW", "ASSIGNED"]))
).one()
self.collector.set(result)
).all()
counts = {x: 0 for x in TaskType}
for task_type, count in result:
counts[task_type] = count
for task_type, count in counts.items():
self.collector.labels(task_type=task_type.value).set(count)


METRIC_CLASSES: List[type[Metric]] = [TasksInState, Workers, Users, Documents, Queue]
Expand Down

0 comments on commit 82b6463

Please sign in to comment.