Skip to content

Commit

Permalink
sync metrics updates
Browse files Browse the repository at this point in the history
  • Loading branch information
arikalon1 committed Oct 31, 2021
1 parent e1a51d3 commit 2dff223
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions src/robusta/utils/task_queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import threading
import time
from threading import Thread
from queue import Queue, Full
Expand Down Expand Up @@ -27,6 +28,7 @@ def __init_metrics(self):
self.metrics_thread = Thread(target=self.__report_metrics)
self.metrics_thread.daemon = True
self.metrics_thread.start()
self.metrics_lock = threading.Lock()

def __report_metrics(self):
while True:
Expand All @@ -47,10 +49,12 @@ def add_task(self, task, *args, **kwargs):
try:
self.put((task, args, kwargs), block=False)
except Full:
self.metrics.rejected += 1
with self.metrics_lock:
self.metrics.rejected += 1
return

self.metrics.queued += 1
with self.metrics_lock:
self.metrics.queued += 1

def __start_workers(self):
for i in range(self.num_workers):
Expand All @@ -61,8 +65,10 @@ def __start_workers(self):
def worker(self):
while True:
item, args, kwargs = self.get()
self.metrics.processed += 1
with self.metrics_lock:
self.metrics.processed += 1
start_time = time.time()
item(*args, **kwargs)
self.metrics.total_process_time += (time.time() - start_time)
with self.metrics_lock:
self.metrics.total_process_time += (time.time() - start_time)
self.task_done()

0 comments on commit 2dff223

Please sign in to comment.