Skip to content

Commit

Permalink
Expose work queues status information (#38)
Browse files Browse the repository at this point in the history
Co-authored-by: Mike Drepin <[email protected]>
  • Loading branch information
rodgar-nvkz and Mike Drepin authored Jul 25, 2024
1 parent 2228970 commit 6847479
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
14 changes: 13 additions & 1 deletion metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,19 @@ def collect(self):
"type",
"work_pool_id",
"work_pool_name",
"status"
"status",
"healthy",
"late_runs_count",
"last_polled",
"health_check_policy_maximum_late_runs",
"health_check_policy_maximum_seconds_since_last_polled",
],
)

for work_queue in work_queues:
state = 0 if work_queue.get("is_paused") else 1
status_info = work_queue.get("status_info", {})
health_check_policy = status_info.get("health_check_policy", {})
prefect_info_work_queues.add_metric(
[
str(work_queue.get("created", "null")),
Expand All @@ -360,6 +367,11 @@ def collect(self):
str(work_queue.get("work_pool_id", "null")),
str(work_queue.get("work_pool_name", "null")),
str(work_queue.get("status", "null")),
str(status_info.get("healthy", "null")),
str(status_info.get("late_runs_count", "null")),
str(status_info.get("last_polled", "null")),
str(health_check_policy.get("maximum_late_runs", "null")),
str(health_check_policy.get("maximum_seconds_since_last_polled", "null")),
],
state,
)
Expand Down
34 changes: 34 additions & 0 deletions metrics/work_queues.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import uuid

import requests
import time

Expand Down Expand Up @@ -48,4 +50,36 @@ def get_work_queues_info(self) -> dict:
else:
break

work_queues_info = resp.json()
for queue_info in work_queues_info:
queue_info["status_info"] = self.get_work_queue_status_info(queue_info["id"])

return work_queues_info

def get_work_queue_status_info(self, work_queue_id: uuid.UUID) -> dict:
    """
    Get status information for a specific work queue.

    Issues a GET request to ``{self.url}/{self.uri}/{work_queue_id}/status``
    and retries up to ``self.max_retries`` times when the server answers
    with an HTTP error status, pausing one second between attempts.

    Args:
        work_queue_id (uuid.UUID): The UUID of the work queue.

    Returns:
        dict: JSON response containing work queue status information.

    Raises:
        SystemExit: If the last allowed attempt still fails with an HTTP
            error. Only ``requests.exceptions.HTTPError`` is handled here;
            any other exception raised by ``requests`` propagates as-is.
    """
    endpoint = f"{self.url}/{self.uri}/{work_queue_id}/status"
    last_attempt = self.max_retries - 1

    for retry in range(self.max_retries):
        try:
            resp = requests.get(endpoint, headers=self.headers)
            resp.raise_for_status()
        except requests.exceptions.HTTPError as err:
            self.logger.error(err)
            if retry >= last_attempt:
                # Out of attempts: fail immediately, there is nothing
                # left to wait for.
                raise SystemExit(err)
            # Back off briefly before the next attempt so a struggling
            # API is not hit again instantly.
            time.sleep(1)
        else:
            break

    return resp.json()

0 comments on commit 6847479

Please sign in to comment.