diff --git a/pghoard/webserver.py b/pghoard/webserver.py
index b23f573e..39e21fbd 100644
--- a/pghoard/webserver.py
+++ b/pghoard/webserver.py
@@ -48,7 +48,10 @@ class OwnHTTPServer(PoolMixIn, HTTPServer):
     pool = ThreadPoolExecutor(max_workers=10)
     requested_basebackup_sites = None
 
-    def __init__(self, server_address, RequestHandlerClass):
+    def __init__(
+        self, server_address, request_handler, *, config, log, requested_basebackup_sites, compression_queue, transfer_queue,
+        lock, pending_download_ops, download_results, prefetch_404, metrics
+    ):
         # to avoid any kind of regression where the server address is not a legal ip address, catch any ValueError
         try:
             # specifying an empty http_address will make pghoard listen on all IPV4 addresses,
@@ -59,7 +62,26 @@ def __init__(self, server_address, RequestHandlerClass):
                 self.address_family = socket.AF_INET6
         except ValueError:
             pass
-        HTTPServer.__init__(self, server_address, RequestHandlerClass)
+        HTTPServer.__init__(self, server_address, request_handler)
+
+        self.config = config
+        self.log = log
+        self.requested_basebackup_sites = requested_basebackup_sites
+        self.compression_queue = compression_queue
+        self.transfer_queue = transfer_queue
+        self.lock = lock
+        self.pending_download_ops = pending_download_ops
+        self.download_results = download_results
+        self.most_recently_served_files = {}
+        # Bounded list of files returned from local disk. Sometimes the file on disk is in some way "bad"
+        # and PostgreSQL doesn't accept it and keeps on requesting it again. If the file was recently served
+        # from disk serve it from file storage instead because the file there could be different.
+        self.served_from_disk = deque(maxlen=10)
+        # Bounded negative cache for failed prefetch operations - we don't want to try prefetching files that
+        # aren't there. This isn't used for explicit download requests as it's possible that a file appears
+        # later on in the object store.
+        self.prefetch_404 = prefetch_404
+        self.metrics = metrics
 
 
 class HttpResponse(Exception):
@@ -102,21 +124,23 @@ def run_safe(self) -> None:
             self.log.exception("Unhandled exception in %s", self.__class__.__name__)
 
     def process_queue_item(self, download_result: CallbackEvent) -> None:
-        key = download_result.opaque
+        key = str(download_result.opaque)
+        if not isinstance(download_result.payload, dict) or "target_path" not in download_result.payload:
+            raise RuntimeError(f"Invalid payload in callback event: {download_result}, payload: {download_result.payload}")
+        src_tmp_file_path = download_result.payload["target_path"]
         with self.lock:
             op = self.pending_download_ops.pop(key, None)
             if not op:
                 self.log.warning("Orphaned download operation %r completed: %r", key, download_result)
                 if download_result.success:
                     with suppress(OSError):
-                        os.unlink(download_result.payload["target_path"])
+                        os.unlink(src_tmp_file_path)
                 return
         if download_result.success:
             if os.path.isfile(op.target_path):
                 self.log.warning("Target path for %r already exists, skipping", key)
                 return
             # verify wal
-            src_tmp_file_path = download_result.payload["target_path"]
             dst_file_path = op.target_path
             if op.filetype == "xlog":
                 try:
@@ -174,25 +198,18 @@ def __init__(self, config, requested_basebackup_sites, compression_queue, transf
     def run_safe(self):
         # We bind the port only when we start running
         self._running = True
-        self.server = OwnHTTPServer((self.address, self.port), RequestHandler)
-        self.server.config = self.config  # pylint: disable=attribute-defined-outside-init
-        self.server.log = self.log  # pylint: disable=attribute-defined-outside-init
-        self.server.requested_basebackup_sites = self.requested_basebackup_sites
-        self.server.compression_queue = self.compression_queue  # pylint: disable=attribute-defined-outside-init
-        self.server.transfer_queue = self.transfer_queue  # pylint: disable=attribute-defined-outside-init
-        self.server.lock = self.lock  # pylint: disable=attribute-defined-outside-init
-        self.server.pending_download_ops = self.pending_download_ops  # pylint: disable=attribute-defined-outside-init
-        self.server.download_results = self.download_results  # pylint: disable=attribute-defined-outside-init
-        self.server.most_recently_served_files = {}  # pylint: disable=attribute-defined-outside-init
-        # Bounded list of files returned from local disk. Sometimes the file on disk is in some way "bad"
-        # and PostgreSQL doesn't accept it and keeps on requesting it again. If the file was recently served
-        # from disk serve it from file storage instead because the file there could be different.
-        self.server.served_from_disk = deque(maxlen=10)  # pylint: disable=attribute-defined-outside-init
-        # Bounded negative cache for failed prefetch operations - we don't want to try prefetching files that
-        # aren't there. This isn't used for explicit download requests as it's possible that a file appears
-        # later on in the object store.
-        self.server.prefetch_404 = self.prefetch_404  # pylint: disable=attribute-defined-outside-init
-        self.server.metrics = self.metrics  # pylint: disable=attribute-defined-outside-init
+        self.server = OwnHTTPServer((self.address, self.port),
+                                    RequestHandler,
+                                    config=self.config,
+                                    log=self.log,
+                                    requested_basebackup_sites=self.requested_basebackup_sites,
+                                    compression_queue=self.compression_queue,
+                                    transfer_queue=self.transfer_queue,
+                                    lock=self.lock,
+                                    pending_download_ops=self.pending_download_ops,
+                                    download_results=self.download_results,
+                                    prefetch_404=self.prefetch_404,
+                                    metrics=self.metrics)
         self.download_results_processor.start()
         self.is_initialized.set()
         self.server.serve_forever()
@@ -227,6 +244,7 @@ def running(self, value):
 class RequestHandler(BaseHTTPRequestHandler):
     disable_nagle_algorithm = True
     server_version = "pghoard/" + __version__
+    server: OwnHTTPServer
 
     @contextmanager
     def _response_handler(self, method):
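
Note (not part of the patch): a minimal sketch of how the new keyword-only constructor could be called directly, e.g. from a test. Only the parameter names come from the signature above; the concrete argument values and types below are assumptions for illustration.

import logging
from collections import deque
from queue import Queue
from threading import Lock

from pghoard.webserver import OwnHTTPServer, RequestHandler

# All values are placeholders; a real caller passes pghoard's own config,
# queues, lock and metrics objects.
server = OwnHTTPServer(
    ("127.0.0.1", 16000),                        # server_address; the port is arbitrary here
    RequestHandler,
    config={},                                   # placeholder config dict
    log=logging.getLogger("pghoard.webserver"),
    requested_basebackup_sites=set(),
    compression_queue=Queue(),
    transfer_queue=Queue(),
    lock=Lock(),
    pending_download_ops={},
    download_results=Queue(),
    prefetch_404=deque(maxlen=32),               # maxlen is an assumption
    metrics=None,                                # a real metrics client in production
)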