
Commit 300980d
pghoard: move DownloadResultsProcessor start and stop calls from WebServer to PGHoard
egor-voynov-aiven committed Apr 3, 2024
Parent commit: 94b795f
Showing 3 changed files with 17 additions and 16 deletions.
pghoard/pghoard.py (8 additions, 1 deletion)
@@ -47,7 +47,7 @@
 from pghoard.receivexlog import PGReceiveXLog
 from pghoard.transfer import (TransferAgent, TransferQueue, UploadEvent, UploadEventProgressTracker)
 from pghoard.walreceiver import WALReceiver
-from pghoard.webserver import WebServer
+from pghoard.webserver import DownloadResultsProcessor, WebServer


 @dataclass
@@ -149,6 +149,10 @@ def __init__(self, config_path):
         self.webserver = WebServer(
             self.config, self.requested_basebackup_sites, self.compression_queue, self.transfer_queue, self.metrics
         )
+        self.download_results_processor = DownloadResultsProcessor(
+            self.config, self.webserver.lock, self.webserver.download_results, self.webserver.pending_download_ops,
+            self.webserver.prefetch_404
+        )

         self.wal_file_deleter = WALFileDeleterThread(
             config=self.config, wal_file_deletion_queue=self.wal_file_deletion_queue, metrics=self.metrics
@@ -701,6 +705,7 @@ def start_threads_on_startup(self):
         self.inotify.start()
         self.upload_tracker.start()
         self.webserver.start()
+        self.download_results_processor.start()
         self.wal_file_deleter.start()
         for compressor in self.compressors:
             compressor.start()
@@ -983,6 +988,8 @@ def _get_all_threads(self):

         if hasattr(self, "webserver"):
             all_threads.append(self.webserver)
+        if hasattr(self, "download_results_processor"):
+            all_threads.append(self.download_results_processor)
         all_threads.extend(self.basebackups.values())
         all_threads.extend(self.receivexlogs.values())
         all_threads.extend(self.walreceivers.values())
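For context, this half of the change follows the daemon's existing thread-ownership pattern: construct the worker in __init__, start it in start_threads_on_startup(), and report it from _get_all_threads() so the normal shutdown path stops and joins it with the other threads. The sketch below shows that pattern in isolation; the class and queue names are simplified stand-ins, not the actual pghoard implementation.

# Minimal sketch of the thread-ownership pattern applied in this commit:
# the top-level daemon constructs, starts, and joins every worker itself,
# so components such as WebServer no longer manage each other's threads.
# Names here (Worker, Daemon) are illustrative, not pghoard classes.
import threading
import time
from queue import Empty, Queue


class Worker(threading.Thread):
    """Stand-in for a PGHoardThread-style worker with a cooperative stop flag."""

    def __init__(self, work_queue: Queue) -> None:
        super().__init__(name=self.__class__.__name__)
        self.work_queue = work_queue
        self.running = False

    def run(self) -> None:
        self.running = True
        while self.running:
            try:
                item = self.work_queue.get(timeout=0.2)
            except Empty:
                continue
            print("processed", item)


class Daemon:
    """Stand-in for the PGHoard daemon: it owns the whole worker lifecycle."""

    def __init__(self) -> None:
        self.queue = Queue()
        self.worker = Worker(self.queue)      # constructed in __init__ ...

    def start_threads_on_startup(self) -> None:
        self.worker.start()                   # ... started on startup ...

    def quit(self) -> None:
        self.worker.running = False           # ... stopped and joined on shutdown
        self.worker.join()


if __name__ == "__main__":
    daemon = Daemon()
    daemon.start_threads_on_startup()
    daemon.queue.put("000000010000000000000001")
    time.sleep(0.5)
    daemon.quit()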
pghoard/webserver.py (8 additions, 14 deletions)
@@ -20,7 +20,7 @@
 from queue import Empty, Queue
 from socketserver import ThreadingMixIn
 from threading import RLock
-from typing import Dict
+from typing import Any, Dict

 from rohmu.errors import Error, FileNotFoundFromStorageError

@@ -102,16 +102,19 @@ class DownloadResultsProcessor(PGHoardThread):
     Processes download_results queue, validates WAL and renames tmp file to target (".prefetch")
     """
     def __init__(
-        self, lock: RLock, log: logging.Logger, download_results: Queue, pending_download_ops: Dict[str, PendingDownloadOp],
-        prefetch_404: deque
+        self, config: Dict[str, Any], lock: RLock, download_results: Queue,
+        pending_download_ops: Dict[str, PendingDownloadOp], prefetch_404: deque
     ) -> None:
         super().__init__(name=self.__class__.__name__)
-        self.running = False
-        self.log = log
+        self.log = logging.getLogger("WebServer")
         self.lock = lock
         self.download_results = download_results
         self.pending_download_ops = pending_download_ops
         self.prefetch_404 = prefetch_404
+        # PGHoard expects the threads to have these attributes
+        self.running = False
+        self.config = config
+        self.site_transfers = {}

     def run_safe(self) -> None:
         self.running = True
@@ -174,9 +177,6 @@ def process_queue_item(self, download_result: CallbackEvent) -> None:
                 pending_download_op.target_path, metadata.get("host"), metadata.get("hash-algorithm"), metadata.get("hash")
             )

-    def stop(self) -> None:
-        self.running = False
-

 class WebServer(PGHoardThread):
     def __init__(self, config, requested_basebackup_sites, compression_queue, transfer_queue, metrics):
@@ -197,9 +197,6 @@ def __init__(self, config, requested_basebackup_sites, compression_queue, transfer_queue, metrics):
         self.log.debug("WebServer initialized with address: %r port: %r", self.address, self.port)
         self.is_initialized = threading.Event()
         self.prefetch_404 = deque(maxlen=32)  # pylint: disable=attribute-defined-outside-init
-        self.download_results_processor = DownloadResultsProcessor(
-            self.lock, self.log, self.download_results, self.pending_download_ops, self.prefetch_404
-        )

     def run_safe(self):
         # We bind the port only when we start running
@@ -216,16 +213,13 @@ def run_safe(self):
                                     download_results=self.download_results,
                                     prefetch_404=self.prefetch_404,
                                     metrics=self.metrics)
-        self.download_results_processor.start()
         self.is_initialized.set()
         self.server.serve_forever()

     def close(self):
         self.log.debug("Closing WebServer")
         if self.server:
             self.server.shutdown()
-        self.download_results_processor.stop()
-        self.download_results_processor.join()
         self.log.debug("Closed WebServer")
         self._running = False

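On the webserver.py side, DownloadResultsProcessor now receives the config, builds its own "WebServer" logger, and carries the running, config, and site_transfers attributes the daemon expects on every managed thread, so its dedicated stop() method is no longer needed. Its job, per the class docstring, is to drain download_results, validate the WAL, and rename the temporary file to its ".prefetch" target. The snippet below is a hedged, self-contained illustration of that rename step only; the helper name and file names are hypothetical, and the real validation logic is not reproduced.

# Hypothetical illustration of the "rename tmp file to target ('.prefetch')"
# step described in the DownloadResultsProcessor docstring; not pghoard code.
import os
import tempfile


def promote_download(tmp_path: str, target_path: str) -> None:
    # os.rename is atomic within a single filesystem, so a reader either sees
    # the complete ".prefetch" file or no file at all, never a partial one.
    os.rename(tmp_path, target_path)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        tmp = os.path.join(workdir, "000000010000000000000001.pghoard.tmp")
        target = os.path.join(workdir, "000000010000000000000001.prefetch")
        with open(tmp, "wb") as f:
            f.write(b"fake WAL bytes")        # stand-in for the downloaded segment
        promote_download(tmp, target)
        print(os.path.exists(target))         # True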
test/test_webserver.py (1 addition, 1 deletion)
@@ -777,7 +777,7 @@ def test_uncontrolled_target_path(self, pghoard):

 @pytest.fixture(name="download_results_processor")
 def fixture_download_results_processor() -> DownloadResultsProcessor:
-    return DownloadResultsProcessor(threading.RLock(), logging.getLogger("WebServer"), Queue(), {}, deque())
+    return DownloadResultsProcessor({}, threading.RLock(), Queue(), {}, deque())


 class TestDownloadResultsProcessor:
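The fixture now passes the config dict (here an empty one) first and no longer supplies a logger, matching the new constructor signature. A hypothetical test consuming it could check the attributes the new __init__ sets; the actual cases inside TestDownloadResultsProcessor are outside this diff.

# Hypothetical test shape only; the real TestDownloadResultsProcessor cases
# are not shown here. It relies on pytest injecting the fixture by name.
def test_initial_state(download_results_processor):
    # Attributes set in the new __init__; PGHoard expects its threads to expose them.
    assert download_results_processor.running is False
    assert download_results_processor.config == {}
    assert download_results_processor.site_transfers == {}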
