Skip to content

Commit

Permalink
pghoard: fix typing issues for webserver.py
Browse files Browse the repository at this point in the history
  • Loading branch information
egor-voynov-aiven committed Mar 26, 2024
1 parent abd4650 commit 6558c26
Showing 1 changed file with 56 additions and 24 deletions.
80 changes: 56 additions & 24 deletions pghoard/webserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ class OwnHTTPServer(PoolMixIn, HTTPServer):
pool = ThreadPoolExecutor(max_workers=10)
requested_basebackup_sites = None

def __init__(self, server_address, RequestHandlerClass):
def __init__(
self, server_address, request_handler, *, config, log, requested_basebackup_sites, compression_queue, transfer_queue,
lock, pending_download_ops, download_results, prefetch_404, metrics
):
# to avoid any kind of regression where the server address is not a legal ip address, catch any ValueError
try:
# specifying an empty http_address will make pghoard listen on all IPV4 addresses,
Expand All @@ -59,7 +62,26 @@ def __init__(self, server_address, RequestHandlerClass):
self.address_family = socket.AF_INET6
except ValueError:
pass
HTTPServer.__init__(self, server_address, RequestHandlerClass)
HTTPServer.__init__(self, server_address, request_handler)

self.config = config
self.log = log
self.requested_basebackup_sites = requested_basebackup_sites
self.compression_queue = compression_queue
self.transfer_queue = transfer_queue
self.lock = lock
self.pending_download_ops = pending_download_ops
self.download_results = download_results
self.most_recently_served_files = {}
# Bounded list of files returned from local disk. Sometimes the file on disk is in some way "bad"
# and PostgreSQL doesn't accept it and keeps on requesting it again. If the file was recently served
# from disk serve it from file storage instead because the file there could be different.
self.served_from_disk = deque(maxlen=10)
# Bounded negative cache for failed prefetch operations - we don't want to try prefetching files that
# aren't there. This isn't used for explicit download requests as it's possible that a file appears
# later on in the object store.
self.prefetch_404 = prefetch_404
self.metrics = metrics


class HttpResponse(Exception):
Expand Down Expand Up @@ -102,21 +124,23 @@ def run_safe(self) -> None:
self.log.exception("Unhandled exception in %s", self.__class__.__name__)

def process_queue_item(self, download_result: CallbackEvent) -> None:
key = download_result.opaque
key = str(download_result.opaque)
if not isinstance(download_result.payload, dict) or "target_path" not in download_result.payload:
raise RuntimeError(f"Invalid payload in callback event: {download_result}, payload: {download_result.payload}")
src_tmp_file_path = download_result.payload["target_path"]
with self.lock:
op = self.pending_download_ops.pop(key, None)
if not op:
self.log.warning("Orphaned download operation %r completed: %r", key, download_result)
if download_result.success:
with suppress(OSError):
os.unlink(download_result.payload["target_path"])
os.unlink(src_tmp_file_path)
return
if download_result.success:
if os.path.isfile(op.target_path):
self.log.warning("Target path for %r already exists, skipping", key)
return
# verify wal
src_tmp_file_path = download_result.payload["target_path"]
dst_file_path = op.target_path
if op.filetype == "xlog":
try:
Expand Down Expand Up @@ -174,25 +198,32 @@ def __init__(self, config, requested_basebackup_sites, compression_queue, transf
def run_safe(self):
# We bind the port only when we start running
self._running = True
self.server = OwnHTTPServer((self.address, self.port), RequestHandler)
self.server.config = self.config # pylint: disable=attribute-defined-outside-init
self.server.log = self.log # pylint: disable=attribute-defined-outside-init
self.server.requested_basebackup_sites = self.requested_basebackup_sites
self.server.compression_queue = self.compression_queue # pylint: disable=attribute-defined-outside-init
self.server.transfer_queue = self.transfer_queue # pylint: disable=attribute-defined-outside-init
self.server.lock = self.lock # pylint: disable=attribute-defined-outside-init
self.server.pending_download_ops = self.pending_download_ops # pylint: disable=attribute-defined-outside-init
self.server.download_results = self.download_results # pylint: disable=attribute-defined-outside-init
self.server.most_recently_served_files = {} # pylint: disable=attribute-defined-outside-init
# Bounded list of files returned from local disk. Sometimes the file on disk is in some way "bad"
# and PostgreSQL doesn't accept it and keeps on requesting it again. If the file was recently served
# from disk serve it from file storage instead because the file there could be different.
self.server.served_from_disk = deque(maxlen=10) # pylint: disable=attribute-defined-outside-init
# Bounded negative cache for failed prefetch operations - we don't want to try prefetching files that
# aren't there. This isn't used for explicit download requests as it's possible that a file appears
# later on in the object store.
self.server.prefetch_404 = self.prefetch_404 # pylint: disable=attribute-defined-outside-init
self.server.metrics = self.metrics # pylint: disable=attribute-defined-outside-init
self.server = OwnHTTPServer((self.address, self.port),
RequestHandler,
config=self.config,
log=self.log,
requested_basebackup_sites=self.requested_basebackup_sites,
compression_queue=self.compression_queue,
transfer_queue=self.transfer_queue,
lock=self.lock,
pending_download_ops=self.pending_download_ops,
download_results=self.download_results,
prefetch_404=self.prefetch_404,
metrics=self.metrics)
'''
self.config = self.config # pylint: disable=attribute-defined-outside-init
self.log = self.log # pylint: disable=attribute-defined-outside-init
self.requested_basebackup_sites = self.requested_basebackup_sites
self.compression_queue = self.compression_queue # pylint: disable=attribute-defined-outside-init
self.transfer_queue = self.transfer_queue # pylint: disable=attribute-defined-outside-init
self.lock = self.lock # pylint: disable=attribute-defined-outside-init
self.pending_download_ops = self.pending_download_ops # pylint: disable=attribute-defined-outside-init
self.download_results = self.download_results # pylint: disable=attribute-defined-outside-init
self.most_recently_served_files = {} # pylint: disable=attribute-defined-outside-init
self.served_from_disk = deque(maxlen=10) # pylint: disable=attribute-defined-outside-init
self.prefetch_404 = self.prefetch_404 # pylint: disable=attribute-defined-outside-init
self.metrics = self.metrics # pylint: disable=attribute-defined-outside-init
'''
self.download_results_processor.start()
self.is_initialized.set()
self.server.serve_forever()
Expand Down Expand Up @@ -227,6 +258,7 @@ def running(self, value):
class RequestHandler(BaseHTTPRequestHandler):
disable_nagle_algorithm = True
server_version = "pghoard/" + __version__
server: OwnHTTPServer

@contextmanager
def _response_handler(self, method):
Expand Down

0 comments on commit 6558c26

Please sign in to comment.