From 6e9f31d1e93904542ab809b3d0f5d00b340dcc0d Mon Sep 17 00:00:00 2001
From: Weves
Date: Mon, 20 Nov 2023 16:41:51 -0800
Subject: [PATCH] Fix ResourceLogger blocking main thread

---
 backend/danswer/background/indexing/dask_utils.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/backend/danswer/background/indexing/dask_utils.py b/backend/danswer/background/indexing/dask_utils.py
index aae91e24db5..84335041dc4 100644
--- a/backend/danswer/background/indexing/dask_utils.py
+++ b/backend/danswer/background/indexing/dask_utils.py
@@ -1,4 +1,4 @@
-import time
+import asyncio
 
 import psutil
 from dask.distributed import WorkerPlugin
@@ -18,8 +18,11 @@ def setup(self, worker: Worker) -> None:
         self.worker = worker
         worker.loop.add_callback(self.log_resources)
 
-    def log_resources(self) -> None:
-        """Periodically log CPU and memory usage."""
+    async def log_resources(self) -> None:
+        """Periodically log CPU and memory usage.
+
+        NOTE: must be async or else will clog up the worker indefinitely due to the fact that
+        Dask uses Tornado under the hood (which is async)"""
         while True:
             cpu_percent = psutil.cpu_percent(interval=None)
             memory_available_gb = psutil.virtual_memory().available / (1024.0**3)
@@ -27,4 +30,4 @@ def log_resources(self) -> None:
             logger.debug(
                 f"Worker {self.worker.address}: CPU usage {cpu_percent}%, Memory available {memory_available_gb}GB"
             )
-            time.sleep(self.log_interval)
+            await asyncio.sleep(self.log_interval)