Skip to content

Commit

Permalink
pghoard: change DownloadResultsProcessor start/stop approach
Browse files Browse the repository at this point in the history
  • Loading branch information
egor-voynov-aiven committed Apr 4, 2024
1 parent a5b43bf commit 1f798a6
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pghoard/pghoard.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __init__(self, config_path):
self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
)
self.download_results_processor = DownloadResultsProcessor(
self.config, self.webserver.lock, self.webserver.download_results, self.webserver.pending_download_ops,
self.webserver.lock, self.webserver.download_results, self.webserver.pending_download_ops,
self.webserver.prefetch_404
)

Expand Down
33 changes: 22 additions & 11 deletions pghoard/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,32 +101,43 @@ class DownloadResultsProcessor(PGHoardThread):
"""
Processes download_results queue, validates WAL and renames tmp file to target (".prefetch")
"""
class StopMarker:
pass

def __init__(
self, config: Dict[str, Any], lock: RLock, download_results: Queue,
pending_download_ops: Dict[str, PendingDownloadOp], prefetch_404: deque
self, lock: RLock, download_results: Queue, pending_download_ops: Dict[str, PendingDownloadOp], prefetch_404: deque
) -> None:
super().__init__(name=self.__class__.__name__)
self.log = logging.getLogger("WebServer")
self.lock = lock
self.download_results = download_results
self.pending_download_ops = pending_download_ops
self.prefetch_404 = prefetch_404
# PGHoard expects the threads to have these attributes
self.running = False
self.config = config
self.site_transfers: Dict[str, Any] = {}
self._running = False

def run_safe(self) -> None:
self.running = True
while self.running:
self._running = True
while True:
item = self.download_results.get(block=True)
if item is self.StopMarker:
break
try:
item = self.download_results.get(block=True, timeout=0.1)
self.process_queue_item(item)
except Empty:
pass
except Exception: # pylint: disable=broad-except
self.log.exception("Unhandled exception in %s", self.__class__.__name__)

@property
def running(self) -> bool:
return self._running

@running.setter
def running(self, value: bool) -> None:
if self._running == value:
return
if not value:
self.download_results.put(self.StopMarker)
self._running = value

def process_queue_item(self, download_result: CallbackEvent) -> None:
key = str(download_result.opaque)
pending_download_op = self.pending_download_ops.pop(key, None)
Expand Down
2 changes: 1 addition & 1 deletion test/test_webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ def test_uncontrolled_target_path(self, pghoard):

@pytest.fixture(name="download_results_processor")
def fixture_download_results_processor() -> DownloadResultsProcessor:
return DownloadResultsProcessor({}, threading.RLock(), Queue(), {}, deque())
return DownloadResultsProcessor(threading.RLock(), Queue(), {}, deque())


class TestDownloadResultsProcessor:
Expand Down

0 comments on commit 1f798a6

Please sign in to comment.