Skip to content

Commit

Permalink
Log mem usage and current task for every child worker when max_mem_percent threshold reached
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Oct 9, 2023
1 parent b31bb9b commit d68c5db
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions wakaq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Child:
"done",
"soft_timeout",
"hard_timeout",
"current_task",
]

def __init__(self, pid, stdin, pingin, broadcastout):
Expand All @@ -68,6 +69,7 @@ def close(self):
close_fd(self.broadcastout)

def set_timeouts(self, wakaq, task=None, queue=None):
self.current_task = task
self.soft_timeout = wakaq.soft_timeout
self.hard_timeout = wakaq.hard_timeout
if task and task.soft_timeout:
Expand All @@ -79,6 +81,10 @@ def set_timeouts(self, wakaq, task=None, queue=None):
elif queue and queue.hard_timeout:
self.hard_timeout = queue.hard_timeout

@property
def mem_usage_percent(self):
    """Return the memory usage percentage psutil reports for this child's pid.

    The value is looked up on every access; nothing is cached. Any exception
    raised by psutil while inspecting the process propagates to the caller.
    """
    process = psutil.Process(self.pid)
    return process.memory_percent()


class Worker:
__slots__ = [
Expand Down Expand Up @@ -449,25 +455,37 @@ def _check_max_mem_percent(self):
if not self.wakaq.max_mem_percent:
return
task_timeout = self.wakaq.hard_timeout or self.wakaq.soft_timeout or 120
if time.time() - self._max_mem_reached_at < task_timeout:
now = time.time()
if now - self._max_mem_reached_at < task_timeout:
return
if len(self.children) == 0:
return
percent_used = int(round(psutil.virtual_memory().percent))
if percent_used < self.wakaq.max_mem_percent:
return
self._max_mem_reached_at = time.time()
log.info(f"Mem usage {percent_used}% is more than max_mem_percent threshold ({self.wakaq.max_mem_percent}%)")
self._log_mem_usage_of_all_children()
self._max_mem_reached_at = now
child = self._child_using_most_mem()
if child:
log.info(
f"Mem usage {percent_used}% is more than max_mem_percent threshold ({self.wakaq.max_mem_percent}%)... stopping child process {child.pid}"
)
log.info(f"Stopping child process {child.pid}...")
child.soft_timeout_reached = True # prevent raising SoftTimeout twice for same child
kill(child.pid, signal.SIGQUIT)

def _log_mem_usage_of_all_children(self):
    """Log the memory usage, and current task if any, of every child process.

    One info line is emitted per child. When a child's memory usage cannot be
    read, a warning plus the traceback is logged for that child and the loop
    continues with the next one.
    """
    for child in self.children:
        # NOTE(review): the only assignment of ``current_task`` visible here is
        # in ``set_timeouts``; read it defensively so a child on which it was
        # never set cannot raise AttributeError and abort logging for the rest.
        current_task = getattr(child, "current_task", None)
        task = f" while processing task {current_task.name}" if current_task else ""
        try:
            log.info(f"Child process {child.pid} using {round(child.mem_usage_percent, 2)}% ram{task}")
        except Exception:
            # Catch Exception rather than using a bare except so that
            # KeyboardInterrupt / SystemExit still propagate.
            log.warning(f"Unable to get ram usage of child process {child.pid}{task}")
            log.warning(traceback.format_exc())

def _child_using_most_mem(self):
    """Return the child process currently using the most memory.

    Children are ranked by their ``mem_usage_percent``. If reading memory
    usage fails for any child, a randomly chosen child is returned instead
    so the caller still has a process to stop.

    Returns:
        The selected child, or ``None`` when there are no children.
    """
    if not self.children:
        return None
    try:
        # The ranking function must be passed as ``key=``. Passed positionally,
        # max() treats it as a second item to compare against the list and
        # raises TypeError, which previously fell through to the random
        # fallback on every call.
        return max(self.children, key=lambda c: c.mem_usage_percent)
    except Exception:
        return random.choice(self.children)

Expand Down

0 comments on commit d68c5db

Please sign in to comment.